From c7d7cdb83371d7ebad8005d0d9aadf2aa072dcdb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B0=B7=E6=88=90=E4=BC=9F?= Date: Thu, 17 Oct 2024 15:31:54 +0800 Subject: [PATCH 1/4] =?UTF-8?q?=E6=9B=B4=E6=96=B0=E6=96=87=E6=A1=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/api/_sidebar.md | 5 ++++- docs/datacollect/README.md | 39 ++++++++++---------------------------- 2 files changed, 14 insertions(+), 30 deletions(-) diff --git a/docs/api/_sidebar.md b/docs/api/_sidebar.md index b4f0132d..8db7cb3d 100644 --- a/docs/api/_sidebar.md +++ b/docs/api/_sidebar.md @@ -1 +1,4 @@ -- [返回首页](/) \ No newline at end of file +- [返回首页](/) +- [设备模块](equipment.md) +- [节点模块](node.md) +- [系统管理模块](systemmgr.md) \ No newline at end of file diff --git a/docs/datacollect/README.md b/docs/datacollect/README.md index 3dc10fcd..ef69a9cb 100644 --- a/docs/datacollect/README.md +++ b/docs/datacollect/README.md @@ -161,35 +161,12 @@ PS: 同一节点只允许建立一条连接。 } ] } -``` -### 设备初始数据上报 - -当采集设备第一次采集到设备完整数据时,需要向系统上报设备初始数据。 - -?> 方向: `采集程序` -> `系统` - -**命令:** `initDeviceData` - -**数据体:** -```json -{ - "deviceId": "1123451235464", - "values": { - //设备完整初始数据 - "Ia": 123.1, - "Ib": 122.1, - "Ic": 123.1, - "Ua": 220.3, - "Ub": 221.4, - "Uc": 223.1, - "Switch01": 1 - } -} - ``` ### 模拟量数据上报 +采集程序向系统推送实时模拟量数据,此数据只刷新缓存,不归档。 + ?> 方向: `采集程序` -> `系统` **命令:** `analogData` @@ -212,6 +189,8 @@ PS: 同一节点只允许建立一条连接。 ### 状态量数据上报 +采集程序向系统推送实时状态量数据,此数据只刷新缓存,不归档。 + ?> 方向: `采集程序` -> `系统` **命令:** `stateData` @@ -230,11 +209,13 @@ PS: 同一节点只允许建立一条连接。 } ``` -### 历史模拟量数据上报 +### 历史高频数据上报 + +采集程序推送高频历史数据 ?> 方向: `采集程序` -> `系统` -**命令:** `historyAnalogData` +**命令:** `historyHighSpeedData` **数据体:** @@ -254,11 +235,11 @@ PS: 同一节点只允许建立一条连接。 ``` -### 历史状态量数据上报 +### 历史低频数据上报 ?> 方向: `采集程序` -> `系统` -**命令:** `historyStateData` +**命令:** `historyLowSpeedData` **数据体:** From aaa9ba62830cb53565d22ef91bc5c1cac5fd6722 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B0=B7=E6=88=90=E4=BC=9F?= Date: Thu, 17 Oct 2024 15:41:34 +0800 Subject: [PATCH 2/4] =?UTF-8?q?=E6=9B=B4=E6=96=B0=E6=96=87=E6=A1=A3?= =?UTF-8?q?=E6=A0=BC=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/_sidebar.md | 3 --- docs/api/_sidebar.md | 8 ++++---- docs/datacollect/_sidebar.md | 2 +- 3 files changed, 5 insertions(+), 8 deletions(-) diff --git a/docs/_sidebar.md b/docs/_sidebar.md index 3bf8105e..b734321e 100644 --- a/docs/_sidebar.md +++ b/docs/_sidebar.md @@ -1,6 +1,3 @@ - [首页](/) - [API说明](api/) - - [系统管理](api/systemmgr.md) - - [设备管理](api/equipment.md) - - [节点管理](api/node.md) - [数据采集](datacollect/) \ No newline at end of file diff --git a/docs/api/_sidebar.md b/docs/api/_sidebar.md index 8db7cb3d..516ee848 100644 --- a/docs/api/_sidebar.md +++ b/docs/api/_sidebar.md @@ -1,4 +1,4 @@ -- [返回首页](/) -- [设备模块](equipment.md) -- [节点模块](node.md) -- [系统管理模块](systemmgr.md) \ No newline at end of file +* [返回首页](/) +* [设备模块](equipment.md) +* [节点模块](node.md) +* [系统管理模块](systemmgr.md) \ No newline at end of file diff --git a/docs/datacollect/_sidebar.md b/docs/datacollect/_sidebar.md index b4f0132d..328af778 100644 --- a/docs/datacollect/_sidebar.md +++ b/docs/datacollect/_sidebar.md @@ -1 +1 @@ -- [返回首页](/) \ No newline at end of file +* [返回首页](/) \ No newline at end of file From c87c0de17b0259b2a868e029c2e2d7e122df4f02 Mon Sep 17 00:00:00 2001 From: huguanghan Date: Thu, 17 Oct 2024 15:56:57 +0800 Subject: [PATCH 3/4] =?UTF-8?q?tdengine=E6=95=B0=E6=8D=AE=E6=8C=89?= =?UTF-8?q?=E8=AE=BE=E5=A4=87=E5=88=86=E4=B8=BA=E9=AB=98=E9=A2=91=E5=92=8C?= =?UTF-8?q?=E4=BD=8E=E9=A2=91=E8=A1=A8,=E5=85=A5=E5=BA=93=EF=BC=8Credis?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E5=90=8C=E6=AD=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../node/command/AnalogDataCommand.java | 1 + .../node/command/StateDataCommand.java | 1 + .../modules/node/constant/NodeConstant.java | 6 ++ .../modules/node/domain/vo/IotModelVo.java | 2 + .../modules/node/domain/vo/NewIotModelVo.java | 2 + .../modules/node/service/TDEngineService.java | 48 +++++++++++- .../node/service/impl/DataServiceImpl.java | 75 +++++++++++-------- .../mapper/SysImptabmappingMapper.xml | 2 +- 8 files changed, 103 insertions(+), 34 deletions(-) diff --git a/das/src/main/java/com/das/modules/node/command/AnalogDataCommand.java b/das/src/main/java/com/das/modules/node/command/AnalogDataCommand.java index 50331b83..519d4e56 100644 --- a/das/src/main/java/com/das/modules/node/command/AnalogDataCommand.java +++ b/das/src/main/java/com/das/modules/node/command/AnalogDataCommand.java @@ -16,6 +16,7 @@ public class AnalogDataCommand implements BaseCommand { @Override public void doCommand(TerminalMessage data) { try { + //analogData值只存入redis dataService.handleData(data); } catch (Exception e) { diff --git a/das/src/main/java/com/das/modules/node/command/StateDataCommand.java b/das/src/main/java/com/das/modules/node/command/StateDataCommand.java index ac1b80ce..f758b68a 100644 --- a/das/src/main/java/com/das/modules/node/command/StateDataCommand.java +++ b/das/src/main/java/com/das/modules/node/command/StateDataCommand.java @@ -17,6 +17,7 @@ public class StateDataCommand implements BaseCommand { @Override public void doCommand(TerminalMessage data) { try { + //只存入redis dataService.handleData(data); } catch (Exception e) { log.error("解析数据异常", e); diff --git a/das/src/main/java/com/das/modules/node/constant/NodeConstant.java b/das/src/main/java/com/das/modules/node/constant/NodeConstant.java index 9e1f93cf..06e781f5 100644 --- a/das/src/main/java/com/das/modules/node/constant/NodeConstant.java +++ b/das/src/main/java/com/das/modules/node/constant/NodeConstant.java @@ -17,4 +17,10 @@ public interface NodeConstant { String ANALOG_DATA = "analogData"; String STATE_DATA = "stateData"; + + String HIS_ANALOG_DATA = "historyStateData"; + + String HIS_STATE_DATA = "historyStateData"; + + String DEVICE_CONTROL_RESP = "deviceControlResp"; } diff --git a/das/src/main/java/com/das/modules/node/domain/vo/IotModelVo.java b/das/src/main/java/com/das/modules/node/domain/vo/IotModelVo.java index c7e10e2f..85d7ac27 100644 --- a/das/src/main/java/com/das/modules/node/domain/vo/IotModelVo.java +++ b/das/src/main/java/com/das/modules/node/domain/vo/IotModelVo.java @@ -25,4 +25,6 @@ public class IotModelVo { private String equipmentService; private Object params; + + private Integer highSpeed; } diff --git a/das/src/main/java/com/das/modules/node/domain/vo/NewIotModelVo.java b/das/src/main/java/com/das/modules/node/domain/vo/NewIotModelVo.java index e4b22796..58a4b812 100644 --- a/das/src/main/java/com/das/modules/node/domain/vo/NewIotModelVo.java +++ b/das/src/main/java/com/das/modules/node/domain/vo/NewIotModelVo.java @@ -9,5 +9,7 @@ public class NewIotModelVo { private String type; + private Integer highSpeed; + private Object params; } diff --git a/das/src/main/java/com/das/modules/node/service/TDEngineService.java b/das/src/main/java/com/das/modules/node/service/TDEngineService.java index 2af353b4..36e26cc6 100644 --- a/das/src/main/java/com/das/modules/node/service/TDEngineService.java +++ b/das/src/main/java/com/das/modules/node/service/TDEngineService.java @@ -124,7 +124,7 @@ public class TDEngineService { } @Async - public void updateYCValues(List values, String iotModelCode) { + public void updateYCHighValues(List values, String iotModelCode) { StringBuilder sb = new StringBuilder(1024*1024); try (Connection conn = hikariDataSource.getConnection(); Statement pstmt = conn.createStatement()) { @@ -132,12 +132,54 @@ public class TDEngineService { sb.setLength(0); sb.append("insert into "); for (RTData dv : list) { - sb.append("d"); + sb.append("h"); sb.append(dv.getDeviceId()); - sb.append(" using " ); + sb.append(" using h_" ); sb.append(iotModelCode); sb.append(" tags ("); sb.append(dv.getDeviceId()); + sb.append(") ("); + dv.getValues().forEach((key, value) -> + sb.append(",").append(key) + ); + sb.append(") values ("); + sb.append(dv.getDataTime()); + + dv.getValues().forEach((key, value) -> + sb.append(",").append(value) + ); + sb.append(")"); + } + try { + pstmt.executeUpdate(sb.toString()); + } catch (SQLException ex) { + log.error("save yc error", ex); + } + }); + } catch (SQLException ex) { + log.error(ex.getMessage()); + } + } + + @Async + public void updateYCLowValues(List values, String iotModelCode) { + StringBuilder sb = new StringBuilder(1024*1024); + try (Connection conn = hikariDataSource.getConnection(); + Statement pstmt = conn.createStatement()) { + ListUtil.page(values, batchSize, (list)->{ + sb.setLength(0); + sb.append("insert into "); + for (RTData dv : list) { + sb.append("l"); + sb.append(dv.getDeviceId()); + sb.append(" using l_" ); + sb.append(iotModelCode); + sb.append(" tags ("); + sb.append(dv.getDeviceId()); + sb.append(") ("); + dv.getValues().forEach((key, value) -> + sb.append(",").append(key) + ); sb.append(") values ("); sb.append(dv.getDataTime()); diff --git a/das/src/main/java/com/das/modules/node/service/impl/DataServiceImpl.java b/das/src/main/java/com/das/modules/node/service/impl/DataServiceImpl.java index 73b4fb9d..8fc28f8b 100644 --- a/das/src/main/java/com/das/modules/node/service/impl/DataServiceImpl.java +++ b/das/src/main/java/com/das/modules/node/service/impl/DataServiceImpl.java @@ -29,10 +29,7 @@ import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; import java.text.MessageFormat; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.concurrent.Executors; @@ -72,8 +69,6 @@ public class DataServiceImpl implements DataService { public ConcurrentHashMap> lowIotFieldMap = new ConcurrentHashMap<>(10000); - public static final String DEVICE_DATA = "deviceData:{0}"; - @PostConstruct public void init() { //初始化高性能队列 @@ -151,9 +146,10 @@ public class DataServiceImpl implements DataService { List iotModelFieldList = sysImptabmappingMapper.getIotModelFieldByEquipmentId(equipmentId); if (!CollectionUtils.isEmpty(iotModelFieldList)) { for (IotModelVo info : iotModelFieldList) { - if(info.getServiceType() == null) { + if (info.getServiceType() == null) { NewIotModelVo newIotModelVo = new NewIotModelVo(); newIotModelVo.setName(info.getEquipmentAttribute()); + newIotModelVo.setHighSpeed(info.getHighSpeed()); if (info.getParams() == null) { newIotModelVo.setParams(equipJsonNode); } else { @@ -235,7 +231,7 @@ public class DataServiceImpl implements DataService { } lowIotFieldMap.put(item.getIotModelCode(), lowMap); } - tdEngineService.initIotModel(allIotModel, highIotFieldMap,lowIotFieldMap); + tdEngineService.initIotModel(allIotModel, highIotFieldMap, lowIotFieldMap); } @Override @@ -243,33 +239,52 @@ public class DataServiceImpl implements DataService { // 先从redis里面获取设备的初始化数据 JsonNode jsonNode = data.getData(); String deviceId = jsonNode.get("deviceId").asText(); - String key = MessageFormat.format(DEVICE_DATA, deviceId); - HashMap initValue = adminRedisTemplate.get(key); JsonNode values = jsonNode.get("values"); - List> entryList = new ArrayList<>(initValue.entrySet()); - HashMap newHashMap = new HashMap<>(); - for (Map.Entry entry : entryList) { - if(values.get(entry.getKey()) == null){ - newHashMap.put(entry.getKey(), entry.getValue()); - } else{ - newHashMap.put(entry.getKey(), values.get(entry.getKey()).asDouble()); - } - - } - adminRedisTemplate.set(key, newHashMap); - Long dataTime = data.getTime(); - // 存入td库 + JsonNode high = values.get("high"); + JsonNode low = values.get("low"); + Map keyValueMap = new HashMap<>(); // 根据设备ID获取对应的物模型属性 String iotModelCode = sysIotModelMapper.getIotModel(jsonNode.get("deviceId").asLong()); - HashMap tdValues = adminRedisTemplate.get(key); - List list = new ArrayList<>(); - RTData rtData = RTData.builder() + Map highFieldMap = highIotFieldMap.get(iotModelCode); + Map lowFiledMap = lowIotFieldMap.get(iotModelCode); + + //High数据 + Iterator keysHigh = high.fieldNames(); + while (keysHigh.hasNext()) { + String fieldName = keysHigh.next(); + highFieldMap.put(fieldName, high.get(fieldName)); + String key = String.format("RT:[%s]:[%s]", deviceId, fieldName); + keyValueMap.put(key, high.get(fieldName).asDouble()); + + } + + //LOW数据 + Iterator keysLow = low.fieldNames(); + while (keysLow.hasNext()) { + String fieldName = keysLow.next(); + lowFiledMap.put(fieldName, low.get(fieldName)); + String key = String.format("RT:[%s]:[%s]", deviceId, fieldName); + keyValueMap.put(key, high.get(fieldName).asDouble()); + } + adminRedisTemplate.mSet(keyValueMap); + Long dataTime = data.getTime(); + + // 存入td库 + List highList = new ArrayList<>(); + List lowList = new ArrayList<>(); + RTData rtHighData = RTData.builder() .dataTime(dataTime) .deviceId(Long.valueOf(deviceId)) - .values(tdValues) + .values(highFieldMap) .build(); - - list.add(rtData); - tdEngineService.updateYCValues(list, iotModelCode); + RTData rtLowData = RTData.builder() + .dataTime(dataTime) + .deviceId(Long.valueOf(deviceId)) + .values(highFieldMap) + .build(); + highList.add(rtHighData); + lowList.add(rtLowData); + tdEngineService.updateYCHighValues(highList, iotModelCode); + tdEngineService.updateYCLowValues(lowList, iotModelCode); } } diff --git a/das/src/main/resources/mapper/SysImptabmappingMapper.xml b/das/src/main/resources/mapper/SysImptabmappingMapper.xml index a3df07df..fcfa7990 100644 --- a/das/src/main/resources/mapper/SysImptabmappingMapper.xml +++ b/das/src/main/resources/mapper/SysImptabmappingMapper.xml @@ -57,7 +57,7 @@