From c8c9260deebde334c8d6ff1d204a370f5b881d4c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B0=B7=E6=88=90=E4=BC=9F?= Date: Tue, 26 Nov 2024 14:39:34 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 4 - .../cache/domain/IotFieldInfoCache.java | 13 +++ .../cache/domain/IotModelInfoCache.java | 12 +++ .../modules/cache/service/CacheService.java | 6 ++ .../modules/cache/service/EquipmentCache.java | 6 ++ .../modules/cache/service/IotModelCache.java | 5 + .../cache/service/impl/CacheServiceImpl.java | 13 ++- .../service/impl/EquipmentCacheImpl.java | 11 ++- .../cache/service/impl/IotModelCacheImpl.java | 24 +++++ .../calc/functions/FunctionTopValue.java | 91 +++++++++++++++++++ .../com/das/modules/calc/service/CalcJob.java | 1 + .../das/modules/calc/service/CalcService.java | 1 + .../com/das/modules/data/domain/RTValue.java | 9 ++ .../das/modules/data/service/DataService.java | 3 +- .../modules/data/service/TDEngineService.java | 24 +++++ .../data/service/impl/DataServiceImpl.java | 26 +++++- .../service/impl/SysEquipmentServiceImpl.java | 23 ++++- das/src/main/resources/application-dev.yml | 6 +- 18 files changed, 262 insertions(+), 16 deletions(-) delete mode 100644 .gitignore create mode 100644 das/src/main/java/com/das/modules/cache/domain/IotFieldInfoCache.java create mode 100644 das/src/main/java/com/das/modules/cache/domain/IotModelInfoCache.java create mode 100644 das/src/main/java/com/das/modules/cache/service/IotModelCache.java create mode 100644 das/src/main/java/com/das/modules/cache/service/impl/IotModelCacheImpl.java create mode 100644 das/src/main/java/com/das/modules/calc/functions/FunctionTopValue.java create mode 100644 das/src/main/java/com/das/modules/data/domain/RTValue.java diff --git a/.gitignore b/.gitignore deleted file mode 100644 index e84b1eb7..00000000 --- a/.gitignore +++ /dev/null @@ -1,4 +0,0 @@ -.idea -das-dn/.vscode -ui/**/dist -das/**/target \ No newline at end of file diff --git a/das/src/main/java/com/das/modules/cache/domain/IotFieldInfoCache.java b/das/src/main/java/com/das/modules/cache/domain/IotFieldInfoCache.java new file mode 100644 index 00000000..4ec4bfac --- /dev/null +++ b/das/src/main/java/com/das/modules/cache/domain/IotFieldInfoCache.java @@ -0,0 +1,13 @@ +package com.das.modules.cache.domain; + +import lombok.Data; + +@Data +public class IotFieldInfoCache { + private String attributeCode; + private String attributeName; + private Integer attributeType; + private Integer porder; + private Integer highspeed; + private Integer datatype; +} diff --git a/das/src/main/java/com/das/modules/cache/domain/IotModelInfoCache.java b/das/src/main/java/com/das/modules/cache/domain/IotModelInfoCache.java new file mode 100644 index 00000000..17d0c6c4 --- /dev/null +++ b/das/src/main/java/com/das/modules/cache/domain/IotModelInfoCache.java @@ -0,0 +1,12 @@ +package com.das.modules.cache.domain; + +import lombok.Data; + +import java.util.concurrent.ConcurrentHashMap; + +@Data +public class IotModelInfoCache { + private Long iotModelId; + private String iodModelCode; + private ConcurrentHashMap fieldInfoCache; +} diff --git a/das/src/main/java/com/das/modules/cache/service/CacheService.java b/das/src/main/java/com/das/modules/cache/service/CacheService.java index 0e3763a0..9298c642 100644 --- a/das/src/main/java/com/das/modules/cache/service/CacheService.java +++ b/das/src/main/java/com/das/modules/cache/service/CacheService.java @@ -10,4 +10,10 @@ public interface CacheService { * @return */ EquipmentCache getEquipmentCache(); + + /** + * 获取物模型缓存接口 + * @return + */ + IotModelCache getIotModelCache(); } diff --git a/das/src/main/java/com/das/modules/cache/service/EquipmentCache.java b/das/src/main/java/com/das/modules/cache/service/EquipmentCache.java index 22dd6b4b..4a8dbe3b 100644 --- a/das/src/main/java/com/das/modules/cache/service/EquipmentCache.java +++ b/das/src/main/java/com/das/modules/cache/service/EquipmentCache.java @@ -33,4 +33,10 @@ public interface EquipmentCache { * @return */ DeviceInfoCache getDeviceInfoCacheById(Long deviceId); + + /** + * 移除指定设备缓存 + * @param deviceId + */ + void removeDeviceCache(Long deviceId); } diff --git a/das/src/main/java/com/das/modules/cache/service/IotModelCache.java b/das/src/main/java/com/das/modules/cache/service/IotModelCache.java new file mode 100644 index 00000000..29cd805a --- /dev/null +++ b/das/src/main/java/com/das/modules/cache/service/IotModelCache.java @@ -0,0 +1,5 @@ +package com.das.modules.cache.service; + +public interface IotModelCache { + +} diff --git a/das/src/main/java/com/das/modules/cache/service/impl/CacheServiceImpl.java b/das/src/main/java/com/das/modules/cache/service/impl/CacheServiceImpl.java index 0b0b669f..e22d5972 100644 --- a/das/src/main/java/com/das/modules/cache/service/impl/CacheServiceImpl.java +++ b/das/src/main/java/com/das/modules/cache/service/impl/CacheServiceImpl.java @@ -2,6 +2,7 @@ package com.das.modules.cache.service.impl; import com.das.modules.cache.service.CacheService; import com.das.modules.cache.service.EquipmentCache; +import com.das.modules.cache.service.IotModelCache; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -9,10 +10,20 @@ import org.springframework.stereotype.Service; public class CacheServiceImpl implements CacheService { @Autowired - EquipmentCache equipmentCache;; + EquipmentCache equipmentCache; + + @Autowired + IotModelCache iotModelCache; @Override public EquipmentCache getEquipmentCache() { return equipmentCache; } + + @Override + public IotModelCache getIotModelCache() { + return iotModelCache; + } + + } diff --git a/das/src/main/java/com/das/modules/cache/service/impl/EquipmentCacheImpl.java b/das/src/main/java/com/das/modules/cache/service/impl/EquipmentCacheImpl.java index 98fdbb67..d5baf009 100644 --- a/das/src/main/java/com/das/modules/cache/service/impl/EquipmentCacheImpl.java +++ b/das/src/main/java/com/das/modules/cache/service/impl/EquipmentCacheImpl.java @@ -15,7 +15,7 @@ import java.util.List; import java.util.concurrent.ConcurrentHashMap; @Service -public class EquipmentCacheImpl implements EquipmentCache { +public abstract class EquipmentCacheImpl implements EquipmentCache { @Autowired SysEquipmentMapper sysEquipmentMapper; @@ -110,4 +110,13 @@ public class EquipmentCacheImpl implements EquipmentCache { } return null; } + + @Override + public void removeDeviceCache(Long deviceId) { + Integer index = deviceIdIndex.get(deviceId); + if (index != null) { + deviceInfoCaches.remove(index); + } + } + } diff --git a/das/src/main/java/com/das/modules/cache/service/impl/IotModelCacheImpl.java b/das/src/main/java/com/das/modules/cache/service/impl/IotModelCacheImpl.java new file mode 100644 index 00000000..7d26fe91 --- /dev/null +++ b/das/src/main/java/com/das/modules/cache/service/impl/IotModelCacheImpl.java @@ -0,0 +1,24 @@ +package com.das.modules.cache.service.impl; + +import com.das.modules.cache.service.IotModelCache; +import com.das.modules.equipment.mapper.SysIotModelMapper; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +@Service +public class IotModelCacheImpl implements IotModelCache { + + @Autowired + SysIotModelMapper sysIotModelMapper; + + @PostConstruct + public void init(){ + } + + @PreDestroy + public void destroy(){ + + } +} diff --git a/das/src/main/java/com/das/modules/calc/functions/FunctionTopValue.java b/das/src/main/java/com/das/modules/calc/functions/FunctionTopValue.java new file mode 100644 index 00000000..cfdd473b --- /dev/null +++ b/das/src/main/java/com/das/modules/calc/functions/FunctionTopValue.java @@ -0,0 +1,91 @@ +package com.das.modules.calc.functions; + +import cn.hutool.core.date.DateUnit; +import cn.hutool.core.date.DateUtil; +import com.das.modules.cache.domain.DeviceInfoCache; +import com.das.modules.cache.service.CacheService; +import com.das.modules.data.domain.SnapshotValueQueryParam; +import com.das.modules.data.service.DataService; +import com.googlecode.aviator.runtime.function.AbstractFunction; +import com.googlecode.aviator.runtime.type.AviatorNil; +import com.googlecode.aviator.runtime.type.AviatorObject; +import com.googlecode.aviator.runtime.type.AviatorRuntimeJavaType; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; + +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Aviator扩展函数 - 获取时间维度内最早的一条数据 + * 函数格式: topv(deviceId, attr, timedim) + * timedim: day - 天, month - 月, year - 年 + * 返回值:数值, nil - 获取错误 + */ +@Slf4j +public class FunctionTopValue extends AbstractFunction { + + private DataService dataService = null; + private CacheService cacheService = null; + + private ConcurrentHashMap cacheValues = new ConcurrentHashMap<>(); + + public FunctionTopValue(DataService dataService, CacheService cacheService) { + this.dataService = dataService; + this.cacheService = cacheService; + } + + @Override + public String getName() { + return "topv"; + } + + @Override + public AviatorObject call(Map env, AviatorObject deviceCode, AviatorObject attr, AviatorObject timeDim) { + //设备Code + String code = (String)deviceCode.getValue(env); + String attrName = (String)attr.getValue(env); + String timeDimName = (String)timeDim.getValue(env); + + DeviceInfoCache deviceInfoCache = cacheService.getEquipmentCache().getDeviceInfoCacheByCode(code); + if (deviceInfoCache == null) { + return AviatorNil.NIL; + } + String key = String.format("%d_%s_%s", deviceInfoCache.getDeviceId(), attrName, timeDimName); + //根据维度,获取维度时间 + Date curTimeValue = null; + switch (timeDimName) { + case "day": + curTimeValue = DateUtil.beginOfDay(DateUtil.date()); + break; + case "month": + curTimeValue = DateUtil.beginOfMonth(DateUtil.date()); + break; + case "year": + curTimeValue = DateUtil.beginOfYear(DateUtil.date()); + break; + default: + return AviatorNil.NIL; + } + CacheValue cacheValue = cacheValues.get(key); + if (cacheValue != null) { + //缓存中存在,检查是否过期 + if (cacheValue.getCurTimeValue() != null && DateUtil.compare(cacheValue.getCurTimeValue(), curTimeValue) == 0) { + return AviatorRuntimeJavaType.valueOf(cacheValue.value); + } + } + //未找到缓存,查询时序API获取数据 + dataService.querySnapshotValues() + + } + + @Data + class CacheValue{ + private Double value; + private Date curTimeValue; + + } +} diff --git a/das/src/main/java/com/das/modules/calc/service/CalcJob.java b/das/src/main/java/com/das/modules/calc/service/CalcJob.java index 5512c560..8015eff5 100644 --- a/das/src/main/java/com/das/modules/calc/service/CalcJob.java +++ b/das/src/main/java/com/das/modules/calc/service/CalcJob.java @@ -31,6 +31,7 @@ public class CalcJob implements Job { } Expression expression = instance.getCachedExpressionByKey(calcModule.getName()); if (expression == null) { + log.error("expression is null, calcModule={}", calcModule.getName()); throw new JobExecutionException("expression is null"); } Map envs = expression.newEnv("G_DEVICES", cacheService.getEquipmentCache().getDevicesCache()); diff --git a/das/src/main/java/com/das/modules/calc/service/CalcService.java b/das/src/main/java/com/das/modules/calc/service/CalcService.java index b3143a5b..6a3b8994 100644 --- a/das/src/main/java/com/das/modules/calc/service/CalcService.java +++ b/das/src/main/java/com/das/modules/calc/service/CalcService.java @@ -126,6 +126,7 @@ public class CalcService { try{ //预编译脚本 aviator.compile(scriptModule.getName(), scriptModule.getScript(), true); + log.info("[预编译脚本] - {}", scriptModule.getName()); startCalcJob(scriptModule); } catch (Exception e){ diff --git a/das/src/main/java/com/das/modules/data/domain/RTValue.java b/das/src/main/java/com/das/modules/data/domain/RTValue.java new file mode 100644 index 00000000..451c8f35 --- /dev/null +++ b/das/src/main/java/com/das/modules/data/domain/RTValue.java @@ -0,0 +1,9 @@ +package com.das.modules.data.domain; + +import lombok.Data; + +@Data +public class RTValue { + private Object value; + private Long time; +} diff --git a/das/src/main/java/com/das/modules/data/service/DataService.java b/das/src/main/java/com/das/modules/data/service/DataService.java index 87e239a3..821539a4 100644 --- a/das/src/main/java/com/das/modules/data/service/DataService.java +++ b/das/src/main/java/com/das/modules/data/service/DataService.java @@ -1,5 +1,6 @@ package com.das.modules.data.service; +import com.das.modules.data.domain.RTValue; import com.das.modules.data.domain.SnapshotValueQueryParam; import com.das.modules.data.domain.TSValueQueryParam; import com.das.modules.node.domain.bo.CalculateRTData; @@ -17,5 +18,5 @@ public interface DataService { void updateCalFieldData(List values); - + Double getTimeTopValue(Long devcieId, String attr, long startTime, long endTime); } diff --git a/das/src/main/java/com/das/modules/data/service/TDEngineService.java b/das/src/main/java/com/das/modules/data/service/TDEngineService.java index 1a3bc12a..860ab832 100644 --- a/das/src/main/java/com/das/modules/data/service/TDEngineService.java +++ b/das/src/main/java/com/das/modules/data/service/TDEngineService.java @@ -4,6 +4,7 @@ import cn.hutool.core.collection.ListUtil; import cn.hutool.core.util.StrUtil; import com.das.common.utils.PageDataInfo; import com.das.modules.data.domain.DeviceEventInfo; +import com.das.modules.data.domain.RTValue; import com.das.modules.equipment.domain.vo.IotModelFieldVo; import com.das.modules.node.domain.bo.CalculateRTData; import com.das.modules.node.domain.bo.RTData; @@ -782,4 +783,27 @@ public class TDEngineService { hikariDataSource.close(); } } + + public Double getTimeTopValue(String tableName, String attr, long startTime, long endTime) { + StringBuffer sb = new StringBuffer(256); + sb.append("select "); + sb.append("first("); + sb.append(attr); + sb.append(") as value"); + sb.append(" from "); + sb.append(tableName); + sb.append(" where "); + sb.append(String.format(" updatetime >= %d and updatetime < %d ", startTime, endTime)); + Double result = null; + try (Connection conn = hikariDataSource.getConnection(); + Statement smt = conn.createStatement(); + ResultSet rs = smt.executeQuery(sb.toString())) { + if (rs.next()) { + result = rs.getDouble("value"); + } + } catch (Exception e) { + log.error("获取数据异常", e); + } + return result; + } } diff --git a/das/src/main/java/com/das/modules/data/service/impl/DataServiceImpl.java b/das/src/main/java/com/das/modules/data/service/impl/DataServiceImpl.java index 94745c55..05227174 100644 --- a/das/src/main/java/com/das/modules/data/service/impl/DataServiceImpl.java +++ b/das/src/main/java/com/das/modules/data/service/impl/DataServiceImpl.java @@ -4,6 +4,10 @@ import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.collection.ListUtil; import com.das.common.exceptions.ServiceException; import com.das.common.utils.AdminRedisTemplate; +import com.das.modules.cache.domain.DeviceInfoCache; +import com.das.modules.cache.service.CacheService; +import com.das.modules.cache.service.EquipmentCache; +import com.das.modules.data.domain.RTValue; import com.das.modules.data.domain.SnapshotValueQueryParam; import com.das.modules.data.domain.TSValueQueryParam; import com.das.modules.data.service.DataService; @@ -26,7 +30,7 @@ import java.util.stream.Collectors; @Slf4j @Service -public class DataServiceImpl implements DataService { +public abstract class DataServiceImpl implements DataService { public static final int COMMIT_COUNT = 1000; @@ -45,6 +49,9 @@ public class DataServiceImpl implements DataService { @Autowired SysIotModelMapper sysIotModelMapper; + @Autowired + CacheService cacheService; + //key:deviceId value:modelCode public ConcurrentHashMap deviceModelMap = new ConcurrentHashMap<>(10000); @@ -236,4 +243,21 @@ public class DataServiceImpl implements DataService { tdEngineService.initIotModel(allIotModel, highIotFieldMap, lowIotFieldMap, calculateIotFieldMap); } + /** + * 获取指定时间区间内的最早的数据 + * @param devcieId + * @param attr + * @param startTime + * @param endTime + * @return + */ + @Override + public Double getTimeTopValue(Long devcieId, String attr, long startTime, long endTime){ + DeviceInfoCache deviceInfoCacheById = cacheService.getEquipmentCache().getDeviceInfoCacheById(devcieId); + if (deviceInfoCacheById == null) { + return null; + } + String tableName = ""; + return tdEngineService.getTimeTopValue(tableName, attr, startTime, endTime); + } } diff --git a/das/src/main/java/com/das/modules/equipment/service/impl/SysEquipmentServiceImpl.java b/das/src/main/java/com/das/modules/equipment/service/impl/SysEquipmentServiceImpl.java index f0213747..3d05481c 100644 --- a/das/src/main/java/com/das/modules/equipment/service/impl/SysEquipmentServiceImpl.java +++ b/das/src/main/java/com/das/modules/equipment/service/impl/SysEquipmentServiceImpl.java @@ -15,6 +15,7 @@ import com.das.common.utils.PageQuery; import com.das.common.utils.SequenceUtils; import com.das.modules.auth.domain.vo.SysUserVo; import com.das.modules.auth.mapper.SysOrgMapper; +import com.das.modules.cache.service.CacheService; import com.das.modules.data.service.impl.DataServiceImpl; import com.das.modules.equipment.domain.dto.SysEquipmentDto; import com.das.modules.equipment.domain.excel.SysEquipmentExcel; @@ -62,6 +63,9 @@ public class SysEquipmentServiceImpl implements SysEquipmentService { @Autowired private DataServiceImpl dataService; + @Autowired + private CacheService cacheService; + @Override public SysEquipmentVo creatSysEquipment(SysEquipmentDto sysEquipmentDto) { //去除空格 @@ -84,6 +88,8 @@ public class SysEquipmentServiceImpl implements SysEquipmentService { if (sysEquipment.getIotModelId() != null){ dataService.deviceModelMap.put(sysEquipment.getId().toString(),dataService.iotModelMap.get(sysEquipment.getIotModelId().toString())); } + //更新设备缓存 + cacheService.getEquipmentCache().refreshDeviceCache(sysEquipment.getId()); SysEquipmentVo sysEquipmentVo = new SysEquipmentVo(); BeanCopyUtils.copy(sysEquipment, sysEquipmentVo); return sysEquipmentVo; @@ -111,6 +117,8 @@ public class SysEquipmentServiceImpl implements SysEquipmentService { if (oldModelSysEquipInfo.getIotModelId() == null && sysEquipment.getIotModelId() != null){ dataService.deviceModelMap.put(sysEquipment.getId().toString(),dataService.iotModelMap.get(sysEquipment.getIotModelId().toString())); } + //更新设备缓存 + cacheService.getEquipmentCache().refreshDeviceCache(sysEquipment.getId()); SysEquipmentVo sysEquipmentVo = new SysEquipmentVo(); BeanCopyUtils.copy(sysEquipment, sysEquipmentVo); return sysEquipmentVo; @@ -124,6 +132,8 @@ public class SysEquipmentServiceImpl implements SysEquipmentService { } sysEquipmentMapper.deleteById(sysEquipmentDto.getId()); //删除缓存 + //更新设备缓存 + cacheService.getEquipmentCache().refreshDeviceCache(sysEquipmentDto.getId()); dataService.deviceModelMap.remove(sysEquipmentDto.getId().toString()); } @@ -297,6 +307,8 @@ public class SysEquipmentServiceImpl implements SysEquipmentService { String modelCode = dataService.iotModelMap.get(item.getIotModelId().toString()); dataService.deviceModelMap.put(item.getId().toString(),modelCode); } + //更新设备缓存 + cacheService.getEquipmentCache().refreshDeviceCache(item.getId()); } if (CollectionUtils.isNotEmpty(updateSysEquipmentList)) { sysEquipmentMapper.updateBatchById(updateSysEquipmentList); @@ -305,16 +317,17 @@ public class SysEquipmentServiceImpl implements SysEquipmentService { String modelCode = dataService.iotModelMap.get(item.getIotModelId().toString()); dataService.deviceModelMap.put(item.getId().toString(),modelCode); } + //更新设备缓存 + cacheService.getEquipmentCache().refreshDeviceCache(item.getId()); } } if (CollectionUtils.isNotEmpty(delSysEquipmentList)) { // 删除设备 sysEquipmentMapper.deleteBatchIds(delSysEquipmentList); - for (SysEquipment item : updateSysEquipmentList){ - if (item.getIotModelId() != null){ - String modelCode = dataService.iotModelMap.get(item.getIotModelId().toString()); - dataService.deviceModelMap.put(item.getId().toString(),modelCode); - } + for (SysEquipment item : delSysEquipmentList){ + dataService.deviceModelMap.remove(item.getId().toString()); + //更新设备缓存 + cacheService.getEquipmentCache().removeDeviceCache(item.getId()); } } diff --git a/das/src/main/resources/application-dev.yml b/das/src/main/resources/application-dev.yml index d33a2bc2..469df38b 100644 --- a/das/src/main/resources/application-dev.yml +++ b/das/src/main/resources/application-dev.yml @@ -40,16 +40,16 @@ spring: # 多个文件总大小 max-request-size: 2048MB datasource: - url: jdbc:postgresql://192.168.109.102:5432/das + url: jdbc:postgresql://192.168.109.187:5432/das username: das password: qwaszx12 # # redis相关配置 data: redis: - host: 192.168.109.195 + host: 192.168.109.187 database: 0 port: 6379 - password: + password: zaq12WSX client-type: lettuce