diff --git a/das/src/main/java/com/das/modules/node/command/AnalogDataCommand.java b/das/src/main/java/com/das/modules/node/command/AnalogDataCommand.java index 50331b83..519d4e56 100644 --- a/das/src/main/java/com/das/modules/node/command/AnalogDataCommand.java +++ b/das/src/main/java/com/das/modules/node/command/AnalogDataCommand.java @@ -16,6 +16,7 @@ public class AnalogDataCommand implements BaseCommand { @Override public void doCommand(TerminalMessage data) { try { + //analogData值只存入redis dataService.handleData(data); } catch (Exception e) { diff --git a/das/src/main/java/com/das/modules/node/command/StateDataCommand.java b/das/src/main/java/com/das/modules/node/command/StateDataCommand.java index ac1b80ce..f758b68a 100644 --- a/das/src/main/java/com/das/modules/node/command/StateDataCommand.java +++ b/das/src/main/java/com/das/modules/node/command/StateDataCommand.java @@ -17,6 +17,7 @@ public class StateDataCommand implements BaseCommand { @Override public void doCommand(TerminalMessage data) { try { + //只存入redis dataService.handleData(data); } catch (Exception e) { log.error("解析数据异常", e); diff --git a/das/src/main/java/com/das/modules/node/constant/NodeConstant.java b/das/src/main/java/com/das/modules/node/constant/NodeConstant.java index 9e1f93cf..06e781f5 100644 --- a/das/src/main/java/com/das/modules/node/constant/NodeConstant.java +++ b/das/src/main/java/com/das/modules/node/constant/NodeConstant.java @@ -17,4 +17,10 @@ public interface NodeConstant { String ANALOG_DATA = "analogData"; String STATE_DATA = "stateData"; + + String HIS_ANALOG_DATA = "historyStateData"; + + String HIS_STATE_DATA = "historyStateData"; + + String DEVICE_CONTROL_RESP = "deviceControlResp"; } diff --git a/das/src/main/java/com/das/modules/node/domain/vo/IotModelVo.java b/das/src/main/java/com/das/modules/node/domain/vo/IotModelVo.java index c7e10e2f..85d7ac27 100644 --- a/das/src/main/java/com/das/modules/node/domain/vo/IotModelVo.java +++ b/das/src/main/java/com/das/modules/node/domain/vo/IotModelVo.java @@ -25,4 +25,6 @@ public class IotModelVo { private String equipmentService; private Object params; + + private Integer highSpeed; } diff --git a/das/src/main/java/com/das/modules/node/domain/vo/NewIotModelVo.java b/das/src/main/java/com/das/modules/node/domain/vo/NewIotModelVo.java index e4b22796..58a4b812 100644 --- a/das/src/main/java/com/das/modules/node/domain/vo/NewIotModelVo.java +++ b/das/src/main/java/com/das/modules/node/domain/vo/NewIotModelVo.java @@ -9,5 +9,7 @@ public class NewIotModelVo { private String type; + private Integer highSpeed; + private Object params; } diff --git a/das/src/main/java/com/das/modules/node/service/TDEngineService.java b/das/src/main/java/com/das/modules/node/service/TDEngineService.java index 2af353b4..36e26cc6 100644 --- a/das/src/main/java/com/das/modules/node/service/TDEngineService.java +++ b/das/src/main/java/com/das/modules/node/service/TDEngineService.java @@ -124,7 +124,7 @@ public class TDEngineService { } @Async - public void updateYCValues(List values, String iotModelCode) { + public void updateYCHighValues(List values, String iotModelCode) { StringBuilder sb = new StringBuilder(1024*1024); try (Connection conn = hikariDataSource.getConnection(); Statement pstmt = conn.createStatement()) { @@ -132,12 +132,54 @@ public class TDEngineService { sb.setLength(0); sb.append("insert into "); for (RTData dv : list) { - sb.append("d"); + sb.append("h"); sb.append(dv.getDeviceId()); - sb.append(" using " ); + sb.append(" using h_" ); sb.append(iotModelCode); sb.append(" tags ("); sb.append(dv.getDeviceId()); + sb.append(") ("); + dv.getValues().forEach((key, value) -> + sb.append(",").append(key) + ); + sb.append(") values ("); + sb.append(dv.getDataTime()); + + dv.getValues().forEach((key, value) -> + sb.append(",").append(value) + ); + sb.append(")"); + } + try { + pstmt.executeUpdate(sb.toString()); + } catch (SQLException ex) { + log.error("save yc error", ex); + } + }); + } catch (SQLException ex) { + log.error(ex.getMessage()); + } + } + + @Async + public void updateYCLowValues(List values, String iotModelCode) { + StringBuilder sb = new StringBuilder(1024*1024); + try (Connection conn = hikariDataSource.getConnection(); + Statement pstmt = conn.createStatement()) { + ListUtil.page(values, batchSize, (list)->{ + sb.setLength(0); + sb.append("insert into "); + for (RTData dv : list) { + sb.append("l"); + sb.append(dv.getDeviceId()); + sb.append(" using l_" ); + sb.append(iotModelCode); + sb.append(" tags ("); + sb.append(dv.getDeviceId()); + sb.append(") ("); + dv.getValues().forEach((key, value) -> + sb.append(",").append(key) + ); sb.append(") values ("); sb.append(dv.getDataTime()); diff --git a/das/src/main/java/com/das/modules/node/service/impl/DataServiceImpl.java b/das/src/main/java/com/das/modules/node/service/impl/DataServiceImpl.java index 73b4fb9d..8fc28f8b 100644 --- a/das/src/main/java/com/das/modules/node/service/impl/DataServiceImpl.java +++ b/das/src/main/java/com/das/modules/node/service/impl/DataServiceImpl.java @@ -29,10 +29,7 @@ import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; import java.text.MessageFormat; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.concurrent.Executors; @@ -72,8 +69,6 @@ public class DataServiceImpl implements DataService { public ConcurrentHashMap> lowIotFieldMap = new ConcurrentHashMap<>(10000); - public static final String DEVICE_DATA = "deviceData:{0}"; - @PostConstruct public void init() { //初始化高性能队列 @@ -151,9 +146,10 @@ public class DataServiceImpl implements DataService { List iotModelFieldList = sysImptabmappingMapper.getIotModelFieldByEquipmentId(equipmentId); if (!CollectionUtils.isEmpty(iotModelFieldList)) { for (IotModelVo info : iotModelFieldList) { - if(info.getServiceType() == null) { + if (info.getServiceType() == null) { NewIotModelVo newIotModelVo = new NewIotModelVo(); newIotModelVo.setName(info.getEquipmentAttribute()); + newIotModelVo.setHighSpeed(info.getHighSpeed()); if (info.getParams() == null) { newIotModelVo.setParams(equipJsonNode); } else { @@ -235,7 +231,7 @@ public class DataServiceImpl implements DataService { } lowIotFieldMap.put(item.getIotModelCode(), lowMap); } - tdEngineService.initIotModel(allIotModel, highIotFieldMap,lowIotFieldMap); + tdEngineService.initIotModel(allIotModel, highIotFieldMap, lowIotFieldMap); } @Override @@ -243,33 +239,52 @@ public class DataServiceImpl implements DataService { // 先从redis里面获取设备的初始化数据 JsonNode jsonNode = data.getData(); String deviceId = jsonNode.get("deviceId").asText(); - String key = MessageFormat.format(DEVICE_DATA, deviceId); - HashMap initValue = adminRedisTemplate.get(key); JsonNode values = jsonNode.get("values"); - List> entryList = new ArrayList<>(initValue.entrySet()); - HashMap newHashMap = new HashMap<>(); - for (Map.Entry entry : entryList) { - if(values.get(entry.getKey()) == null){ - newHashMap.put(entry.getKey(), entry.getValue()); - } else{ - newHashMap.put(entry.getKey(), values.get(entry.getKey()).asDouble()); - } - - } - adminRedisTemplate.set(key, newHashMap); - Long dataTime = data.getTime(); - // 存入td库 + JsonNode high = values.get("high"); + JsonNode low = values.get("low"); + Map keyValueMap = new HashMap<>(); // 根据设备ID获取对应的物模型属性 String iotModelCode = sysIotModelMapper.getIotModel(jsonNode.get("deviceId").asLong()); - HashMap tdValues = adminRedisTemplate.get(key); - List list = new ArrayList<>(); - RTData rtData = RTData.builder() + Map highFieldMap = highIotFieldMap.get(iotModelCode); + Map lowFiledMap = lowIotFieldMap.get(iotModelCode); + + //High数据 + Iterator keysHigh = high.fieldNames(); + while (keysHigh.hasNext()) { + String fieldName = keysHigh.next(); + highFieldMap.put(fieldName, high.get(fieldName)); + String key = String.format("RT:[%s]:[%s]", deviceId, fieldName); + keyValueMap.put(key, high.get(fieldName).asDouble()); + + } + + //LOW数据 + Iterator keysLow = low.fieldNames(); + while (keysLow.hasNext()) { + String fieldName = keysLow.next(); + lowFiledMap.put(fieldName, low.get(fieldName)); + String key = String.format("RT:[%s]:[%s]", deviceId, fieldName); + keyValueMap.put(key, high.get(fieldName).asDouble()); + } + adminRedisTemplate.mSet(keyValueMap); + Long dataTime = data.getTime(); + + // 存入td库 + List highList = new ArrayList<>(); + List lowList = new ArrayList<>(); + RTData rtHighData = RTData.builder() .dataTime(dataTime) .deviceId(Long.valueOf(deviceId)) - .values(tdValues) + .values(highFieldMap) .build(); - - list.add(rtData); - tdEngineService.updateYCValues(list, iotModelCode); + RTData rtLowData = RTData.builder() + .dataTime(dataTime) + .deviceId(Long.valueOf(deviceId)) + .values(highFieldMap) + .build(); + highList.add(rtHighData); + lowList.add(rtLowData); + tdEngineService.updateYCHighValues(highList, iotModelCode); + tdEngineService.updateYCLowValues(lowList, iotModelCode); } } diff --git a/das/src/main/resources/mapper/SysImptabmappingMapper.xml b/das/src/main/resources/mapper/SysImptabmappingMapper.xml index a3df07df..fcfa7990 100644 --- a/das/src/main/resources/mapper/SysImptabmappingMapper.xml +++ b/das/src/main/resources/mapper/SysImptabmappingMapper.xml @@ -57,7 +57,7 @@