初始化计算引擎

This commit is contained in:
谷成伟 2024-10-31 17:46:56 +08:00
parent 1033f63006
commit 3be8531fe6
9 changed files with 483 additions and 1 deletions

View File

@ -29,6 +29,8 @@
<undertow.version>2.3.14.Final</undertow.version> <undertow.version>2.3.14.Final</undertow.version>
<apache.poi>5.3.0</apache.poi> <apache.poi>5.3.0</apache.poi>
<taosdata.verson>3.2.10</taosdata.verson> <taosdata.verson>3.2.10</taosdata.verson>
<disruptor.version>3.4.4</disruptor.version>
<aviator.version>5.4.3</aviator.version>
</properties> </properties>
<dependencies> <dependencies>
@ -53,6 +55,11 @@
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId> <artifactId>spring-boot-starter-websocket</artifactId>
</dependency> </dependency>
<!-- 定时任务支持-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-quartz</artifactId>
</dependency>
<!--spring 配置支持--> <!--spring 配置支持-->
<dependency> <dependency>
@ -160,9 +167,14 @@
<dependency> <dependency>
<groupId>com.lmax</groupId> <groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId> <artifactId>disruptor</artifactId>
<version>3.4.4</version> <version>${disruptor.version}</version>
</dependency> </dependency>
<dependency>
<groupId>com.googlecode.aviator</groupId>
<artifactId>aviator</artifactId>
<version>${aviator.version}</version>
</dependency>
<!--aop切面--> <!--aop切面-->
<dependency> <dependency>

View File

@ -0,0 +1,18 @@
package com.das.modules.calc.domain.entity;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.util.Date;
@TableName("sys_calc_module")
@Data
public class CalcModule {
private String name;
private String script;
private String version;
private String description;
private Integer disabled;
private Date updateTime;
private String cron;
}

View File

@ -0,0 +1,14 @@
package com.das.modules.calc.domain.vo;
import lombok.Data;
/**
* 设备缓存数据
*/
@Data
public class DeviceInfoCache {
private Long deviceId;
private String deviceCode;
private Integer objectType;
private Long parentDeviceId;
}

View File

@ -0,0 +1,58 @@
package com.das.modules.calc.functions;
import com.das.modules.calc.domain.vo.DeviceInfoCache;
import com.das.modules.calc.service.CacheService;
import com.das.modules.data.domain.SnapshotValueQueryParam;
import com.das.modules.data.service.DataService;
import com.googlecode.aviator.runtime.function.AbstractFunction;
import com.googlecode.aviator.runtime.type.*;
import com.googlecode.aviator.utils.Env;
import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Aviator扩展函数 - 获取设备实时数据
* 函数格式: rt(deviceId, attrs)
*
*/
@Slf4j
public class FunctionRealData extends AbstractFunction {
private DataService dataService = null;
private CacheService cacheService = null;
public FunctionRealData(DataService dataService, CacheService cacheService) {
this.dataService = dataService;
this.cacheService = cacheService;
}
@Override
public String getName() {
return "rt";
}
@Override
public AviatorObject call(Map<String, Object> env, AviatorObject deviceCode, AviatorObject attributes) {
//设备Code
String code = (String)deviceCode.getValue(env);
DeviceInfoCache deviceInfoCache = cacheService.getDeviceInfoCache(code);
//属性列表
List<String> list = (List<String>)attributes.getValue(env);
List<String> attrs = list.stream().map(String::toLowerCase).toList();
SnapshotValueQueryParam snapshotValueQueryParam = new SnapshotValueQueryParam();
snapshotValueQueryParam.setDeviceId(String.valueOf(deviceInfoCache.getDeviceId()));
snapshotValueQueryParam.setAttributes(attrs);
Map<String, Map<String, Object>> stringMapMap = dataService.querySnapshotValues(List.of(snapshotValueQueryParam));
Map<String,Object> data = stringMapMap.get(String.valueOf(deviceInfoCache.getDeviceId()));
Map<String,Object> result = new HashMap<>(list.size());
for (String s : list) {
String key = s.toLowerCase();
Object value = data.get(key);
result.put(s, value);
}
return AviatorRuntimeJavaType.valueOf(result);
}
}

View File

@ -0,0 +1,85 @@
package com.das.modules.calc.functions;
import com.das.modules.calc.domain.vo.DeviceInfoCache;
import com.das.modules.calc.service.CacheService;
import com.das.modules.data.domain.SnapshotValueQueryParam;
import com.das.modules.data.service.DataService;
import com.das.modules.node.domain.bo.CalculateRTData;
import com.googlecode.aviator.runtime.function.AbstractFunction;
import com.googlecode.aviator.runtime.function.AbstractVariadicFunction;
import com.googlecode.aviator.runtime.function.FunctionUtils;
import com.googlecode.aviator.runtime.type.AviatorObject;
import com.googlecode.aviator.runtime.type.AviatorRuntimeJavaType;
import lombok.extern.slf4j.Slf4j;
import java.util.*;
/**
* Aviator扩展函数 - 获取设备实时数据
* 函数格式: rt(deviceId, attrs)
*
*/
@Slf4j
public class FunctionSaveCalcData extends AbstractVariadicFunction {
private DataService dataService = null;
private CacheService cacheService = null;
public FunctionSaveCalcData(DataService dataService, CacheService cacheService) {
this.dataService = dataService;
this.cacheService = cacheService;
}
@Override
public String getName() {
return "save";
}
// @Override
// public AviatorObject call(Map<String, Object> env, AviatorObject deviceCode, AviatorObject attributes) {
// //设备Code
// String code = (String)deviceCode.getValue(env);
// DeviceInfoCache deviceInfoCache = cacheService.getDeviceInfoCache(code);
// //属性列表
// List<String> list = (List<String>)attributes.getValue(env);
// List<String> attrs = list.stream().map(String::toLowerCase).toList();
// SnapshotValueQueryParam snapshotValueQueryParam = new SnapshotValueQueryParam();
// snapshotValueQueryParam.setDeviceId(String.valueOf(deviceInfoCache.getDeviceId()));
// snapshotValueQueryParam.setAttributes(attrs);
// Map<String, Map<String, Object>> stringMapMap = dataService.querySnapshotValues(List.of(snapshotValueQueryParam));
// Map<String,Object> data = stringMapMap.get(String.valueOf(deviceInfoCache.getDeviceId()));
// log.info("{}", data);
// Map<String,Object> result = new HashMap<>(list.size());
// for (String s : list) {
// String key = s.toLowerCase();
// Object value = data.get(key);
// result.put(s, value);
// }
// return AviatorRuntimeJavaType.valueOf(result);
// }
@Override
public AviatorObject variadicCall(Map<String, Object> env, AviatorObject... args) {
if (args.length < 4) { return AviatorRuntimeJavaType.valueOf(1);}
if ( (args.length - 1) % 3 != 0) { return AviatorRuntimeJavaType.valueOf(1);}
//deviceCode
String code = (String)args[0].getValue(env);
DeviceInfoCache deviceInfoCache = cacheService.getDeviceInfoCache(code);
List<CalculateRTData> dataList = new ArrayList<>();
for (int i = 1; i < args.length; i+=3) {
Date date = (Date)FunctionUtils.getJavaObject(args[i], env);
String attr = (String)args[i+1].getValue(env);
Object value = args[i+2].getValue(env);
CalculateRTData dt = new CalculateRTData();
dt.setDeviceId(deviceInfoCache.getDeviceId());
dt.setDataTime(date.getTime());
dt.setDataValue(value);
dt.setIotModelField(attr);
dataList.add(dt);
}
// dataService.updateCalFieldData(dataList);
return AviatorRuntimeJavaType.valueOf(0);
}
}

View File

@ -0,0 +1,7 @@
package com.das.modules.calc.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.das.modules.calc.domain.entity.CalcModule;
public interface CalcModuleMapper extends BaseMapper<CalcModule> {
}

View File

@ -0,0 +1,119 @@
package com.das.modules.calc.service;
import com.das.modules.calc.domain.vo.DeviceInfoCache;
import com.das.modules.equipment.entity.SysEquipment;
import com.das.modules.equipment.mapper.SysEquipmentMapper;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
/**
* 计算缓存服务
* 用于缓存常用数据供计算服务使用
*/
@Service
public class CacheService {
@Autowired
SysEquipmentMapper sysEquipmentMapper;
/**
* 缓存初始化
*/
@PostConstruct
public void init() {
initDeviceInfoCaches();
}
/**
* 缓存释放
*/
@PreDestroy
public void destroy() {
freeDeviceInfoCaches();
}
///-设备缓存----------------------------------------------------------------
/**
* 设备缓存信息
*/
private final List<DeviceInfoCache> deviceInfoCaches = new ArrayList<DeviceInfoCache>();
/**
* 设备CODE索引用于通过设备CODE访问设备缓存信息
*/
private final ConcurrentHashMap<String, Integer> deviceCodeIndex = new ConcurrentHashMap<>();
private final ConcurrentHashMap<Long, Integer> deviceIdIndex = new ConcurrentHashMap<>();
private DeviceInfoCache windFarmCache = null;
/**
* 初始化设备缓存信息
*/
private void initDeviceInfoCaches() {
List<SysEquipment> equipments = sysEquipmentMapper.selectList();
for (int i = 0; i < equipments.size(); i++) {
SysEquipment equipment = equipments.get(i);
DeviceInfoCache deviceInfoCache = new DeviceInfoCache();
deviceInfoCache.setDeviceId(equipment.getId());
deviceInfoCache.setDeviceCode(equipment.getCode());
deviceInfoCache.setObjectType(equipment.getObjectType());
deviceInfoCache.setParentDeviceId(equipment.getParentEquipmentId());
deviceInfoCaches.add(deviceInfoCache);
//创建Code索引
deviceCodeIndex.put(deviceInfoCache.getDeviceCode(),i);
//创建Id索引
deviceIdIndex.put(equipment.getId(),i);
//关联风场缓存
// if (equipment.getObjectType().equals()
}
}
/**
* 释放设备缓存信息
*/
private void freeDeviceInfoCaches() {
deviceInfoCaches.clear();
deviceCodeIndex.clear();
}
/**
* 返回设备缓存信息列表
* @return
*/
public List<DeviceInfoCache> getDevicesCache() {
return deviceInfoCaches;
}
/**
* 根据设备CODE返回设备缓存信息
* @param deviceCode
* @return
*/
public DeviceInfoCache getDeviceInfoCache(String deviceCode) {
Integer index = deviceCodeIndex.get(deviceCode);
if (index != null) {
return deviceInfoCaches.get(index);
}
return null;
}
/**
* 根据设备IRN返回设备缓存信息
* @param deviceId
* @return
*/
public DeviceInfoCache getDeviceInfoCache(Long deviceId) {
Integer index = deviceIdIndex.get(deviceId);
if (index != null) {
return deviceInfoCaches.get(index);
}
return null;
}
///-设备缓存=END---------------------------------------------------------------
}

View File

@ -0,0 +1,44 @@
package com.das.modules.calc.service;
import com.das.modules.calc.domain.entity.CalcModule;
import com.googlecode.aviator.AviatorEvaluatorInstance;
import com.googlecode.aviator.Expression;
import lombok.extern.slf4j.Slf4j;
import org.quartz.Job;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.util.StopWatch;
import java.util.Map;
@Slf4j
public class CalcJob implements Job {
@Override
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
try{
StopWatch sw = new StopWatch(jobExecutionContext.getJobDetail().getKey().getName());
sw.start();
JobDataMap dataMap = jobExecutionContext.getJobDetail().getJobDataMap();
AviatorEvaluatorInstance instance = (AviatorEvaluatorInstance) dataMap.get("aviator");
CalcModule calcModule = (CalcModule) dataMap.get("module");
CacheService cacheService = (CacheService) dataMap.get("cache");
if (instance == null || calcModule == null || cacheService == null) {
throw new JobExecutionException("calcModule is null");
}
Expression expression = instance.getCachedExpressionByKey(calcModule.getName());
if (expression == null) {
throw new JobExecutionException("expression is null");
}
Map<String,Object> envs = expression.newEnv("G_DEVICES", cacheService.getDevicesCache());
Object result = expression.execute(envs);
sw.stop();
log.debug("任务[{}]已执行,结果:{}, 耗时:{}秒", calcModule.getName(), result, sw.getTotalTimeMillis()/1000.0);
}
catch (Exception e){
log.error(String.format("计算模块[%s]执行失败", jobExecutionContext.getJobDetail().getKey()), e);
}
}
}

View File

@ -0,0 +1,125 @@
package com.das.modules.calc.service;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.das.modules.calc.domain.entity.CalcModule;
import com.das.modules.calc.functions.FunctionRealData;
import com.das.modules.calc.functions.FunctionSaveCalcData;
import com.das.modules.calc.mapper.CalcModuleMapper;
import com.das.modules.data.service.DataService;
import com.googlecode.aviator.AviatorEvaluator;
import com.googlecode.aviator.AviatorEvaluatorInstance;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.quartz.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import org.springframework.stereotype.Service;
import java.util.List;
/**
* 计算引擎服务
* 负责脚本的维护脚本执行
*/
@Slf4j
@Service
public class CalcService {
@Autowired
CalcModuleMapper calcModuleMapper;
@Autowired
SchedulerFactoryBean quartzScheduler;
@Autowired
CacheService cacheService;
@Autowired
DataService dataService;
/**
* 计算引擎实例
*/
private final AviatorEvaluatorInstance aviator = AviatorEvaluator.getInstance();
/**
* 启动计算任务
* @param scriptModule
* @throws SchedulerException
*/
public void startCalcJob(CalcModule scriptModule) throws SchedulerException {
Scheduler sh = quartzScheduler.getScheduler();
JobDataMap dataMap = new JobDataMap();
dataMap.put("module", scriptModule);
dataMap.put("aviator", aviator);
dataMap.put("cache", cacheService);
JobKey jobKey = JobKey.jobKey(scriptModule.getName(), "CalcEngine");
JobDetail jobDetail = JobBuilder
.newJob(CalcJob.class)
.setJobData(dataMap)
.withIdentity(jobKey)
.build();
Trigger trigger = TriggerBuilder.newTrigger()
.withIdentity(jobKey.getName(),jobKey.getGroup())
.withSchedule(CronScheduleBuilder.cronSchedule(scriptModule.getCron()))
.build();
sh.scheduleJob(jobDetail, trigger);
}
/**
* 停止计算任务
* @param scriptModule
* @throws SchedulerException
*/
public void stopCalcJob(CalcModule scriptModule) throws SchedulerException {
Scheduler sh = quartzScheduler.getScheduler();
JobKey jobKey = JobKey.jobKey(scriptModule.getName(), "CalcEngine");
sh.deleteJob(jobKey);
}
public void addExtendFunctions(){
//获取设备实时数据 - rt( code, attrs )
FunctionRealData rt = new FunctionRealData( dataService, cacheService);
aviator.addFunction(rt);
//
FunctionSaveCalcData save = new FunctionSaveCalcData(dataService, cacheService);
aviator.addFunction(save);
}
/**
* 计算引擎初始化
*/
@PostConstruct
public void init(){
//从数据库读取未禁用脚本模块
LambdaQueryWrapper<CalcModule> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(CalcModule::getDisabled, 0);
List<CalcModule> scriptModules = calcModuleMapper.selectList(queryWrapper);
//为脚本引擎增加自定义函数
addExtendFunctions();
//初始化脚本引擎,预编译并缓存脚本
for (CalcModule scriptModule : scriptModules) {
try{
//预编译脚本
aviator.compile(scriptModule.getName(), scriptModule.getScript(), true);
startCalcJob(scriptModule);
}
catch (Exception e){
log.error("预编译脚本时出错", e);
}
}
//
}
/**
* 计算引擎释放资源
*/
@PreDestroy
public void free() throws SchedulerException {
quartzScheduler.destroy();
}
}