diff --git a/das/pom.xml b/das/pom.xml index bd497c05..29a25bc8 100644 --- a/das/pom.xml +++ b/das/pom.xml @@ -29,6 +29,8 @@ 2.3.14.Final 5.3.0 3.2.10 + 3.4.4 + 5.4.3 @@ -53,6 +55,11 @@ org.springframework.boot spring-boot-starter-websocket + + + org.springframework.boot + spring-boot-starter-quartz + @@ -160,9 +167,14 @@ com.lmax disruptor - 3.4.4 + ${disruptor.version} + + com.googlecode.aviator + aviator + ${aviator.version} + diff --git a/das/src/main/java/com/das/modules/calc/domain/entity/CalcModule.java b/das/src/main/java/com/das/modules/calc/domain/entity/CalcModule.java new file mode 100644 index 00000000..26ea313c --- /dev/null +++ b/das/src/main/java/com/das/modules/calc/domain/entity/CalcModule.java @@ -0,0 +1,18 @@ +package com.das.modules.calc.domain.entity; + +import com.baomidou.mybatisplus.annotation.TableName; +import lombok.Data; + +import java.util.Date; + +@TableName("sys_calc_module") +@Data +public class CalcModule { + private String name; + private String script; + private String version; + private String description; + private Integer disabled; + private Date updateTime; + private String cron; +} diff --git a/das/src/main/java/com/das/modules/calc/domain/vo/DeviceInfoCache.java b/das/src/main/java/com/das/modules/calc/domain/vo/DeviceInfoCache.java new file mode 100644 index 00000000..fba18494 --- /dev/null +++ b/das/src/main/java/com/das/modules/calc/domain/vo/DeviceInfoCache.java @@ -0,0 +1,14 @@ +package com.das.modules.calc.domain.vo; + +import lombok.Data; + +/** + * 设备缓存数据 + */ +@Data +public class DeviceInfoCache { + private Long deviceId; + private String deviceCode; + private Integer objectType; + private Long parentDeviceId; +} diff --git a/das/src/main/java/com/das/modules/calc/functions/FunctionRealData.java b/das/src/main/java/com/das/modules/calc/functions/FunctionRealData.java new file mode 100644 index 00000000..6fb013c5 --- /dev/null +++ b/das/src/main/java/com/das/modules/calc/functions/FunctionRealData.java @@ -0,0 +1,58 @@ +package com.das.modules.calc.functions; + +import com.das.modules.calc.domain.vo.DeviceInfoCache; +import com.das.modules.calc.service.CacheService; +import com.das.modules.data.domain.SnapshotValueQueryParam; +import com.das.modules.data.service.DataService; +import com.googlecode.aviator.runtime.function.AbstractFunction; +import com.googlecode.aviator.runtime.type.*; +import com.googlecode.aviator.utils.Env; +import lombok.extern.slf4j.Slf4j; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Aviator扩展函数 - 获取设备实时数据 + * 函数格式: rt(deviceId, attrs) + * + */ +@Slf4j +public class FunctionRealData extends AbstractFunction { + + private DataService dataService = null; + private CacheService cacheService = null; + + public FunctionRealData(DataService dataService, CacheService cacheService) { + this.dataService = dataService; + this.cacheService = cacheService; + } + + @Override + public String getName() { + return "rt"; + } + + @Override + public AviatorObject call(Map env, AviatorObject deviceCode, AviatorObject attributes) { + //设备Code + String code = (String)deviceCode.getValue(env); + DeviceInfoCache deviceInfoCache = cacheService.getDeviceInfoCache(code); + //属性列表 + List list = (List)attributes.getValue(env); + List attrs = list.stream().map(String::toLowerCase).toList(); + SnapshotValueQueryParam snapshotValueQueryParam = new SnapshotValueQueryParam(); + snapshotValueQueryParam.setDeviceId(String.valueOf(deviceInfoCache.getDeviceId())); + snapshotValueQueryParam.setAttributes(attrs); + Map> stringMapMap = dataService.querySnapshotValues(List.of(snapshotValueQueryParam)); + Map data = stringMapMap.get(String.valueOf(deviceInfoCache.getDeviceId())); + Map result = new HashMap<>(list.size()); + for (String s : list) { + String key = s.toLowerCase(); + Object value = data.get(key); + result.put(s, value); + } + return AviatorRuntimeJavaType.valueOf(result); + } +} diff --git a/das/src/main/java/com/das/modules/calc/functions/FunctionSaveCalcData.java b/das/src/main/java/com/das/modules/calc/functions/FunctionSaveCalcData.java new file mode 100644 index 00000000..4601a90f --- /dev/null +++ b/das/src/main/java/com/das/modules/calc/functions/FunctionSaveCalcData.java @@ -0,0 +1,85 @@ +package com.das.modules.calc.functions; + +import com.das.modules.calc.domain.vo.DeviceInfoCache; +import com.das.modules.calc.service.CacheService; +import com.das.modules.data.domain.SnapshotValueQueryParam; +import com.das.modules.data.service.DataService; +import com.das.modules.node.domain.bo.CalculateRTData; +import com.googlecode.aviator.runtime.function.AbstractFunction; +import com.googlecode.aviator.runtime.function.AbstractVariadicFunction; +import com.googlecode.aviator.runtime.function.FunctionUtils; +import com.googlecode.aviator.runtime.type.AviatorObject; +import com.googlecode.aviator.runtime.type.AviatorRuntimeJavaType; +import lombok.extern.slf4j.Slf4j; + +import java.util.*; + +/** + * Aviator扩展函数 - 获取设备实时数据 + * 函数格式: rt(deviceId, attrs) + * + */ +@Slf4j +public class FunctionSaveCalcData extends AbstractVariadicFunction { + + private DataService dataService = null; + private CacheService cacheService = null; + + public FunctionSaveCalcData(DataService dataService, CacheService cacheService) { + this.dataService = dataService; + this.cacheService = cacheService; + } + + @Override + public String getName() { + return "save"; + } + +// @Override +// public AviatorObject call(Map env, AviatorObject deviceCode, AviatorObject attributes) { +// //设备Code +// String code = (String)deviceCode.getValue(env); +// DeviceInfoCache deviceInfoCache = cacheService.getDeviceInfoCache(code); +// //属性列表 +// List list = (List)attributes.getValue(env); +// List attrs = list.stream().map(String::toLowerCase).toList(); +// SnapshotValueQueryParam snapshotValueQueryParam = new SnapshotValueQueryParam(); +// snapshotValueQueryParam.setDeviceId(String.valueOf(deviceInfoCache.getDeviceId())); +// snapshotValueQueryParam.setAttributes(attrs); +// Map> stringMapMap = dataService.querySnapshotValues(List.of(snapshotValueQueryParam)); +// Map data = stringMapMap.get(String.valueOf(deviceInfoCache.getDeviceId())); +// log.info("{}", data); +// Map result = new HashMap<>(list.size()); +// for (String s : list) { +// String key = s.toLowerCase(); +// Object value = data.get(key); +// result.put(s, value); +// } +// return AviatorRuntimeJavaType.valueOf(result); +// } + + @Override + public AviatorObject variadicCall(Map env, AviatorObject... args) { + if (args.length < 4) { return AviatorRuntimeJavaType.valueOf(1);} + if ( (args.length - 1) % 3 != 0) { return AviatorRuntimeJavaType.valueOf(1);} + //deviceCode + String code = (String)args[0].getValue(env); + DeviceInfoCache deviceInfoCache = cacheService.getDeviceInfoCache(code); + List dataList = new ArrayList<>(); + for (int i = 1; i < args.length; i+=3) { + Date date = (Date)FunctionUtils.getJavaObject(args[i], env); + String attr = (String)args[i+1].getValue(env); + Object value = args[i+2].getValue(env); + + CalculateRTData dt = new CalculateRTData(); + dt.setDeviceId(deviceInfoCache.getDeviceId()); + dt.setDataTime(date.getTime()); + dt.setDataValue(value); + dt.setIotModelField(attr); + dataList.add(dt); + } + +// dataService.updateCalFieldData(dataList); + return AviatorRuntimeJavaType.valueOf(0); + } +} diff --git a/das/src/main/java/com/das/modules/calc/mapper/CalcModuleMapper.java b/das/src/main/java/com/das/modules/calc/mapper/CalcModuleMapper.java new file mode 100644 index 00000000..50a1ba25 --- /dev/null +++ b/das/src/main/java/com/das/modules/calc/mapper/CalcModuleMapper.java @@ -0,0 +1,7 @@ +package com.das.modules.calc.mapper; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.das.modules.calc.domain.entity.CalcModule; + +public interface CalcModuleMapper extends BaseMapper { +} diff --git a/das/src/main/java/com/das/modules/calc/service/CacheService.java b/das/src/main/java/com/das/modules/calc/service/CacheService.java new file mode 100644 index 00000000..3f74999f --- /dev/null +++ b/das/src/main/java/com/das/modules/calc/service/CacheService.java @@ -0,0 +1,119 @@ +package com.das.modules.calc.service; + +import com.das.modules.calc.domain.vo.DeviceInfoCache; +import com.das.modules.equipment.entity.SysEquipment; +import com.das.modules.equipment.mapper.SysEquipmentMapper; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; + +/** + * 计算缓存服务 + * 用于缓存常用数据,供计算服务使用 + */ +@Service +public class CacheService { + + @Autowired + SysEquipmentMapper sysEquipmentMapper; + + /** + * 缓存初始化 + */ + @PostConstruct + public void init() { + initDeviceInfoCaches(); + } + + /** + * 缓存释放 + */ + @PreDestroy + public void destroy() { + freeDeviceInfoCaches(); + } + + ///-设备缓存---------------------------------------------------------------- + /** + * 设备缓存信息 + */ + private final List deviceInfoCaches = new ArrayList(); + + /** + * 设备CODE索引,用于通过设备CODE访问设备缓存信息 + */ + private final ConcurrentHashMap deviceCodeIndex = new ConcurrentHashMap<>(); + private final ConcurrentHashMap deviceIdIndex = new ConcurrentHashMap<>(); + + private DeviceInfoCache windFarmCache = null; + /** + * 初始化设备缓存信息 + */ + private void initDeviceInfoCaches() { + List equipments = sysEquipmentMapper.selectList(); + for (int i = 0; i < equipments.size(); i++) { + SysEquipment equipment = equipments.get(i); + DeviceInfoCache deviceInfoCache = new DeviceInfoCache(); + deviceInfoCache.setDeviceId(equipment.getId()); + deviceInfoCache.setDeviceCode(equipment.getCode()); + deviceInfoCache.setObjectType(equipment.getObjectType()); + deviceInfoCache.setParentDeviceId(equipment.getParentEquipmentId()); + deviceInfoCaches.add(deviceInfoCache); + //创建Code索引 + deviceCodeIndex.put(deviceInfoCache.getDeviceCode(),i); + //创建Id索引 + deviceIdIndex.put(equipment.getId(),i); + //关联风场缓存 +// if (equipment.getObjectType().equals() + } + } + + /** + * 释放设备缓存信息 + */ + private void freeDeviceInfoCaches() { + deviceInfoCaches.clear(); + deviceCodeIndex.clear(); + } + + /** + * 返回设备缓存信息列表 + * @return + */ + public List getDevicesCache() { + return deviceInfoCaches; + } + + /** + * 根据设备CODE返回设备缓存信息 + * @param deviceCode + * @return + */ + public DeviceInfoCache getDeviceInfoCache(String deviceCode) { + Integer index = deviceCodeIndex.get(deviceCode); + if (index != null) { + return deviceInfoCaches.get(index); + } + return null; + } + + /** + * 根据设备IRN返回设备缓存信息 + * @param deviceId + * @return + */ + public DeviceInfoCache getDeviceInfoCache(Long deviceId) { + Integer index = deviceIdIndex.get(deviceId); + if (index != null) { + return deviceInfoCaches.get(index); + } + return null; + } + + ///-设备缓存=END--------------------------------------------------------------- +} diff --git a/das/src/main/java/com/das/modules/calc/service/CalcJob.java b/das/src/main/java/com/das/modules/calc/service/CalcJob.java new file mode 100644 index 00000000..104cde7e --- /dev/null +++ b/das/src/main/java/com/das/modules/calc/service/CalcJob.java @@ -0,0 +1,44 @@ +package com.das.modules.calc.service; + +import com.das.modules.calc.domain.entity.CalcModule; +import com.googlecode.aviator.AviatorEvaluatorInstance; +import com.googlecode.aviator.Expression; +import lombok.extern.slf4j.Slf4j; +import org.quartz.Job; +import org.quartz.JobDataMap; +import org.quartz.JobExecutionContext; +import org.quartz.JobExecutionException; +import org.springframework.util.StopWatch; + +import java.util.Map; + +@Slf4j +public class CalcJob implements Job { + + @Override + public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException { + try{ + StopWatch sw = new StopWatch(jobExecutionContext.getJobDetail().getKey().getName()); + sw.start(); + JobDataMap dataMap = jobExecutionContext.getJobDetail().getJobDataMap(); + AviatorEvaluatorInstance instance = (AviatorEvaluatorInstance) dataMap.get("aviator"); + CalcModule calcModule = (CalcModule) dataMap.get("module"); + CacheService cacheService = (CacheService) dataMap.get("cache"); + if (instance == null || calcModule == null || cacheService == null) { + throw new JobExecutionException("calcModule is null"); + } + Expression expression = instance.getCachedExpressionByKey(calcModule.getName()); + if (expression == null) { + throw new JobExecutionException("expression is null"); + } + Map envs = expression.newEnv("G_DEVICES", cacheService.getDevicesCache()); + Object result = expression.execute(envs); + sw.stop(); + log.debug("任务[{}]已执行,结果:{}, 耗时:{}秒", calcModule.getName(), result, sw.getTotalTimeMillis()/1000.0); + } + catch (Exception e){ + log.error(String.format("计算模块[%s]执行失败", jobExecutionContext.getJobDetail().getKey()), e); + } + + } +} diff --git a/das/src/main/java/com/das/modules/calc/service/CalcService.java b/das/src/main/java/com/das/modules/calc/service/CalcService.java new file mode 100644 index 00000000..6b8bba3e --- /dev/null +++ b/das/src/main/java/com/das/modules/calc/service/CalcService.java @@ -0,0 +1,125 @@ +package com.das.modules.calc.service; + +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.das.modules.calc.domain.entity.CalcModule; +import com.das.modules.calc.functions.FunctionRealData; +import com.das.modules.calc.functions.FunctionSaveCalcData; +import com.das.modules.calc.mapper.CalcModuleMapper; +import com.das.modules.data.service.DataService; +import com.googlecode.aviator.AviatorEvaluator; +import com.googlecode.aviator.AviatorEvaluatorInstance; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import lombok.extern.slf4j.Slf4j; +import org.quartz.*; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.quartz.SchedulerFactoryBean; +import org.springframework.stereotype.Service; + +import java.util.List; + +/** + * 计算引擎服务 + * 负责脚本的维护,脚本执行 + */ +@Slf4j +@Service +public class CalcService { + + @Autowired + CalcModuleMapper calcModuleMapper; + + @Autowired + SchedulerFactoryBean quartzScheduler; + + @Autowired + CacheService cacheService; + + @Autowired + DataService dataService; + + /** + * 计算引擎实例 + */ + private final AviatorEvaluatorInstance aviator = AviatorEvaluator.getInstance(); + + + /** + * 启动计算任务 + * @param scriptModule + * @throws SchedulerException + */ + public void startCalcJob(CalcModule scriptModule) throws SchedulerException { + Scheduler sh = quartzScheduler.getScheduler(); + JobDataMap dataMap = new JobDataMap(); + dataMap.put("module", scriptModule); + dataMap.put("aviator", aviator); + dataMap.put("cache", cacheService); + JobKey jobKey = JobKey.jobKey(scriptModule.getName(), "CalcEngine"); + JobDetail jobDetail = JobBuilder + .newJob(CalcJob.class) + .setJobData(dataMap) + .withIdentity(jobKey) + .build(); + Trigger trigger = TriggerBuilder.newTrigger() + .withIdentity(jobKey.getName(),jobKey.getGroup()) + .withSchedule(CronScheduleBuilder.cronSchedule(scriptModule.getCron())) + .build(); + sh.scheduleJob(jobDetail, trigger); + } + + /** + * 停止计算任务 + * @param scriptModule + * @throws SchedulerException + */ + public void stopCalcJob(CalcModule scriptModule) throws SchedulerException { + Scheduler sh = quartzScheduler.getScheduler(); + JobKey jobKey = JobKey.jobKey(scriptModule.getName(), "CalcEngine"); + sh.deleteJob(jobKey); + } + + public void addExtendFunctions(){ + //获取设备实时数据 - rt( code, attrs ) + FunctionRealData rt = new FunctionRealData( dataService, cacheService); + aviator.addFunction(rt); + // + FunctionSaveCalcData save = new FunctionSaveCalcData(dataService, cacheService); + aviator.addFunction(save); + } + + /** + * 计算引擎初始化 + */ + @PostConstruct + public void init(){ + //从数据库读取未禁用脚本模块 + LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); + queryWrapper.eq(CalcModule::getDisabled, 0); + List scriptModules = calcModuleMapper.selectList(queryWrapper); + //为脚本引擎增加自定义函数 + addExtendFunctions(); + //初始化脚本引擎,预编译并缓存脚本 + for (CalcModule scriptModule : scriptModules) { + try{ + //预编译脚本 + aviator.compile(scriptModule.getName(), scriptModule.getScript(), true); + startCalcJob(scriptModule); + } + catch (Exception e){ + log.error("预编译脚本时出错", e); + } + } + // + } + + /** + * 计算引擎释放资源 + */ + @PreDestroy + public void free() throws SchedulerException { + quartzScheduler.destroy(); + } + + +}