From acd974f2cb8a3a97a12591ca2fa9c41906a89a73 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B0=B7=E6=88=90=E4=BC=9F?= Date: Mon, 6 Jan 2025 09:36:22 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4command=E7=9B=B8=E5=85=B3?= =?UTF-8?q?=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../node/command/AnalogDataCommand.java | 2 +- .../node/command/DeviceEventCommand.java | 2 +- .../node/command/HisHighSpeedCommand.java | 34 +++++++------- .../node/command/HisLowSpeedCommand.java | 45 +++++++++++-------- .../node/command/StateDataCommand.java | 2 +- 5 files changed, 44 insertions(+), 41 deletions(-) diff --git a/das/src/main/java/com/das/modules/node/command/AnalogDataCommand.java b/das/src/main/java/com/das/modules/node/command/AnalogDataCommand.java index 7ab7ddd4..4849cbbe 100644 --- a/das/src/main/java/com/das/modules/node/command/AnalogDataCommand.java +++ b/das/src/main/java/com/das/modules/node/command/AnalogDataCommand.java @@ -58,7 +58,7 @@ public class AnalogDataCommand implements BaseCommand { */ private void processAnalogData(TerminalMessage data){ - JsonNode dataNode = data.getData(); + JsonNode dataNode = Optional.of(data.getData()).orElseThrow(() -> new ServiceException("data字段缺失")); String deviceId = Optional.of(dataNode.get("deviceId")).orElseThrow(() -> new ServiceException("deviceId字段缺失")).asText(); Long dataTime = Optional.of(dataNode.get("dataTime")).orElseThrow( () -> new ServiceException("dataTime字段缺失")).asLong(); JsonNode values = dataNode.get("values"); diff --git a/das/src/main/java/com/das/modules/node/command/DeviceEventCommand.java b/das/src/main/java/com/das/modules/node/command/DeviceEventCommand.java index 0e4223e1..3598a7c9 100644 --- a/das/src/main/java/com/das/modules/node/command/DeviceEventCommand.java +++ b/das/src/main/java/com/das/modules/node/command/DeviceEventCommand.java @@ -55,7 +55,7 @@ public class DeviceEventCommand implements BaseCommand{ JsonNode dataEvent = data.getData(); ObjectMapper objectMapper = new ObjectMapper(); List valueList = new ArrayList<>(); - + // 使用 TypeReference 来指定转换目标类型 List list = objectMapper.convertValue(dataEvent, new TypeReference>() { }); diff --git a/das/src/main/java/com/das/modules/node/command/HisHighSpeedCommand.java b/das/src/main/java/com/das/modules/node/command/HisHighSpeedCommand.java index 4229cfe6..b682ebb8 100644 --- a/das/src/main/java/com/das/modules/node/command/HisHighSpeedCommand.java +++ b/das/src/main/java/com/das/modules/node/command/HisHighSpeedCommand.java @@ -1,5 +1,6 @@ package com.das.modules.node.command; +import com.das.common.exceptions.ServiceException; import com.das.modules.cache.domain.DeviceInfoCache; import com.das.modules.cache.service.CacheService; import com.das.modules.data.service.TDEngineService; @@ -19,49 +20,44 @@ import java.util.*; public class HisHighSpeedCommand implements BaseCommand { @Autowired - NodeMessageService nodeMessageService; + CacheService cacheService; @Autowired TDEngineService tdEngineService; - @Autowired - CacheService cacheService; - @Override public void doCommand(TerminalMessage data) { - log.debug("收到历史高频数据"); + log.debug("收到[历史高频数据]"); try { - //analogData值只存入redis - handleHighSpeed(data); + processHisHighSpeedData(data); } catch (Exception e) { log.error("解析数据异常", e); } } - private void handleHighSpeed(TerminalMessage data) { - JsonNode jsonNode = data.getData(); - String deviceId = jsonNode.get("deviceId").asText(); + public void processHisHighSpeedData(TerminalMessage data) { + JsonNode jsonNode = Optional.of(data.getData()).orElseThrow(() -> new ServiceException("data字段缺失")); + String deviceId = Optional.of(jsonNode.get("deviceId")).orElseThrow( () -> new ServiceException("deviceId字段缺失")).asText(); + Long dataTime = Optional.of(jsonNode.get("dataTime")).orElseThrow( ()-> new ServiceException("dataTime字段缺失")).asLong(); JsonNode values = jsonNode.get("values"); Map keyValueMap = new HashMap<>(); - // 根据设备ID获取对应的物模型code - DeviceInfoCache dev = cacheService.getEquipmentCache().getDeviceInfoCacheById(jsonNode.get("deviceId").asLong()); - + // 根据设备ID获取对应的物模型属性 + DeviceInfoCache dev = cacheService.getEquipmentCache().getDeviceInfoCacheById(Long.valueOf(deviceId)); //High数据 Iterator keysHigh = values.fieldNames(); while (keysHigh.hasNext()) { String fieldName = keysHigh.next(); - keyValueMap.put(fieldName, values.get(fieldName)); - + if (cacheService.getIotModelCache().isHighSpeed(dev.getIotModelId(), fieldName)) { + keyValueMap.put(fieldName, values.get(fieldName)); + } } - Long dataTime = jsonNode.get("dataTime").asLong(); - List highList = new ArrayList<>(); + RTData rtHighData = RTData.builder() .dataTime(dataTime) .deviceId(Long.valueOf(deviceId)) .values(keyValueMap) .build(); - highList.add(rtHighData); - tdEngineService.updateYCHighValues(highList, dev.getIotModelCode()); + tdEngineService.updateYCHighValues(List.of(rtHighData), dev.getIotModelCode()); } } diff --git a/das/src/main/java/com/das/modules/node/command/HisLowSpeedCommand.java b/das/src/main/java/com/das/modules/node/command/HisLowSpeedCommand.java index d21909b0..476766a2 100644 --- a/das/src/main/java/com/das/modules/node/command/HisLowSpeedCommand.java +++ b/das/src/main/java/com/das/modules/node/command/HisLowSpeedCommand.java @@ -3,10 +3,10 @@ package com.das.modules.node.command; import com.das.modules.cache.domain.DeviceInfoCache; import com.das.modules.cache.service.CacheService; import com.das.modules.data.service.TDEngineService; -import com.das.modules.equipment.mapper.SysIotModelMapper; import com.das.modules.node.constant.NodeConstant; import com.das.modules.node.domain.bo.RTData; import com.das.modules.node.domain.bo.TerminalMessage; +import com.das.modules.node.service.NodeMessageService; import com.fasterxml.jackson.databind.JsonNode; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -19,47 +19,54 @@ import java.util.*; public class HisLowSpeedCommand implements BaseCommand { @Autowired - TDEngineService tdEngineService; - + CacheService cacheService; @Autowired - CacheService cacheService; + TDEngineService tdEngineService; + @Override public void doCommand(TerminalMessage data) { log.debug("收到[历史低频数据]"); try { //analogData值只存入redis - handleLowSpeed(data); + processHisLowSpeedData(data); } catch (Exception e) { log.error("解析数据异常", e); } } - private void handleLowSpeed(TerminalMessage data) { - JsonNode jsonNode = data.getData(); - String deviceId = jsonNode.get("deviceId").asText(); - JsonNode values = jsonNode.get("values"); + private void processHisLowSpeedData(TerminalMessage data) { + JsonNode dataNode = data.getData(); + if (dataNode == null ) { + log.error("收到无效历史低频数据报文"); + return; + } + String deviceId = Optional.of(dataNode.get("deviceId")).orElseThrow(() -> new RuntimeException("设备ID为空")).asText(); + Long dataTime = Optional.of(dataNode.get("dataTime")).orElseThrow(() -> new RuntimeException("数据时间为空")).asLong(); + + JsonNode values = dataNode.get("values"); Map keyValueMap = new HashMap<>(); - // 根据设备ID获取对应的物模型code - DeviceInfoCache dev = cacheService.getEquipmentCache().getDeviceInfoCacheById(jsonNode.get("deviceId").asLong()); + // 根据设备ID获取对应的物模型属性 + + DeviceInfoCache device = cacheService.getEquipmentCache().getDeviceInfoCacheById(Long.valueOf(deviceId)); + + String iotModelCode = device.getIotModelCode(); //Low数据 Iterator keysLow = values.fieldNames(); while (keysLow.hasNext()) { String fieldName = keysLow.next(); - keyValueMap.put(fieldName, values.get(fieldName)); - + if (cacheService.getIotModelCache().isLowSpeed(device.getIotModelId(), fieldName)){ + keyValueMap.put(fieldName, values.get(fieldName)); + } } - Long dataTime = jsonNode.get("dataTime").asLong(); - List highList = new ArrayList<>(); - RTData rtHighData = RTData.builder() + RTData rtLowData = RTData.builder() .dataTime(dataTime) .deviceId(Long.valueOf(deviceId)) .values(keyValueMap) .build(); - highList.add(rtHighData); - tdEngineService.updateYCLowValues(highList, dev.getIotModelCode()); - } + tdEngineService.updateYCLowValues(List.of(rtLowData), iotModelCode); + } } diff --git a/das/src/main/java/com/das/modules/node/command/StateDataCommand.java b/das/src/main/java/com/das/modules/node/command/StateDataCommand.java index 97e12964..63289c59 100644 --- a/das/src/main/java/com/das/modules/node/command/StateDataCommand.java +++ b/das/src/main/java/com/das/modules/node/command/StateDataCommand.java @@ -42,7 +42,7 @@ public class StateDataCommand implements BaseCommand { } private void processStateData(TerminalMessage data) { - JsonNode dataNode = data.getData(); + JsonNode dataNode = Optional.of(data.getData()).orElseThrow(() -> new ServiceException("data字段缺失")); String deviceId = Optional.of(dataNode.get("deviceId")).orElseThrow(() -> new ServiceException("deviceId字段缺失")).asText(); Long dataTime = Optional.of(dataNode.get("dataTime")).orElseThrow( () -> new ServiceException("dataTime字段缺失")).asLong(); JsonNode values = dataNode.get("values");