From 9e82f9e355b84787a6ad53bf1792e96b103343ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B0=B7=E6=88=90=E4=BC=9F?= Date: Tue, 31 Dec 2024 16:01:51 +0800 Subject: [PATCH] =?UTF-8?q?=E8=BF=81=E7=A7=BBanalogData=E5=8A=9F=E8=83=BD?= =?UTF-8?q?=E8=BF=9B=E5=85=A5AnalogDataCommand?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../modules/data/service/TDEngineService.java | 12 ++ .../node/command/AnalogDataCommand.java | 138 +++++++++++++++++- .../node/command/DeviceEventCommand.java | 1 + .../node/command/HeartbeatCommand.java | 2 +- .../node/command/HisHighSpeedCommand.java | 1 + .../node/command/HisLowSpeedCommand.java | 1 + .../node/command/StateDataCommand.java | 1 + .../node/service/NodeMessageService.java | 2 - .../service/impl/NodeMessageServiceImpl.java | 56 ------- 9 files changed, 151 insertions(+), 63 deletions(-) diff --git a/das/src/main/java/com/das/modules/data/service/TDEngineService.java b/das/src/main/java/com/das/modules/data/service/TDEngineService.java index b0d66f3a..184ed25a 100644 --- a/das/src/main/java/com/das/modules/data/service/TDEngineService.java +++ b/das/src/main/java/com/das/modules/data/service/TDEngineService.java @@ -277,6 +277,9 @@ public class TDEngineService { @Async public void updateYCHighValues(List values, String iotModelCode) { + if (values.isEmpty()) { + return; + } StringBuilder sb = new StringBuilder(1024 * 1024); try (Connection conn = hikariDataSource.getConnection(); Statement pstmt = conn.createStatement()) { @@ -315,6 +318,9 @@ public class TDEngineService { @Async public void updateYCLowValues(List values, String iotModelCode) { + if (values.isEmpty()) { + return; + } StringBuilder sb = new StringBuilder(1024 * 1024); try (Connection conn = hikariDataSource.getConnection(); Statement pstmt = conn.createStatement()) { @@ -353,6 +359,9 @@ public class TDEngineService { @Async public void updateCalFieldValues(List values) { + if (values.isEmpty()) { + return; + } StringBuilder sb = new StringBuilder(1024 * 1024); try (Connection conn = hikariDataSource.getConnection(); Statement pstmt = conn.createStatement()) { @@ -387,6 +396,9 @@ public class TDEngineService { @Async public void updateDeviceEventValues(List values) { + if (values.isEmpty()) { + return; + } StringBuilder sb = new StringBuilder(1024 * 1024); try (Connection conn = hikariDataSource.getConnection(); Statement pstmt = conn.createStatement()) { diff --git a/das/src/main/java/com/das/modules/node/command/AnalogDataCommand.java b/das/src/main/java/com/das/modules/node/command/AnalogDataCommand.java index fcb1717c..7b427e20 100644 --- a/das/src/main/java/com/das/modules/node/command/AnalogDataCommand.java +++ b/das/src/main/java/com/das/modules/node/command/AnalogDataCommand.java @@ -1,26 +1,156 @@ package com.das.modules.node.command; +import com.das.common.exceptions.ServiceException; +import com.das.modules.cache.domain.DeviceInfoCache; +import com.das.modules.cache.service.CacheService; +import com.das.modules.cache.service.IotModelCache; +import com.das.modules.data.service.TDEngineService; import com.das.modules.node.constant.NodeConstant; +import com.das.modules.node.domain.bo.RTData; import com.das.modules.node.domain.bo.TerminalMessage; import com.das.modules.node.service.NodeMessageService; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service; +import java.util.*; + @Service(value = NodeConstant.ANALOG_DATA) @Slf4j public class AnalogDataCommand implements BaseCommand { @Autowired - NodeMessageService nodeMessageService; + CacheService cacheService; + + @Autowired + TDEngineService tdEngineService; + + @Autowired + RedisTemplate redisTemplate; + + /** + * 执行命令方法 + * 当接收到终端消息时,此方法被调用以处理数据 + * 主要负责处理模拟量数据的接收和处理 + * + * @param data 包含模拟量数据的TerminalMessage对象 + */ @Override public void doCommand(TerminalMessage data) { + if (log.isDebugEnabled()){ + log.debug("收到实时数据[模拟量]"); + log.debug("数据内容: {}", data.toJsonString()); + } try { - //analogData值只存入redis - nodeMessageService.handleData(data); + processAnalogData(data); } catch (Exception e) { - log.error("解析数据异常", e); + log.error("处理实时数据时产生异常", e); } } + + /** + * 处理实时数据 + * @param data + */ + + private void processAnalogData(TerminalMessage data){ + JsonNode dataNode = data.getData(); + String deviceId = Optional.of(dataNode.get("deviceId")).orElseThrow(() -> new ServiceException("deviceId字段缺失")).asText(); + Long dataTime = Optional.of(dataNode.get("dataTime")).orElseThrow( () -> new ServiceException("dataTime字段缺失")).asLong(); + JsonNode values = dataNode.get("values"); + JsonNode archiveValues = dataNode.get("archiveValues"); + //排除空数据 + if (values == null && archiveValues == null){ + return; + } + //如果values有值,则存入redis + Map redisValues = new HashMap<>(); + if (values != null && values.isObject()){ + for (Iterator it = values.fieldNames(); it.hasNext(); ) { + String valueName = it.next(); + String key = String.format("RT:%s:%s", deviceId, valueName.toLowerCase()); + redisValues.put(key, values.get(valueName)); + } + } + DeviceInfoCache dev = cacheService.getEquipmentCache().getDeviceInfoCacheById(Long.parseLong(deviceId)); + IotModelCache iotCache = cacheService.getIotModelCache(); + //如果archiveValues有值,则存入td,同时也更新redis + if (archiveValues != null && archiveValues.isObject()){ + RTData hiSpeedData = new RTData(); + hiSpeedData.setDeviceId(Long.parseLong(deviceId)); + hiSpeedData.setDataTime(dataTime); + Map hiSpeedValues = new HashMap<>(); + RTData lowSpeedData = new RTData(); + lowSpeedData.setDeviceId(Long.parseLong(deviceId)); + lowSpeedData.setDataTime(dataTime); + Map lowSpeedValues = new HashMap<>(); + for (Iterator it = archiveValues.fieldNames(); it.hasNext(); ) { + //加入redis更新列表 + String valueName = it.next(); + String key = String.format("RT:%s:%s", deviceId, valueName.toLowerCase()); + redisValues.put(key, archiveValues.get(valueName)); + //加入td更新列表 + if (iotCache.isHighSpeed(dev.getIotModelId(), valueName)) { + hiSpeedValues.put(valueName, archiveValues.get(valueName)); + } else if (iotCache.isLowSpeed(dev.getIotModelId(), valueName)) { + lowSpeedValues.put(valueName, archiveValues.get(valueName)); + } + } + hiSpeedData.setValues(hiSpeedValues); + lowSpeedData.setValues(lowSpeedValues); + tdEngineService.updateYCHighValues(List.of(hiSpeedData), dev.getModel()); + tdEngineService.updateYCLowValues(List.of(lowSpeedData), dev.getModel()); + } + redisTemplate.opsForValue().multiSet(redisValues); +// Map keyValueMap = new HashMap<>(); +// String modelCode = dataService.deviceModelMap.get(deviceId); +// Set highKey = dataService.highIotFieldMap.get(modelCode).keySet(); +// Set lowKey = dataService.lowIotFieldMap.get(modelCode).keySet(); +// Map highSpeedValueMap = new HashMap<>(); +// Map lowSpeedValueMap = new HashMap<>(); + +// //数据入redis +// if (values != null){ +// Iterator keysHigh = values.fieldNames(); +// while (keysHigh.hasNext()) { +// String fieldName = keysHigh.next(); +// String key = String.format("RT:%s:%s", deviceId, fieldName.toLowerCase()); +// keyValueMap.put(key, values.get(fieldName)); +// } +// } +// +// if (archiveValues != null){ +// Iterator archiveKeys = archiveValues.fieldNames(); +// while (archiveKeys.hasNext()) { +// String fieldName = archiveKeys.next(); +// String key = String.format("RT:%s:%s", deviceId, fieldName.toLowerCase()); +// keyValueMap.put(key, archiveValues.get(fieldName)); +// if (highKey.contains(fieldName)) { +// highSpeedValueMap.put(fieldName, archiveValues.get(fieldName)); +// } +// if (lowKey.contains(fieldName)) { +// lowSpeedValueMap.put(fieldName, archiveValues.get(fieldName)); +// } +// } +// } +// //更新td +// if (!highSpeedValueMap.isEmpty()) { +// List highSpeedData = new ArrayList<>(); +// RTData rtHighData = RTData.builder().dataTime(dataTime).deviceId(Long.valueOf(deviceId)).values(highSpeedValueMap).build(); +// highSpeedData.add(rtHighData); +// tdEngineService.updateYCHighValues(highSpeedData, modelCode); +// } +// +// if (!lowSpeedValueMap.isEmpty()) { +// List lowSpeedData = new ArrayList<>(); +// RTData rtLowData = RTData.builder().dataTime(dataTime).deviceId(Long.valueOf(deviceId)).values(lowSpeedValueMap).build(); +// lowSpeedData.add(rtLowData); +// tdEngineService.updateYCLowValues(lowSpeedData, modelCode); +// } +// adminRedisTemplate.mSet(keyValueMap); + } } diff --git a/das/src/main/java/com/das/modules/node/command/DeviceEventCommand.java b/das/src/main/java/com/das/modules/node/command/DeviceEventCommand.java index aeb96bcf..4a61029c 100644 --- a/das/src/main/java/com/das/modules/node/command/DeviceEventCommand.java +++ b/das/src/main/java/com/das/modules/node/command/DeviceEventCommand.java @@ -15,6 +15,7 @@ public class DeviceEventCommand implements BaseCommand{ @Override public void doCommand(TerminalMessage data) { + log.debug("收到实时[事件告警]"); try { nodeMessageService.handleDeviceEvent(data); diff --git a/das/src/main/java/com/das/modules/node/command/HeartbeatCommand.java b/das/src/main/java/com/das/modules/node/command/HeartbeatCommand.java index a646940c..9eb13de7 100644 --- a/das/src/main/java/com/das/modules/node/command/HeartbeatCommand.java +++ b/das/src/main/java/com/das/modules/node/command/HeartbeatCommand.java @@ -34,7 +34,7 @@ public class HeartbeatCommand implements BaseCommand{ */ @Override public void doCommand(TerminalMessage data) { - log.info("收到[heartbeat]报文"); + log.debug("收到[heartbeat]报文"); // 解析心跳报文中的数据信息 JsonNode dataInfo = data.getData(); diff --git a/das/src/main/java/com/das/modules/node/command/HisHighSpeedCommand.java b/das/src/main/java/com/das/modules/node/command/HisHighSpeedCommand.java index 78662a09..81bfa8cc 100644 --- a/das/src/main/java/com/das/modules/node/command/HisHighSpeedCommand.java +++ b/das/src/main/java/com/das/modules/node/command/HisHighSpeedCommand.java @@ -15,6 +15,7 @@ public class HisHighSpeedCommand implements BaseCommand { NodeMessageService nodeMessageService; @Override public void doCommand(TerminalMessage data) { + log.debug("收到历史高频数据"); try { //analogData值只存入redis nodeMessageService.handleHighSpeed(data); diff --git a/das/src/main/java/com/das/modules/node/command/HisLowSpeedCommand.java b/das/src/main/java/com/das/modules/node/command/HisLowSpeedCommand.java index 82d5497f..5c7911d0 100644 --- a/das/src/main/java/com/das/modules/node/command/HisLowSpeedCommand.java +++ b/das/src/main/java/com/das/modules/node/command/HisLowSpeedCommand.java @@ -15,6 +15,7 @@ public class HisLowSpeedCommand implements BaseCommand { NodeMessageService nodeMessageService; @Override public void doCommand(TerminalMessage data) { + log.debug("收到[历史低频数据]"); try { //analogData值只存入redis nodeMessageService.handleLowSpeed(data); diff --git a/das/src/main/java/com/das/modules/node/command/StateDataCommand.java b/das/src/main/java/com/das/modules/node/command/StateDataCommand.java index eb61758a..ec01ed44 100644 --- a/das/src/main/java/com/das/modules/node/command/StateDataCommand.java +++ b/das/src/main/java/com/das/modules/node/command/StateDataCommand.java @@ -16,6 +16,7 @@ public class StateDataCommand implements BaseCommand { @Override public void doCommand(TerminalMessage data) { + log.debug("收到实时数据[状态量]"); try { //只存入redis nodeMessageService.handleData(data); diff --git a/das/src/main/java/com/das/modules/node/service/NodeMessageService.java b/das/src/main/java/com/das/modules/node/service/NodeMessageService.java index 1953a9d5..22b78ca7 100644 --- a/das/src/main/java/com/das/modules/node/service/NodeMessageService.java +++ b/das/src/main/java/com/das/modules/node/service/NodeMessageService.java @@ -14,8 +14,6 @@ public interface NodeMessageService { JsonNode sendTerminalConfig(Long nodeId); - void handleData(TerminalMessage data); - void handleHighSpeed(TerminalMessage data); void handleLowSpeed(TerminalMessage data); diff --git a/das/src/main/java/com/das/modules/node/service/impl/NodeMessageServiceImpl.java b/das/src/main/java/com/das/modules/node/service/impl/NodeMessageServiceImpl.java index e9e14da1..3d6f73b9 100644 --- a/das/src/main/java/com/das/modules/node/service/impl/NodeMessageServiceImpl.java +++ b/das/src/main/java/com/das/modules/node/service/impl/NodeMessageServiceImpl.java @@ -238,62 +238,6 @@ public class NodeMessageServiceImpl extends TextWebSocketHandler implements Node } - @Override - public void handleData(TerminalMessage data) { - JsonNode jsonNode = data.getData(); - log.debug("收到消息:{}",data.getData()); - String deviceId = jsonNode.get("deviceId").asText(); - JsonNode values = jsonNode.get("values"); - JsonNode archiveValues = jsonNode.get("archiveValues"); - Long dataTime = jsonNode.get("dataTime").asLong(); - Map keyValueMap = new HashMap<>(); - String modelCode = dataService.deviceModelMap.get(deviceId); - Set highKey = dataService.highIotFieldMap.get(modelCode).keySet(); - Set lowKey = dataService.lowIotFieldMap.get(modelCode).keySet(); - Map highSpeedValueMap = new HashMap<>(); - Map lowSpeedValueMap = new HashMap<>(); - - //数据入redis - if (values != null){ - Iterator keysHigh = values.fieldNames(); - while (keysHigh.hasNext()) { - String fieldName = keysHigh.next(); - String key = String.format("RT:%s:%s", deviceId, fieldName.toLowerCase()); - keyValueMap.put(key, values.get(fieldName)); - } - } - - if (archiveValues != null){ - Iterator archiveKeys = archiveValues.fieldNames(); - while (archiveKeys.hasNext()) { - String fieldName = archiveKeys.next(); - String key = String.format("RT:%s:%s", deviceId, fieldName.toLowerCase()); - keyValueMap.put(key, archiveValues.get(fieldName)); - if (highKey.contains(fieldName)) { - highSpeedValueMap.put(fieldName, archiveValues.get(fieldName)); - } - if (lowKey.contains(fieldName)) { - lowSpeedValueMap.put(fieldName, archiveValues.get(fieldName)); - } - } - } - //更新td - if (!highSpeedValueMap.isEmpty()) { - List highSpeedData = new ArrayList<>(); - RTData rtHighData = RTData.builder().dataTime(dataTime).deviceId(Long.valueOf(deviceId)).values(highSpeedValueMap).build(); - highSpeedData.add(rtHighData); - tdEngineService.updateYCHighValues(highSpeedData, modelCode); - } - - if (!lowSpeedValueMap.isEmpty()) { - List lowSpeedData = new ArrayList<>(); - RTData rtLowData = RTData.builder().dataTime(dataTime).deviceId(Long.valueOf(deviceId)).values(lowSpeedValueMap).build(); - lowSpeedData.add(rtLowData); - tdEngineService.updateYCLowValues(lowSpeedData, modelCode); - } - adminRedisTemplate.mSet(keyValueMap); - } - @Override public void handleHighSpeed(TerminalMessage data) { JsonNode jsonNode = data.getData();