diff --git a/das/src/main/java/com/das/modules/node/domain/bo/CalculateRTData.java b/das/src/main/java/com/das/modules/node/domain/bo/CalculateRTData.java new file mode 100644 index 00000000..da619244 --- /dev/null +++ b/das/src/main/java/com/das/modules/node/domain/bo/CalculateRTData.java @@ -0,0 +1,23 @@ +package com.das.modules.node.domain.bo; + +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import com.fasterxml.jackson.databind.ser.std.ToStringSerializer; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +@AllArgsConstructor +@Builder +public class CalculateRTData { + @JsonSerialize(using = ToStringSerializer.class) + private Long deviceId; + + private Long dataTime; + + private Object dataValue; + + private String iotModelField; +} diff --git a/das/src/main/java/com/das/modules/node/domain/bo/RTData.java b/das/src/main/java/com/das/modules/node/domain/bo/RTData.java index 006a4149..75020198 100644 --- a/das/src/main/java/com/das/modules/node/domain/bo/RTData.java +++ b/das/src/main/java/com/das/modules/node/domain/bo/RTData.java @@ -19,5 +19,6 @@ public class RTData { @JsonSerialize(using = ToStringSerializer.class) private Long deviceId; private Long dataTime; + //字段和值的key value private Map values; } diff --git a/das/src/main/java/com/das/modules/node/service/DataService.java b/das/src/main/java/com/das/modules/node/service/DataService.java index ccddf197..33b34c4c 100644 --- a/das/src/main/java/com/das/modules/node/service/DataService.java +++ b/das/src/main/java/com/das/modules/node/service/DataService.java @@ -1,8 +1,11 @@ package com.das.modules.node.service; +import com.das.modules.node.domain.bo.CalculateRTData; import com.das.modules.node.domain.bo.TerminalMessage; import com.fasterxml.jackson.databind.JsonNode; +import java.util.List; + public interface DataService { void pushMessage(TerminalMessage msg); @@ -15,4 +18,6 @@ public interface DataService { void handleHighSpeed(TerminalMessage data); void handleLowSpeed(TerminalMessage data); + + void updateCalFieldData(List values); } diff --git a/das/src/main/java/com/das/modules/node/service/TDEngineService.java b/das/src/main/java/com/das/modules/node/service/TDEngineService.java index b2ef082c..5cd18c94 100644 --- a/das/src/main/java/com/das/modules/node/service/TDEngineService.java +++ b/das/src/main/java/com/das/modules/node/service/TDEngineService.java @@ -4,7 +4,9 @@ import cn.hutool.core.collection.ListUtil; import cn.hutool.core.util.StrUtil; import com.das.modules.equipment.domain.vo.IotModelFieldVo; import com.das.modules.equipment.mapper.SysIotModelMapper; +import com.das.modules.node.domain.bo.CalculateRTData; import com.das.modules.node.domain.bo.RTData; +import com.das.modules.node.service.impl.DataServiceImpl; import com.zaxxer.hikari.HikariConfig; import com.zaxxer.hikari.HikariDataSource; import jakarta.annotation.PreDestroy; @@ -30,6 +32,8 @@ public class TDEngineService { private HikariDataSource hikariDataSource; + private DataServiceImpl dataService; + @Value("${tdengine.url}") private String url; @@ -336,6 +340,39 @@ public class TDEngineService { } } + @Async + public void updateCalFieldValues(List values) { + StringBuilder sb = new StringBuilder(1024 * 1024); + try (Connection conn = hikariDataSource.getConnection(); + Statement pstmt = conn.createStatement()) { + ListUtil.page(values, batchSize, (list) -> { + sb.setLength(0); + sb.append("insert into "); + for (CalculateRTData dv : list) { + sb.append("c"); + sb.append(dv.getDeviceId()); + sb.append(" using c_"); + sb.append(dataService.deviceModelMap.get(dv.getDeviceId().toString())); + sb.append("_").append(dv.getIotModelField()); + sb.append(" tags ("); + sb.append(dv.getDeviceId()); + sb.append(") values ("); + sb.append(dv.getDataTime()); + sb.append(","); + sb.append(dv.getDataValue()); + sb.append(")"); + } + try { + pstmt.executeUpdate(sb.toString()); + } catch (SQLException ex) { + log.error("save yc error", ex); + } + }); + } catch (SQLException ex) { + log.error(ex.getMessage()); + } + } + @Async public void updateStataValues(List values, String iotModelCode) { StringBuilder sb = new StringBuilder(1024 * 1024); diff --git a/das/src/main/java/com/das/modules/node/service/impl/DataServiceImpl.java b/das/src/main/java/com/das/modules/node/service/impl/DataServiceImpl.java index 1083ebb4..c32d5b28 100644 --- a/das/src/main/java/com/das/modules/node/service/impl/DataServiceImpl.java +++ b/das/src/main/java/com/das/modules/node/service/impl/DataServiceImpl.java @@ -1,5 +1,6 @@ package com.das.modules.node.service.impl; +import cn.hutool.core.collection.ListUtil; import cn.hutool.core.util.IdUtil; import com.das.common.constant.MeasType; import com.das.common.utils.AdminRedisTemplate; @@ -8,6 +9,7 @@ import com.das.modules.equipment.entity.SysIotModelField; import com.das.modules.equipment.mapper.SysIotModelMapper; import com.das.modules.node.disruptor.MessageEventFactory; import com.das.modules.node.disruptor.TerminalMessageEventHandler; +import com.das.modules.node.domain.bo.CalculateRTData; import com.das.modules.node.domain.bo.RTData; import com.das.modules.node.domain.bo.TerminalMessage; import com.das.modules.node.domain.vo.*; @@ -44,6 +46,8 @@ public class DataServiceImpl implements DataService { private RingBuffer ringBuffer = null; + public static final int COMMIT_NUMBER = 1000; + @Resource TerminalMessageEventHandler terminalMessageEventHandler; @@ -65,12 +69,19 @@ public class DataServiceImpl implements DataService { @Autowired TDEngineService tdEngineService; + //key:deviceId value:modelCode + public ConcurrentHashMap deviceModelMap = new ConcurrentHashMap<>(10000); + + //key:modelId value:modelCode public ConcurrentHashMap iotModelMap = new ConcurrentHashMap<>(10000); + //key:modelCode value:Filed:Code,dataType public ConcurrentHashMap> highIotFieldMap = new ConcurrentHashMap<>(10000); + //key:modelCode value:Filed:Code,dataType public ConcurrentHashMap> lowIotFieldMap = new ConcurrentHashMap<>(10000); + //key:modelCode value:Filed:Code,dataType public ConcurrentHashMap> calculateIotFieldMap = new ConcurrentHashMap<>(10000); @PostConstruct @@ -143,42 +154,42 @@ public class DataServiceImpl implements DataService { ObjectNode equipJsonNode = JSON_MAPPER.convertValue(map, ObjectNode.class); List attrs = new ArrayList<>(); - List services= new ArrayList<>(); + List services = new ArrayList<>(); - List tabMappingVoList = sysImptabmappingMapper.getMappingInfoListByLinkIdAndDeviceId(sysCommunicationLinkVo.getId(), dev.getId()); + List tabMappingVoList = sysImptabmappingMapper.getMappingInfoListByLinkIdAndDeviceId(sysCommunicationLinkVo.getId(), dev.getId()); if (!CollectionUtils.isEmpty(tabMappingVoList)) { for (SysTabMappingVo info : tabMappingVoList) { - NewIotModelVo newIotModelVo = new NewIotModelVo(); - newIotModelVo.setName(info.getMeasPointCode()); - if (info.getParams() == null) { - newIotModelVo.setParams(equipJsonNode); - } else { - newIotModelVo.setParams(JSON_MAPPER.readTree(info.getParams())); - } + NewIotModelVo newIotModelVo = new NewIotModelVo(); + newIotModelVo.setName(info.getMeasPointCode()); + if (info.getParams() == null) { + newIotModelVo.setParams(equipJsonNode); + } else { + newIotModelVo.setParams(JSON_MAPPER.readTree(info.getParams())); + } - Integer pointType = info.getMeasPointType(); + Integer pointType = info.getMeasPointType(); - if (pointType == MeasType.TYPE_PSR_ANALOG) { - newIotModelVo.setHighSpeed(info.getHighSpeed()); - newIotModelVo.setType("yc"); - attrs.add(newIotModelVo); - } else if (pointType == MeasType.TYPE_PSR_ACCUMULATOR) { - newIotModelVo.setHighSpeed(info.getHighSpeed()); - newIotModelVo.setType("ym"); - attrs.add(newIotModelVo); - } else if (pointType == MeasType.TYPE_PSR_DISCRETE) { - newIotModelVo.setHighSpeed(info.getHighSpeed()); - newIotModelVo.setType("yx"); - attrs.add(newIotModelVo); - }else if (pointType == MeasType.TYPE_PSR_SET_POINT) { + if (pointType == MeasType.TYPE_PSR_ANALOG) { + newIotModelVo.setHighSpeed(info.getHighSpeed()); + newIotModelVo.setType("yc"); + attrs.add(newIotModelVo); + } else if (pointType == MeasType.TYPE_PSR_ACCUMULATOR) { + newIotModelVo.setHighSpeed(info.getHighSpeed()); + newIotModelVo.setType("ym"); + attrs.add(newIotModelVo); + } else if (pointType == MeasType.TYPE_PSR_DISCRETE) { + newIotModelVo.setHighSpeed(info.getHighSpeed()); + newIotModelVo.setType("yx"); + attrs.add(newIotModelVo); + } else if (pointType == MeasType.TYPE_PSR_SET_POINT) { newIotModelVo.setType("yt"); - services.add(newIotModelVo); - } else if (pointType == MeasType.TYPE_PSR_CONTROL) { + services.add(newIotModelVo); + } else if (pointType == MeasType.TYPE_PSR_CONTROL) { newIotModelVo.setType("yk"); - services.add(newIotModelVo); - } + services.add(newIotModelVo); + } } } @@ -198,7 +209,7 @@ public class DataServiceImpl implements DataService { configUpdateVo.setVersion(1); configUpdateVo.setLinks(links); configUpdateVo.setEquipments(equipments); - log.info("下发配置为{}",configUpdateVo); + log.info("下发配置为{}", configUpdateVo); JsonNode jsonNode = JSON_MAPPER.valueToTree(configUpdateVo); long time = System.currentTimeMillis(); @@ -220,8 +231,8 @@ public class DataServiceImpl implements DataService { String key = String.valueOf(item.getId()); iotModelMap.put(key, item.getIotModelCode()); List allIotModelField = sysIotModelMapper.getAllIotModelField(item.getId()); - Map LowModelFieldList = allIotModelField.stream().filter(field -> field.getHighSpeed() == 0 && field.getAttributeType() != 199).collect(Collectors.toMap(SysIotModelField::getAttributeCode, SysIotModelField::getDataType, (value1, value2) -> value1)); - Map HighModelFieldList = allIotModelField.stream().filter(field -> field.getHighSpeed() == 1 && field.getAttributeType() != 199).collect(Collectors.toMap(SysIotModelField::getAttributeCode, SysIotModelField::getDataType, (value1, value2) -> value1)); + Map LowModelFieldList = allIotModelField.stream().filter(field -> field.getHighSpeed() == 0 && field.getAttributeType() != 199).collect(Collectors.toMap(SysIotModelField::getAttributeCode, SysIotModelField::getDataType, (value1, value2) -> value1)); + Map HighModelFieldList = allIotModelField.stream().filter(field -> field.getHighSpeed() == 1 && field.getAttributeType() != 199).collect(Collectors.toMap(SysIotModelField::getAttributeCode, SysIotModelField::getDataType, (value1, value2) -> value1)); Map calculateFieldList = allIotModelField.stream().filter(field -> field.getAttributeType() == 199).collect(Collectors.toMap(SysIotModelField::getAttributeCode, SysIotModelField::getDataType, (value1, value2) -> value1)); Map map = new HashMap<>(); for (String field : HighModelFieldList.keySet()) { @@ -233,9 +244,9 @@ public class DataServiceImpl implements DataService { lowMap.put(field, LowModelFieldList.get(field)); } lowIotFieldMap.put(item.getIotModelCode(), lowMap); - calculateIotFieldMap.put(item.getIotModelCode(),calculateFieldList); + calculateIotFieldMap.put(item.getIotModelCode(), calculateFieldList); } - tdEngineService.initIotModel(allIotModel, highIotFieldMap, lowIotFieldMap,calculateIotFieldMap); + tdEngineService.initIotModel(allIotModel, highIotFieldMap, lowIotFieldMap, calculateIotFieldMap); } @Override @@ -250,7 +261,7 @@ public class DataServiceImpl implements DataService { while (keysHigh.hasNext()) { String fieldName = keysHigh.next(); String key = String.format("RT:%s:%s", deviceId, fieldName.toLowerCase()); - keyValueMap.put(key,values.get(fieldName)); + keyValueMap.put(key, values.get(fieldName)); } adminRedisTemplate.mSet(keyValueMap); } @@ -292,7 +303,7 @@ public class DataServiceImpl implements DataService { String iotModelCode = sysIotModelMapper.getIotModel(jsonNode.get("deviceId").asLong()); //Low数据 - Iterator keysLow= values.fieldNames(); + Iterator keysLow = values.fieldNames(); while (keysLow.hasNext()) { String fieldName = keysLow.next(); keyValueMap.put(fieldName, values.get(fieldName)); @@ -309,4 +320,18 @@ public class DataServiceImpl implements DataService { tdEngineService.updateYCLowValues(highList, iotModelCode); } + @Override + public void updateCalFieldData(List calValues) { + //更新数据至redis,TD + ListUtil.page(calValues, COMMIT_NUMBER, list -> { + Map keyValueMap = new HashMap<>(); + for (CalculateRTData value : list) { + String key = String.format("RT:%s:%s", value.getDeviceId(), value.getIotModelField().toLowerCase()); + keyValueMap.put(key, value.getDataValue()); + } + adminRedisTemplate.mSet(keyValueMap); + tdEngineService.updateCalFieldValues(list); + }); + } + }