From acd974f2cb8a3a97a12591ca2fa9c41906a89a73 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B0=B7=E6=88=90=E4=BC=9F?= Date: Mon, 6 Jan 2025 09:36:22 +0800 Subject: [PATCH 1/3] =?UTF-8?q?=E8=B0=83=E6=95=B4command=E7=9B=B8=E5=85=B3?= =?UTF-8?q?=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../node/command/AnalogDataCommand.java | 2 +- .../node/command/DeviceEventCommand.java | 2 +- .../node/command/HisHighSpeedCommand.java | 34 +++++++------- .../node/command/HisLowSpeedCommand.java | 45 +++++++++++-------- .../node/command/StateDataCommand.java | 2 +- 5 files changed, 44 insertions(+), 41 deletions(-) diff --git a/das/src/main/java/com/das/modules/node/command/AnalogDataCommand.java b/das/src/main/java/com/das/modules/node/command/AnalogDataCommand.java index 7ab7ddd4..4849cbbe 100644 --- a/das/src/main/java/com/das/modules/node/command/AnalogDataCommand.java +++ b/das/src/main/java/com/das/modules/node/command/AnalogDataCommand.java @@ -58,7 +58,7 @@ public class AnalogDataCommand implements BaseCommand { */ private void processAnalogData(TerminalMessage data){ - JsonNode dataNode = data.getData(); + JsonNode dataNode = Optional.of(data.getData()).orElseThrow(() -> new ServiceException("data字段缺失")); String deviceId = Optional.of(dataNode.get("deviceId")).orElseThrow(() -> new ServiceException("deviceId字段缺失")).asText(); Long dataTime = Optional.of(dataNode.get("dataTime")).orElseThrow( () -> new ServiceException("dataTime字段缺失")).asLong(); JsonNode values = dataNode.get("values"); diff --git a/das/src/main/java/com/das/modules/node/command/DeviceEventCommand.java b/das/src/main/java/com/das/modules/node/command/DeviceEventCommand.java index 0e4223e1..3598a7c9 100644 --- a/das/src/main/java/com/das/modules/node/command/DeviceEventCommand.java +++ b/das/src/main/java/com/das/modules/node/command/DeviceEventCommand.java @@ -55,7 +55,7 @@ public class DeviceEventCommand implements BaseCommand{ JsonNode dataEvent = data.getData(); ObjectMapper objectMapper = new ObjectMapper(); List valueList = new ArrayList<>(); - + // 使用 TypeReference 来指定转换目标类型 List list = objectMapper.convertValue(dataEvent, new TypeReference>() { }); diff --git a/das/src/main/java/com/das/modules/node/command/HisHighSpeedCommand.java b/das/src/main/java/com/das/modules/node/command/HisHighSpeedCommand.java index 4229cfe6..b682ebb8 100644 --- a/das/src/main/java/com/das/modules/node/command/HisHighSpeedCommand.java +++ b/das/src/main/java/com/das/modules/node/command/HisHighSpeedCommand.java @@ -1,5 +1,6 @@ package com.das.modules.node.command; +import com.das.common.exceptions.ServiceException; import com.das.modules.cache.domain.DeviceInfoCache; import com.das.modules.cache.service.CacheService; import com.das.modules.data.service.TDEngineService; @@ -19,49 +20,44 @@ import java.util.*; public class HisHighSpeedCommand implements BaseCommand { @Autowired - NodeMessageService nodeMessageService; + CacheService cacheService; @Autowired TDEngineService tdEngineService; - @Autowired - CacheService cacheService; - @Override public void doCommand(TerminalMessage data) { - log.debug("收到历史高频数据"); + log.debug("收到[历史高频数据]"); try { - //analogData值只存入redis - handleHighSpeed(data); + processHisHighSpeedData(data); } catch (Exception e) { log.error("解析数据异常", e); } } - private void handleHighSpeed(TerminalMessage data) { - JsonNode jsonNode = data.getData(); - String deviceId = jsonNode.get("deviceId").asText(); + public void processHisHighSpeedData(TerminalMessage data) { + JsonNode jsonNode = Optional.of(data.getData()).orElseThrow(() -> new ServiceException("data字段缺失")); + String deviceId = Optional.of(jsonNode.get("deviceId")).orElseThrow( () -> new ServiceException("deviceId字段缺失")).asText(); + Long dataTime = Optional.of(jsonNode.get("dataTime")).orElseThrow( ()-> new ServiceException("dataTime字段缺失")).asLong(); JsonNode values = jsonNode.get("values"); Map keyValueMap = new HashMap<>(); - // 根据设备ID获取对应的物模型code - DeviceInfoCache dev = cacheService.getEquipmentCache().getDeviceInfoCacheById(jsonNode.get("deviceId").asLong()); - + // 根据设备ID获取对应的物模型属性 + DeviceInfoCache dev = cacheService.getEquipmentCache().getDeviceInfoCacheById(Long.valueOf(deviceId)); //High数据 Iterator keysHigh = values.fieldNames(); while (keysHigh.hasNext()) { String fieldName = keysHigh.next(); - keyValueMap.put(fieldName, values.get(fieldName)); - + if (cacheService.getIotModelCache().isHighSpeed(dev.getIotModelId(), fieldName)) { + keyValueMap.put(fieldName, values.get(fieldName)); + } } - Long dataTime = jsonNode.get("dataTime").asLong(); - List highList = new ArrayList<>(); + RTData rtHighData = RTData.builder() .dataTime(dataTime) .deviceId(Long.valueOf(deviceId)) .values(keyValueMap) .build(); - highList.add(rtHighData); - tdEngineService.updateYCHighValues(highList, dev.getIotModelCode()); + tdEngineService.updateYCHighValues(List.of(rtHighData), dev.getIotModelCode()); } } diff --git a/das/src/main/java/com/das/modules/node/command/HisLowSpeedCommand.java b/das/src/main/java/com/das/modules/node/command/HisLowSpeedCommand.java index d21909b0..476766a2 100644 --- a/das/src/main/java/com/das/modules/node/command/HisLowSpeedCommand.java +++ b/das/src/main/java/com/das/modules/node/command/HisLowSpeedCommand.java @@ -3,10 +3,10 @@ package com.das.modules.node.command; import com.das.modules.cache.domain.DeviceInfoCache; import com.das.modules.cache.service.CacheService; import com.das.modules.data.service.TDEngineService; -import com.das.modules.equipment.mapper.SysIotModelMapper; import com.das.modules.node.constant.NodeConstant; import com.das.modules.node.domain.bo.RTData; import com.das.modules.node.domain.bo.TerminalMessage; +import com.das.modules.node.service.NodeMessageService; import com.fasterxml.jackson.databind.JsonNode; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -19,47 +19,54 @@ import java.util.*; public class HisLowSpeedCommand implements BaseCommand { @Autowired - TDEngineService tdEngineService; - + CacheService cacheService; @Autowired - CacheService cacheService; + TDEngineService tdEngineService; + @Override public void doCommand(TerminalMessage data) { log.debug("收到[历史低频数据]"); try { //analogData值只存入redis - handleLowSpeed(data); + processHisLowSpeedData(data); } catch (Exception e) { log.error("解析数据异常", e); } } - private void handleLowSpeed(TerminalMessage data) { - JsonNode jsonNode = data.getData(); - String deviceId = jsonNode.get("deviceId").asText(); - JsonNode values = jsonNode.get("values"); + private void processHisLowSpeedData(TerminalMessage data) { + JsonNode dataNode = data.getData(); + if (dataNode == null ) { + log.error("收到无效历史低频数据报文"); + return; + } + String deviceId = Optional.of(dataNode.get("deviceId")).orElseThrow(() -> new RuntimeException("设备ID为空")).asText(); + Long dataTime = Optional.of(dataNode.get("dataTime")).orElseThrow(() -> new RuntimeException("数据时间为空")).asLong(); + + JsonNode values = dataNode.get("values"); Map keyValueMap = new HashMap<>(); - // 根据设备ID获取对应的物模型code - DeviceInfoCache dev = cacheService.getEquipmentCache().getDeviceInfoCacheById(jsonNode.get("deviceId").asLong()); + // 根据设备ID获取对应的物模型属性 + + DeviceInfoCache device = cacheService.getEquipmentCache().getDeviceInfoCacheById(Long.valueOf(deviceId)); + + String iotModelCode = device.getIotModelCode(); //Low数据 Iterator keysLow = values.fieldNames(); while (keysLow.hasNext()) { String fieldName = keysLow.next(); - keyValueMap.put(fieldName, values.get(fieldName)); - + if (cacheService.getIotModelCache().isLowSpeed(device.getIotModelId(), fieldName)){ + keyValueMap.put(fieldName, values.get(fieldName)); + } } - Long dataTime = jsonNode.get("dataTime").asLong(); - List highList = new ArrayList<>(); - RTData rtHighData = RTData.builder() + RTData rtLowData = RTData.builder() .dataTime(dataTime) .deviceId(Long.valueOf(deviceId)) .values(keyValueMap) .build(); - highList.add(rtHighData); - tdEngineService.updateYCLowValues(highList, dev.getIotModelCode()); - } + tdEngineService.updateYCLowValues(List.of(rtLowData), iotModelCode); + } } diff --git a/das/src/main/java/com/das/modules/node/command/StateDataCommand.java b/das/src/main/java/com/das/modules/node/command/StateDataCommand.java index 97e12964..63289c59 100644 --- a/das/src/main/java/com/das/modules/node/command/StateDataCommand.java +++ b/das/src/main/java/com/das/modules/node/command/StateDataCommand.java @@ -42,7 +42,7 @@ public class StateDataCommand implements BaseCommand { } private void processStateData(TerminalMessage data) { - JsonNode dataNode = data.getData(); + JsonNode dataNode = Optional.of(data.getData()).orElseThrow(() -> new ServiceException("data字段缺失")); String deviceId = Optional.of(dataNode.get("deviceId")).orElseThrow(() -> new ServiceException("deviceId字段缺失")).asText(); Long dataTime = Optional.of(dataNode.get("dataTime")).orElseThrow( () -> new ServiceException("dataTime字段缺失")).asLong(); JsonNode values = dataNode.get("values"); From bdb29dca50e5fdbdcb9b80b64ad0e0465902fb11 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B0=B7=E6=88=90=E4=BC=9F?= Date: Mon, 6 Jan 2025 09:57:45 +0800 Subject: [PATCH 2/3] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/impl/NodeMessageServiceImpl.java | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/das/src/main/java/com/das/modules/node/service/impl/NodeMessageServiceImpl.java b/das/src/main/java/com/das/modules/node/service/impl/NodeMessageServiceImpl.java index 17216fb3..fb7030dd 100644 --- a/das/src/main/java/com/das/modules/node/service/impl/NodeMessageServiceImpl.java +++ b/das/src/main/java/com/das/modules/node/service/impl/NodeMessageServiceImpl.java @@ -75,22 +75,6 @@ public class NodeMessageServiceImpl extends TextWebSocketHandler implements Node @Resource SysImpTabMappingMapper sysImptabmappingMapper; - @Autowired - AdminRedisTemplate adminRedisTemplate; - - @Autowired - SysIotModelMapper sysIotModelMapper; - - @Autowired - TDEngineService tdEngineService; - - @Autowired - private CacheService cacheService; - - @Autowired - private DataServiceImpl dataService; - - @PostConstruct public void init() { //初始化高性能队列 From 84d715c130d6556795caa43516f489af9061d3d2 Mon Sep 17 00:00:00 2001 From: huguanghan Date: Tue, 7 Jan 2025 09:54:32 +0800 Subject: [PATCH 3/3] =?UTF-8?q?=E9=85=8D=E7=BD=AE=E4=B8=8B=E5=8F=91?= =?UTF-8?q?=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/das/modules/node/controller/SysNodeController.java | 4 +++- .../com/das/modules/node/service/NodeMessageService.java | 2 +- .../modules/node/service/impl/NodeMessageServiceImpl.java | 5 ++--- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/das/src/main/java/com/das/modules/node/controller/SysNodeController.java b/das/src/main/java/com/das/modules/node/controller/SysNodeController.java index f2fa9e6c..1cef87e9 100644 --- a/das/src/main/java/com/das/modules/node/controller/SysNodeController.java +++ b/das/src/main/java/com/das/modules/node/controller/SysNodeController.java @@ -19,6 +19,8 @@ import org.springframework.web.multipart.MultipartFile; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; /** * 物模型Controller @@ -84,7 +86,7 @@ public class SysNodeController { /** 配置下发 */ @PostMapping("/configUpdate") - public R configUpdate(@RequestBody SysNodeDto sysNodeDto) { + public R configUpdate(@RequestBody SysNodeDto sysNodeDto) throws ExecutionException, InterruptedException, TimeoutException { nodeMessageService.sendTerminalConfig(sysNodeDto.getId()); return R.success(); } diff --git a/das/src/main/java/com/das/modules/node/service/NodeMessageService.java b/das/src/main/java/com/das/modules/node/service/NodeMessageService.java index 73c5e1f9..ba373be5 100644 --- a/das/src/main/java/com/das/modules/node/service/NodeMessageService.java +++ b/das/src/main/java/com/das/modules/node/service/NodeMessageService.java @@ -12,7 +12,7 @@ import java.util.concurrent.TimeoutException; public interface NodeMessageService { - JsonNode sendTerminalConfig(Long nodeId); + JsonNode sendTerminalConfig(Long nodeId) throws ExecutionException, InterruptedException, TimeoutException; /** * 向指定采集节点发送指令(无返回值) diff --git a/das/src/main/java/com/das/modules/node/service/impl/NodeMessageServiceImpl.java b/das/src/main/java/com/das/modules/node/service/impl/NodeMessageServiceImpl.java index fb7030dd..afa1131d 100644 --- a/das/src/main/java/com/das/modules/node/service/impl/NodeMessageServiceImpl.java +++ b/das/src/main/java/com/das/modules/node/service/impl/NodeMessageServiceImpl.java @@ -119,7 +119,7 @@ public class NodeMessageServiceImpl extends TextWebSocketHandler implements Node } @Override - public JsonNode sendTerminalConfig(Long nodeId) { + public JsonNode sendTerminalConfig(Long nodeId) throws ExecutionException, InterruptedException, TimeoutException { ConfigUpdateVo configUpdateVo = new ConfigUpdateVo(); List links = new ArrayList<>(); try { @@ -216,8 +216,7 @@ public class NodeMessageServiceImpl extends TextWebSocketHandler implements Node .time(time) .data(jsonNode) .build(); - sendActionMessage(nodeId, configUpdate); - System.out.println(jsonNode); + sendTerminalMessageWithResult(nodeId,configUpdate); return jsonNode; }