From 9e82f9e355b84787a6ad53bf1792e96b103343ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B0=B7=E6=88=90=E4=BC=9F?= Date: Tue, 31 Dec 2024 16:01:51 +0800 Subject: [PATCH 1/6] =?UTF-8?q?=E8=BF=81=E7=A7=BBanalogData=E5=8A=9F?= =?UTF-8?q?=E8=83=BD=E8=BF=9B=E5=85=A5AnalogDataCommand?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../modules/data/service/TDEngineService.java | 12 ++ .../node/command/AnalogDataCommand.java | 138 +++++++++++++++++- .../node/command/DeviceEventCommand.java | 1 + .../node/command/HeartbeatCommand.java | 2 +- .../node/command/HisHighSpeedCommand.java | 1 + .../node/command/HisLowSpeedCommand.java | 1 + .../node/command/StateDataCommand.java | 1 + .../node/service/NodeMessageService.java | 2 - .../service/impl/NodeMessageServiceImpl.java | 56 ------- 9 files changed, 151 insertions(+), 63 deletions(-) diff --git a/das/src/main/java/com/das/modules/data/service/TDEngineService.java b/das/src/main/java/com/das/modules/data/service/TDEngineService.java index b0d66f3a..184ed25a 100644 --- a/das/src/main/java/com/das/modules/data/service/TDEngineService.java +++ b/das/src/main/java/com/das/modules/data/service/TDEngineService.java @@ -277,6 +277,9 @@ public class TDEngineService { @Async public void updateYCHighValues(List values, String iotModelCode) { + if (values.isEmpty()) { + return; + } StringBuilder sb = new StringBuilder(1024 * 1024); try (Connection conn = hikariDataSource.getConnection(); Statement pstmt = conn.createStatement()) { @@ -315,6 +318,9 @@ public class TDEngineService { @Async public void updateYCLowValues(List values, String iotModelCode) { + if (values.isEmpty()) { + return; + } StringBuilder sb = new StringBuilder(1024 * 1024); try (Connection conn = hikariDataSource.getConnection(); Statement pstmt = conn.createStatement()) { @@ -353,6 +359,9 @@ public class TDEngineService { @Async public void updateCalFieldValues(List values) { + if (values.isEmpty()) { + return; + } StringBuilder sb = new StringBuilder(1024 * 1024); try (Connection conn = hikariDataSource.getConnection(); Statement pstmt = conn.createStatement()) { @@ -387,6 +396,9 @@ public class TDEngineService { @Async public void updateDeviceEventValues(List values) { + if (values.isEmpty()) { + return; + } StringBuilder sb = new StringBuilder(1024 * 1024); try (Connection conn = hikariDataSource.getConnection(); Statement pstmt = conn.createStatement()) { diff --git a/das/src/main/java/com/das/modules/node/command/AnalogDataCommand.java b/das/src/main/java/com/das/modules/node/command/AnalogDataCommand.java index fcb1717c..7b427e20 100644 --- a/das/src/main/java/com/das/modules/node/command/AnalogDataCommand.java +++ b/das/src/main/java/com/das/modules/node/command/AnalogDataCommand.java @@ -1,26 +1,156 @@ package com.das.modules.node.command; +import com.das.common.exceptions.ServiceException; +import com.das.modules.cache.domain.DeviceInfoCache; +import com.das.modules.cache.service.CacheService; +import com.das.modules.cache.service.IotModelCache; +import com.das.modules.data.service.TDEngineService; import com.das.modules.node.constant.NodeConstant; +import com.das.modules.node.domain.bo.RTData; import com.das.modules.node.domain.bo.TerminalMessage; import com.das.modules.node.service.NodeMessageService; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service; +import java.util.*; + @Service(value = NodeConstant.ANALOG_DATA) @Slf4j public class AnalogDataCommand implements BaseCommand { @Autowired - NodeMessageService nodeMessageService; + CacheService cacheService; + + @Autowired + TDEngineService tdEngineService; + + @Autowired + RedisTemplate redisTemplate; + + /** + * 执行命令方法 + * 当接收到终端消息时,此方法被调用以处理数据 + * 主要负责处理模拟量数据的接收和处理 + * + * @param data 包含模拟量数据的TerminalMessage对象 + */ @Override public void doCommand(TerminalMessage data) { + if (log.isDebugEnabled()){ + log.debug("收到实时数据[模拟量]"); + log.debug("数据内容: {}", data.toJsonString()); + } try { - //analogData值只存入redis - nodeMessageService.handleData(data); + processAnalogData(data); } catch (Exception e) { - log.error("解析数据异常", e); + log.error("处理实时数据时产生异常", e); } } + + /** + * 处理实时数据 + * @param data + */ + + private void processAnalogData(TerminalMessage data){ + JsonNode dataNode = data.getData(); + String deviceId = Optional.of(dataNode.get("deviceId")).orElseThrow(() -> new ServiceException("deviceId字段缺失")).asText(); + Long dataTime = Optional.of(dataNode.get("dataTime")).orElseThrow( () -> new ServiceException("dataTime字段缺失")).asLong(); + JsonNode values = dataNode.get("values"); + JsonNode archiveValues = dataNode.get("archiveValues"); + //排除空数据 + if (values == null && archiveValues == null){ + return; + } + //如果values有值,则存入redis + Map redisValues = new HashMap<>(); + if (values != null && values.isObject()){ + for (Iterator it = values.fieldNames(); it.hasNext(); ) { + String valueName = it.next(); + String key = String.format("RT:%s:%s", deviceId, valueName.toLowerCase()); + redisValues.put(key, values.get(valueName)); + } + } + DeviceInfoCache dev = cacheService.getEquipmentCache().getDeviceInfoCacheById(Long.parseLong(deviceId)); + IotModelCache iotCache = cacheService.getIotModelCache(); + //如果archiveValues有值,则存入td,同时也更新redis + if (archiveValues != null && archiveValues.isObject()){ + RTData hiSpeedData = new RTData(); + hiSpeedData.setDeviceId(Long.parseLong(deviceId)); + hiSpeedData.setDataTime(dataTime); + Map hiSpeedValues = new HashMap<>(); + RTData lowSpeedData = new RTData(); + lowSpeedData.setDeviceId(Long.parseLong(deviceId)); + lowSpeedData.setDataTime(dataTime); + Map lowSpeedValues = new HashMap<>(); + for (Iterator it = archiveValues.fieldNames(); it.hasNext(); ) { + //加入redis更新列表 + String valueName = it.next(); + String key = String.format("RT:%s:%s", deviceId, valueName.toLowerCase()); + redisValues.put(key, archiveValues.get(valueName)); + //加入td更新列表 + if (iotCache.isHighSpeed(dev.getIotModelId(), valueName)) { + hiSpeedValues.put(valueName, archiveValues.get(valueName)); + } else if (iotCache.isLowSpeed(dev.getIotModelId(), valueName)) { + lowSpeedValues.put(valueName, archiveValues.get(valueName)); + } + } + hiSpeedData.setValues(hiSpeedValues); + lowSpeedData.setValues(lowSpeedValues); + tdEngineService.updateYCHighValues(List.of(hiSpeedData), dev.getModel()); + tdEngineService.updateYCLowValues(List.of(lowSpeedData), dev.getModel()); + } + redisTemplate.opsForValue().multiSet(redisValues); +// Map keyValueMap = new HashMap<>(); +// String modelCode = dataService.deviceModelMap.get(deviceId); +// Set highKey = dataService.highIotFieldMap.get(modelCode).keySet(); +// Set lowKey = dataService.lowIotFieldMap.get(modelCode).keySet(); +// Map highSpeedValueMap = new HashMap<>(); +// Map lowSpeedValueMap = new HashMap<>(); + +// //数据入redis +// if (values != null){ +// Iterator keysHigh = values.fieldNames(); +// while (keysHigh.hasNext()) { +// String fieldName = keysHigh.next(); +// String key = String.format("RT:%s:%s", deviceId, fieldName.toLowerCase()); +// keyValueMap.put(key, values.get(fieldName)); +// } +// } +// +// if (archiveValues != null){ +// Iterator archiveKeys = archiveValues.fieldNames(); +// while (archiveKeys.hasNext()) { +// String fieldName = archiveKeys.next(); +// String key = String.format("RT:%s:%s", deviceId, fieldName.toLowerCase()); +// keyValueMap.put(key, archiveValues.get(fieldName)); +// if (highKey.contains(fieldName)) { +// highSpeedValueMap.put(fieldName, archiveValues.get(fieldName)); +// } +// if (lowKey.contains(fieldName)) { +// lowSpeedValueMap.put(fieldName, archiveValues.get(fieldName)); +// } +// } +// } +// //更新td +// if (!highSpeedValueMap.isEmpty()) { +// List highSpeedData = new ArrayList<>(); +// RTData rtHighData = RTData.builder().dataTime(dataTime).deviceId(Long.valueOf(deviceId)).values(highSpeedValueMap).build(); +// highSpeedData.add(rtHighData); +// tdEngineService.updateYCHighValues(highSpeedData, modelCode); +// } +// +// if (!lowSpeedValueMap.isEmpty()) { +// List lowSpeedData = new ArrayList<>(); +// RTData rtLowData = RTData.builder().dataTime(dataTime).deviceId(Long.valueOf(deviceId)).values(lowSpeedValueMap).build(); +// lowSpeedData.add(rtLowData); +// tdEngineService.updateYCLowValues(lowSpeedData, modelCode); +// } +// adminRedisTemplate.mSet(keyValueMap); + } } diff --git a/das/src/main/java/com/das/modules/node/command/DeviceEventCommand.java b/das/src/main/java/com/das/modules/node/command/DeviceEventCommand.java index aeb96bcf..4a61029c 100644 --- a/das/src/main/java/com/das/modules/node/command/DeviceEventCommand.java +++ b/das/src/main/java/com/das/modules/node/command/DeviceEventCommand.java @@ -15,6 +15,7 @@ public class DeviceEventCommand implements BaseCommand{ @Override public void doCommand(TerminalMessage data) { + log.debug("收到实时[事件告警]"); try { nodeMessageService.handleDeviceEvent(data); diff --git a/das/src/main/java/com/das/modules/node/command/HeartbeatCommand.java b/das/src/main/java/com/das/modules/node/command/HeartbeatCommand.java index a646940c..9eb13de7 100644 --- a/das/src/main/java/com/das/modules/node/command/HeartbeatCommand.java +++ b/das/src/main/java/com/das/modules/node/command/HeartbeatCommand.java @@ -34,7 +34,7 @@ public class HeartbeatCommand implements BaseCommand{ */ @Override public void doCommand(TerminalMessage data) { - log.info("收到[heartbeat]报文"); + log.debug("收到[heartbeat]报文"); // 解析心跳报文中的数据信息 JsonNode dataInfo = data.getData(); diff --git a/das/src/main/java/com/das/modules/node/command/HisHighSpeedCommand.java b/das/src/main/java/com/das/modules/node/command/HisHighSpeedCommand.java index 78662a09..81bfa8cc 100644 --- a/das/src/main/java/com/das/modules/node/command/HisHighSpeedCommand.java +++ b/das/src/main/java/com/das/modules/node/command/HisHighSpeedCommand.java @@ -15,6 +15,7 @@ public class HisHighSpeedCommand implements BaseCommand { NodeMessageService nodeMessageService; @Override public void doCommand(TerminalMessage data) { + log.debug("收到历史高频数据"); try { //analogData值只存入redis nodeMessageService.handleHighSpeed(data); diff --git a/das/src/main/java/com/das/modules/node/command/HisLowSpeedCommand.java b/das/src/main/java/com/das/modules/node/command/HisLowSpeedCommand.java index 82d5497f..5c7911d0 100644 --- a/das/src/main/java/com/das/modules/node/command/HisLowSpeedCommand.java +++ b/das/src/main/java/com/das/modules/node/command/HisLowSpeedCommand.java @@ -15,6 +15,7 @@ public class HisLowSpeedCommand implements BaseCommand { NodeMessageService nodeMessageService; @Override public void doCommand(TerminalMessage data) { + log.debug("收到[历史低频数据]"); try { //analogData值只存入redis nodeMessageService.handleLowSpeed(data); diff --git a/das/src/main/java/com/das/modules/node/command/StateDataCommand.java b/das/src/main/java/com/das/modules/node/command/StateDataCommand.java index eb61758a..ec01ed44 100644 --- a/das/src/main/java/com/das/modules/node/command/StateDataCommand.java +++ b/das/src/main/java/com/das/modules/node/command/StateDataCommand.java @@ -16,6 +16,7 @@ public class StateDataCommand implements BaseCommand { @Override public void doCommand(TerminalMessage data) { + log.debug("收到实时数据[状态量]"); try { //只存入redis nodeMessageService.handleData(data); diff --git a/das/src/main/java/com/das/modules/node/service/NodeMessageService.java b/das/src/main/java/com/das/modules/node/service/NodeMessageService.java index 1953a9d5..22b78ca7 100644 --- a/das/src/main/java/com/das/modules/node/service/NodeMessageService.java +++ b/das/src/main/java/com/das/modules/node/service/NodeMessageService.java @@ -14,8 +14,6 @@ public interface NodeMessageService { JsonNode sendTerminalConfig(Long nodeId); - void handleData(TerminalMessage data); - void handleHighSpeed(TerminalMessage data); void handleLowSpeed(TerminalMessage data); diff --git a/das/src/main/java/com/das/modules/node/service/impl/NodeMessageServiceImpl.java b/das/src/main/java/com/das/modules/node/service/impl/NodeMessageServiceImpl.java index e9e14da1..3d6f73b9 100644 --- a/das/src/main/java/com/das/modules/node/service/impl/NodeMessageServiceImpl.java +++ b/das/src/main/java/com/das/modules/node/service/impl/NodeMessageServiceImpl.java @@ -238,62 +238,6 @@ public class NodeMessageServiceImpl extends TextWebSocketHandler implements Node } - @Override - public void handleData(TerminalMessage data) { - JsonNode jsonNode = data.getData(); - log.debug("收到消息:{}",data.getData()); - String deviceId = jsonNode.get("deviceId").asText(); - JsonNode values = jsonNode.get("values"); - JsonNode archiveValues = jsonNode.get("archiveValues"); - Long dataTime = jsonNode.get("dataTime").asLong(); - Map keyValueMap = new HashMap<>(); - String modelCode = dataService.deviceModelMap.get(deviceId); - Set highKey = dataService.highIotFieldMap.get(modelCode).keySet(); - Set lowKey = dataService.lowIotFieldMap.get(modelCode).keySet(); - Map highSpeedValueMap = new HashMap<>(); - Map lowSpeedValueMap = new HashMap<>(); - - //数据入redis - if (values != null){ - Iterator keysHigh = values.fieldNames(); - while (keysHigh.hasNext()) { - String fieldName = keysHigh.next(); - String key = String.format("RT:%s:%s", deviceId, fieldName.toLowerCase()); - keyValueMap.put(key, values.get(fieldName)); - } - } - - if (archiveValues != null){ - Iterator archiveKeys = archiveValues.fieldNames(); - while (archiveKeys.hasNext()) { - String fieldName = archiveKeys.next(); - String key = String.format("RT:%s:%s", deviceId, fieldName.toLowerCase()); - keyValueMap.put(key, archiveValues.get(fieldName)); - if (highKey.contains(fieldName)) { - highSpeedValueMap.put(fieldName, archiveValues.get(fieldName)); - } - if (lowKey.contains(fieldName)) { - lowSpeedValueMap.put(fieldName, archiveValues.get(fieldName)); - } - } - } - //更新td - if (!highSpeedValueMap.isEmpty()) { - List highSpeedData = new ArrayList<>(); - RTData rtHighData = RTData.builder().dataTime(dataTime).deviceId(Long.valueOf(deviceId)).values(highSpeedValueMap).build(); - highSpeedData.add(rtHighData); - tdEngineService.updateYCHighValues(highSpeedData, modelCode); - } - - if (!lowSpeedValueMap.isEmpty()) { - List lowSpeedData = new ArrayList<>(); - RTData rtLowData = RTData.builder().dataTime(dataTime).deviceId(Long.valueOf(deviceId)).values(lowSpeedValueMap).build(); - lowSpeedData.add(rtLowData); - tdEngineService.updateYCLowValues(lowSpeedData, modelCode); - } - adminRedisTemplate.mSet(keyValueMap); - } - @Override public void handleHighSpeed(TerminalMessage data) { JsonNode jsonNode = data.getData(); From f9484cfce624fabfcd0d71002f122bafbe6c8953 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B0=B7=E6=88=90=E4=BC=9F?= Date: Tue, 31 Dec 2024 16:32:52 +0800 Subject: [PATCH 2/6] =?UTF-8?q?=E8=BF=81=E7=A7=BBstateData=E5=8A=9F?= =?UTF-8?q?=E8=83=BD=E8=BF=9B=E5=85=A5StateDataCommand?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../node/command/AnalogDataCommand.java | 46 ------------ .../node/command/StateDataCommand.java | 71 ++++++++++++++++++- 2 files changed, 69 insertions(+), 48 deletions(-) diff --git a/das/src/main/java/com/das/modules/node/command/AnalogDataCommand.java b/das/src/main/java/com/das/modules/node/command/AnalogDataCommand.java index 7b427e20..8a59b3e1 100644 --- a/das/src/main/java/com/das/modules/node/command/AnalogDataCommand.java +++ b/das/src/main/java/com/das/modules/node/command/AnalogDataCommand.java @@ -106,51 +106,5 @@ public class AnalogDataCommand implements BaseCommand { tdEngineService.updateYCLowValues(List.of(lowSpeedData), dev.getModel()); } redisTemplate.opsForValue().multiSet(redisValues); -// Map keyValueMap = new HashMap<>(); -// String modelCode = dataService.deviceModelMap.get(deviceId); -// Set highKey = dataService.highIotFieldMap.get(modelCode).keySet(); -// Set lowKey = dataService.lowIotFieldMap.get(modelCode).keySet(); -// Map highSpeedValueMap = new HashMap<>(); -// Map lowSpeedValueMap = new HashMap<>(); - -// //数据入redis -// if (values != null){ -// Iterator keysHigh = values.fieldNames(); -// while (keysHigh.hasNext()) { -// String fieldName = keysHigh.next(); -// String key = String.format("RT:%s:%s", deviceId, fieldName.toLowerCase()); -// keyValueMap.put(key, values.get(fieldName)); -// } -// } -// -// if (archiveValues != null){ -// Iterator archiveKeys = archiveValues.fieldNames(); -// while (archiveKeys.hasNext()) { -// String fieldName = archiveKeys.next(); -// String key = String.format("RT:%s:%s", deviceId, fieldName.toLowerCase()); -// keyValueMap.put(key, archiveValues.get(fieldName)); -// if (highKey.contains(fieldName)) { -// highSpeedValueMap.put(fieldName, archiveValues.get(fieldName)); -// } -// if (lowKey.contains(fieldName)) { -// lowSpeedValueMap.put(fieldName, archiveValues.get(fieldName)); -// } -// } -// } -// //更新td -// if (!highSpeedValueMap.isEmpty()) { -// List highSpeedData = new ArrayList<>(); -// RTData rtHighData = RTData.builder().dataTime(dataTime).deviceId(Long.valueOf(deviceId)).values(highSpeedValueMap).build(); -// highSpeedData.add(rtHighData); -// tdEngineService.updateYCHighValues(highSpeedData, modelCode); -// } -// -// if (!lowSpeedValueMap.isEmpty()) { -// List lowSpeedData = new ArrayList<>(); -// RTData rtLowData = RTData.builder().dataTime(dataTime).deviceId(Long.valueOf(deviceId)).values(lowSpeedValueMap).build(); -// lowSpeedData.add(rtLowData); -// tdEngineService.updateYCLowValues(lowSpeedData, modelCode); -// } -// adminRedisTemplate.mSet(keyValueMap); } } diff --git a/das/src/main/java/com/das/modules/node/command/StateDataCommand.java b/das/src/main/java/com/das/modules/node/command/StateDataCommand.java index ec01ed44..db719dd4 100644 --- a/das/src/main/java/com/das/modules/node/command/StateDataCommand.java +++ b/das/src/main/java/com/das/modules/node/command/StateDataCommand.java @@ -1,27 +1,94 @@ package com.das.modules.node.command; +import com.das.common.exceptions.ServiceException; +import com.das.modules.cache.domain.DeviceInfoCache; +import com.das.modules.cache.service.CacheService; +import com.das.modules.cache.service.IotModelCache; +import com.das.modules.data.service.TDEngineService; import com.das.modules.node.constant.NodeConstant; +import com.das.modules.node.domain.bo.RTData; import com.das.modules.node.domain.bo.TerminalMessage; import com.das.modules.node.service.NodeMessageService; +import com.fasterxml.jackson.databind.JsonNode; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service; +import java.util.*; + @Service(value = NodeConstant.STATE_DATA) @Slf4j public class StateDataCommand implements BaseCommand { @Autowired - NodeMessageService nodeMessageService; + CacheService cacheService; + + @Autowired + RedisTemplate redisTemplate; + + @Autowired + TDEngineService tdEngineService; @Override public void doCommand(TerminalMessage data) { log.debug("收到实时数据[状态量]"); try { //只存入redis - nodeMessageService.handleData(data); + processStateData(data); } catch (Exception e) { log.error("解析数据异常", e); } } + + private void processStateData(TerminalMessage data) { + JsonNode dataNode = data.getData(); + String deviceId = Optional.of(dataNode.get("deviceId")).orElseThrow(() -> new ServiceException("deviceId字段缺失")).asText(); + Long dataTime = Optional.of(dataNode.get("dataTime")).orElseThrow( () -> new ServiceException("dataTime字段缺失")).asLong(); + JsonNode values = dataNode.get("values"); + JsonNode archiveValues = dataNode.get("archiveValues"); + //排除空数据 + if (values == null && archiveValues == null){ + return; + } + //如果values有值,则存入redis + Map redisValues = new HashMap<>(); + if (values != null && values.isObject()){ + for (Iterator it = values.fieldNames(); it.hasNext(); ) { + String valueName = it.next(); + String key = String.format("RT:%s:%s", deviceId, valueName.toLowerCase()); + redisValues.put(key, values.get(valueName)); + } + } + DeviceInfoCache dev = cacheService.getEquipmentCache().getDeviceInfoCacheById(Long.parseLong(deviceId)); + IotModelCache iotCache = cacheService.getIotModelCache(); + //如果archiveValues有值,则存入td,同时也更新redis + if (archiveValues != null && archiveValues.isObject()){ + RTData hiSpeedData = new RTData(); + hiSpeedData.setDeviceId(Long.parseLong(deviceId)); + hiSpeedData.setDataTime(dataTime); + Map hiSpeedValues = new HashMap<>(); + RTData lowSpeedData = new RTData(); + lowSpeedData.setDeviceId(Long.parseLong(deviceId)); + lowSpeedData.setDataTime(dataTime); + Map lowSpeedValues = new HashMap<>(); + for (Iterator it = archiveValues.fieldNames(); it.hasNext(); ) { + //加入redis更新列表 + String valueName = it.next(); + String key = String.format("RT:%s:%s", deviceId, valueName.toLowerCase()); + redisValues.put(key, archiveValues.get(valueName)); + //加入td更新列表 + if (iotCache.isHighSpeed(dev.getIotModelId(), valueName)) { + hiSpeedValues.put(valueName, archiveValues.get(valueName)); + } else if (iotCache.isLowSpeed(dev.getIotModelId(), valueName)) { + lowSpeedValues.put(valueName, archiveValues.get(valueName)); + } + } + hiSpeedData.setValues(hiSpeedValues); + lowSpeedData.setValues(lowSpeedValues); + tdEngineService.updateYCHighValues(List.of(hiSpeedData), dev.getModel()); + tdEngineService.updateYCLowValues(List.of(lowSpeedData), dev.getModel()); + } + redisTemplate.opsForValue().multiSet(redisValues); + } } From 633566bda89feab89806d60f0cb26ed41c1bfd1b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B0=B7=E6=88=90=E4=BC=9F?= Date: Tue, 31 Dec 2024 16:43:00 +0800 Subject: [PATCH 3/6] =?UTF-8?q?=E8=AE=BE=E5=A4=87=E7=BC=93=E5=AD=98?= =?UTF-8?q?=E5=A2=9E=E5=8A=A0iotModelCode?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/das/modules/cache/domain/DeviceInfoCache.java | 2 ++ .../modules/cache/service/impl/EquipmentCacheImpl.java | 9 ++++++++- .../com/das/modules/node/command/AnalogDataCommand.java | 4 ++-- .../com/das/modules/node/command/StateDataCommand.java | 5 +++-- 4 files changed, 15 insertions(+), 5 deletions(-) diff --git a/das/src/main/java/com/das/modules/cache/domain/DeviceInfoCache.java b/das/src/main/java/com/das/modules/cache/domain/DeviceInfoCache.java index 892f7f9b..60e1f351 100644 --- a/das/src/main/java/com/das/modules/cache/domain/DeviceInfoCache.java +++ b/das/src/main/java/com/das/modules/cache/domain/DeviceInfoCache.java @@ -34,6 +34,8 @@ public class DeviceInfoCache { */ private Long iotModelId; + private String iotModelCode; + /** * 制造商 */ diff --git a/das/src/main/java/com/das/modules/cache/service/impl/EquipmentCacheImpl.java b/das/src/main/java/com/das/modules/cache/service/impl/EquipmentCacheImpl.java index 6426fa40..1e0a930e 100644 --- a/das/src/main/java/com/das/modules/cache/service/impl/EquipmentCacheImpl.java +++ b/das/src/main/java/com/das/modules/cache/service/impl/EquipmentCacheImpl.java @@ -1,9 +1,11 @@ package com.das.modules.cache.service.impl; +import cn.hutool.core.util.StrUtil; import com.das.modules.cache.domain.DeviceInfoCache; import com.das.modules.cache.service.EquipmentCache; import com.das.modules.equipment.entity.SysEquipment; import com.das.modules.equipment.mapper.SysEquipmentMapper; +import com.das.modules.equipment.mapper.SysIotModelMapper; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; import org.springframework.beans.factory.annotation.Autowired; @@ -18,7 +20,8 @@ import java.util.concurrent.ConcurrentHashMap; public class EquipmentCacheImpl implements EquipmentCache { @Autowired SysEquipmentMapper sysEquipmentMapper; - + @Autowired + SysIotModelMapper sysIotModelMapper; /** * 设备CODE索引,用于通过设备CODE访问设备缓存信息 @@ -43,6 +46,10 @@ public class EquipmentCacheImpl implements EquipmentCache { deviceInfoCache.setMadeinFactory(equipment.getMadeinFactory()); deviceInfoCache.setParentDeviceId(equipment.getParentEquipmentId()); deviceInfoCache.setIotModelId(equipment.getIotModelId()); + String iotModelCode = sysIotModelMapper.getIotModel(equipment.getIotModelId()); + if (StrUtil.isNotBlank(iotModelCode)){ + deviceInfoCache.setIotModelCode(iotModelCode); + } //创建Code索引 deviceCodeIndex.put(deviceInfoCache.getDeviceCode(),deviceInfoCache); //创建Id索引 diff --git a/das/src/main/java/com/das/modules/node/command/AnalogDataCommand.java b/das/src/main/java/com/das/modules/node/command/AnalogDataCommand.java index 8a59b3e1..7ab7ddd4 100644 --- a/das/src/main/java/com/das/modules/node/command/AnalogDataCommand.java +++ b/das/src/main/java/com/das/modules/node/command/AnalogDataCommand.java @@ -102,8 +102,8 @@ public class AnalogDataCommand implements BaseCommand { } hiSpeedData.setValues(hiSpeedValues); lowSpeedData.setValues(lowSpeedValues); - tdEngineService.updateYCHighValues(List.of(hiSpeedData), dev.getModel()); - tdEngineService.updateYCLowValues(List.of(lowSpeedData), dev.getModel()); + tdEngineService.updateYCHighValues(List.of(hiSpeedData), dev.getIotModelCode()); + tdEngineService.updateYCLowValues(List.of(lowSpeedData), dev.getIotModelCode()); } redisTemplate.opsForValue().multiSet(redisValues); } diff --git a/das/src/main/java/com/das/modules/node/command/StateDataCommand.java b/das/src/main/java/com/das/modules/node/command/StateDataCommand.java index db719dd4..97e12964 100644 --- a/das/src/main/java/com/das/modules/node/command/StateDataCommand.java +++ b/das/src/main/java/com/das/modules/node/command/StateDataCommand.java @@ -86,8 +86,9 @@ public class StateDataCommand implements BaseCommand { } hiSpeedData.setValues(hiSpeedValues); lowSpeedData.setValues(lowSpeedValues); - tdEngineService.updateYCHighValues(List.of(hiSpeedData), dev.getModel()); - tdEngineService.updateYCLowValues(List.of(lowSpeedData), dev.getModel()); + + tdEngineService.updateYCHighValues(List.of(hiSpeedData), dev.getIotModelCode()); + tdEngineService.updateYCLowValues(List.of(lowSpeedData),dev.getIotModelCode()); } redisTemplate.opsForValue().multiSet(redisValues); } From 2a1a768d1801d8d98e7aa2447436093a0a9587f8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B0=B7=E6=88=90=E4=BC=9F?= Date: Tue, 31 Dec 2024 16:54:06 +0800 Subject: [PATCH 4/6] =?UTF-8?q?=E8=AE=BE=E5=A4=87=E7=BC=93=E5=AD=98?= =?UTF-8?q?=E5=A2=9E=E5=8A=A0iotModelCode?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../das/modules/cache/service/impl/EquipmentCacheImpl.java | 6 +++++- .../com/das/modules/equipment/mapper/SysIotModelMapper.java | 2 ++ das/src/main/resources/mapper/SysIotModelMapper.xml | 4 ++++ 3 files changed, 11 insertions(+), 1 deletion(-) diff --git a/das/src/main/java/com/das/modules/cache/service/impl/EquipmentCacheImpl.java b/das/src/main/java/com/das/modules/cache/service/impl/EquipmentCacheImpl.java index 1e0a930e..0812d2b1 100644 --- a/das/src/main/java/com/das/modules/cache/service/impl/EquipmentCacheImpl.java +++ b/das/src/main/java/com/das/modules/cache/service/impl/EquipmentCacheImpl.java @@ -8,6 +8,7 @@ import com.das.modules.equipment.mapper.SysEquipmentMapper; import com.das.modules.equipment.mapper.SysIotModelMapper; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -16,6 +17,7 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.ConcurrentHashMap; +@Slf4j @Service public class EquipmentCacheImpl implements EquipmentCache { @Autowired @@ -46,8 +48,10 @@ public class EquipmentCacheImpl implements EquipmentCache { deviceInfoCache.setMadeinFactory(equipment.getMadeinFactory()); deviceInfoCache.setParentDeviceId(equipment.getParentEquipmentId()); deviceInfoCache.setIotModelId(equipment.getIotModelId()); - String iotModelCode = sysIotModelMapper.getIotModel(equipment.getIotModelId()); + String iotModelCode = sysIotModelMapper.getIodModelCode(equipment.getIotModelId()); + log.debug("iotModelId: {}", equipment.getIotModelId()); if (StrUtil.isNotBlank(iotModelCode)){ + log.debug("iotModelCode: {}", iotModelCode); deviceInfoCache.setIotModelCode(iotModelCode); } //创建Code索引 diff --git a/das/src/main/java/com/das/modules/equipment/mapper/SysIotModelMapper.java b/das/src/main/java/com/das/modules/equipment/mapper/SysIotModelMapper.java index 71fad7fd..8b6950ee 100644 --- a/das/src/main/java/com/das/modules/equipment/mapper/SysIotModelMapper.java +++ b/das/src/main/java/com/das/modules/equipment/mapper/SysIotModelMapper.java @@ -31,6 +31,8 @@ public interface SysIotModelMapper extends BaseMapper { String getIotModel(Long id); + String getIodModelCode(Long id); + List getAllIotModelField(Long id); SysIotModelVo selectIotModelByCode(String code); diff --git a/das/src/main/resources/mapper/SysIotModelMapper.xml b/das/src/main/resources/mapper/SysIotModelMapper.xml index 271248f8..297189d5 100644 --- a/das/src/main/resources/mapper/SysIotModelMapper.xml +++ b/das/src/main/resources/mapper/SysIotModelMapper.xml @@ -77,6 +77,10 @@ select sim.iot_model_code from sys_iot_model sim left join sys_equipment se on sim.id = se.iot_model_id where se.id = #{id} + + From adcc19deb3a197534119c0c457c87bb46236bca1 Mon Sep 17 00:00:00 2001 From: huguanghan Date: Fri, 3 Jan 2025 09:19:08 +0800 Subject: [PATCH 5/6] =?UTF-8?q?=E8=BF=81=E7=A7=BBdeviceEvent,hisHighSpeed,?= =?UTF-8?q?hisLowSpeed=E8=87=B3=E5=AF=B9=E5=BA=94command?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../node/command/DeviceEventCommand.java | 106 +++++++++++++- .../node/command/HisHighSpeedCommand.java | 41 +++++- .../node/command/HisLowSpeedCommand.java | 41 +++++- .../node/service/NodeMessageService.java | 6 - .../service/impl/NodeMessageServiceImpl.java | 131 ------------------ 5 files changed, 182 insertions(+), 143 deletions(-) diff --git a/das/src/main/java/com/das/modules/node/command/DeviceEventCommand.java b/das/src/main/java/com/das/modules/node/command/DeviceEventCommand.java index 4a61029c..0e4223e1 100644 --- a/das/src/main/java/com/das/modules/node/command/DeviceEventCommand.java +++ b/das/src/main/java/com/das/modules/node/command/DeviceEventCommand.java @@ -1,26 +1,128 @@ package com.das.modules.node.command; +import com.baomidou.mybatisplus.core.toolkit.IdWorker; +import com.das.common.utils.AdminRedisTemplate; +import com.das.common.utils.StringUtils; +import com.das.modules.cache.domain.DeviceInfoCache; +import com.das.modules.cache.service.CacheService; +import com.das.modules.data.domain.DeviceEventInfo; +import com.das.modules.data.service.TDEngineService; +import com.das.modules.data.service.impl.DataServiceImpl; import com.das.modules.node.constant.NodeConstant; import com.das.modules.node.domain.bo.TerminalMessage; +import com.das.modules.node.domain.vo.DeviceEventVo; import com.das.modules.node.service.NodeMessageService; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + @Service(value = NodeConstant.DEVICE_EVENT) @Slf4j public class DeviceEventCommand implements BaseCommand{ @Autowired - private NodeMessageService nodeMessageService; + AdminRedisTemplate adminRedisTemplate; + + @Autowired + private DataServiceImpl dataService; + + @Autowired + TDEngineService tdEngineService; + + + @Autowired + CacheService cacheService; @Override public void doCommand(TerminalMessage data) { log.debug("收到实时[事件告警]"); try { - nodeMessageService.handleDeviceEvent(data); + handleDeviceEvent(data); } catch (Exception e) { log.error("解析数据异常", e); } } + + private void handleDeviceEvent(TerminalMessage data) { + + JsonNode dataEvent = data.getData(); + ObjectMapper objectMapper = new ObjectMapper(); + List valueList = new ArrayList<>(); + + // 使用 TypeReference 来指定转换目标类型 + List list = objectMapper.convertValue(dataEvent, new TypeReference>() { + }); + log.debug("消息data转化deviceVo,{}", list); + for (DeviceEventVo item : list) { + DeviceInfoCache deviceInfoCache = cacheService.getEquipmentCache().getDeviceInfoCacheById(Long.valueOf(item.getDeviceId())); + Integer firstTriggeredCode = adminRedisTemplate.get(String.format("RT:%s:%s", item.getDeviceId(), "FirstTriggeredCode".toLowerCase())); + DeviceEventInfo deviceEventInfo = new DeviceEventInfo(); + deviceEventInfo.setEventTime(item.getEventTime()); + deviceEventInfo.setEventId(IdWorker.getId()); + deviceEventInfo.setAttributeCode(item.getAttrCode()); + deviceEventInfo.setDeviceId(item.getDeviceId()); + deviceEventInfo.setDeviceName(deviceInfoCache.getDeviceName()); + deviceEventInfo.setDeviceCode(deviceInfoCache.getDeviceCode()); + deviceEventInfo.setFirstTriggeredCode(firstTriggeredCode); + String eventType = getEventType(item.getEventType()); + String model = dataService.deviceModelMap.get(item.getDeviceId()); + if (StringUtils.isEmpty(model)) { + log.debug("未查询到物模型code,设备id:{}", item.getDeviceId()); + } + String fieldName = dataService.fieldCodeNameMap.get(model).get(item.getAttrCode()); + if (StringUtils.isEmpty(fieldName)) { + log.debug("未查询到物模型属性code,设备id:{}", item.getDeviceId()); + } + deviceEventInfo.setEventType(item.getEventType()); + deviceEventInfo.setConfirmed(0); + if (!StringUtils.isEmpty(eventType) && eventType.equals("遥信变位")) { + String stateDesc = dataService.stateDescMap.get(model).get(item.getAttrCode()); + if (item.getAttrValue().equals(0)) { + deviceEventInfo.setEventText(item.getAttrCode() + fieldName + " 复归"); + if (StringUtils.isNotEmpty(stateDesc)) { + List descList = Arrays.stream(stateDesc.split("\\|")).toList(); + deviceEventInfo.setEventText(item.getAttrCode() + fieldName + descList.get(0)); + } + deviceEventInfo.setEventLevel(0); + } else { + deviceEventInfo.setEventText(item.getAttrCode() + fieldName + " 动作"); + if (StringUtils.isNotEmpty(stateDesc)) { + List descList = Arrays.stream(stateDesc.split("\\|")).toList(); + deviceEventInfo.setEventText(item.getAttrCode() + fieldName + descList.get(1)); + } + Integer level = dataService.eventLevelMap.get(model).get(item.getAttrCode()); + log.debug("level:{}", level); + log.debug("fieldname{}", fieldName); + deviceEventInfo.setEventLevel(level == null ? 0 : level); + } + } else { + deviceEventInfo.setEventText(item.getAttrCode() + fieldName + eventType + ",属性值为:" + item.getAttrValue() + ",越限值为:" + item.getLimitValue()); + deviceEventInfo.setEventLevel(1); + } + valueList.add(deviceEventInfo); + } + try { + tdEngineService.updateDeviceEventValues(valueList); + } catch (Exception e) { + log.error("事件信息存入Td失败,失败原因", e); + } + } + + private String getEventType(int eventType) { + return switch (eventType) { + case 0 -> "遥信变位"; + case 1 -> "越上限"; + case 2 -> "越下限"; + case 3 -> "越上上限"; + case 4 -> "越下下限"; + default -> ""; + }; + } } diff --git a/das/src/main/java/com/das/modules/node/command/HisHighSpeedCommand.java b/das/src/main/java/com/das/modules/node/command/HisHighSpeedCommand.java index 81bfa8cc..37587dcb 100644 --- a/das/src/main/java/com/das/modules/node/command/HisHighSpeedCommand.java +++ b/das/src/main/java/com/das/modules/node/command/HisHighSpeedCommand.java @@ -1,27 +1,66 @@ package com.das.modules.node.command; +import com.das.modules.data.service.TDEngineService; +import com.das.modules.equipment.mapper.SysIotModelMapper; import com.das.modules.node.constant.NodeConstant; +import com.das.modules.node.domain.bo.RTData; import com.das.modules.node.domain.bo.TerminalMessage; import com.das.modules.node.service.NodeMessageService; +import com.fasterxml.jackson.databind.JsonNode; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import java.util.*; + @Service(value = NodeConstant.HIS_HIGH_SPEED_DATA) @Slf4j public class HisHighSpeedCommand implements BaseCommand { @Autowired NodeMessageService nodeMessageService; + + @Autowired + SysIotModelMapper sysIotModelMapper; + + @Autowired + TDEngineService tdEngineService; + @Override public void doCommand(TerminalMessage data) { log.debug("收到历史高频数据"); try { //analogData值只存入redis - nodeMessageService.handleHighSpeed(data); + handleHighSpeed(data); } catch (Exception e) { log.error("解析数据异常", e); } } + + private void handleHighSpeed(TerminalMessage data) { + JsonNode jsonNode = data.getData(); + String deviceId = jsonNode.get("deviceId").asText(); + JsonNode values = jsonNode.get("values"); + Map keyValueMap = new HashMap<>(); + // 根据设备ID获取对应的物模型属性 + String iotModelCode = sysIotModelMapper.getIotModel(jsonNode.get("deviceId").asLong()); + + //High数据 + Iterator keysHigh = values.fieldNames(); + while (keysHigh.hasNext()) { + String fieldName = keysHigh.next(); + keyValueMap.put(fieldName, values.get(fieldName)); + + } + Long dataTime = jsonNode.get("dataTime").asLong(); + List highList = new ArrayList<>(); + RTData rtHighData = RTData.builder() + .dataTime(dataTime) + .deviceId(Long.valueOf(deviceId)) + .values(keyValueMap) + .build(); + highList.add(rtHighData); + tdEngineService.updateYCHighValues(highList, iotModelCode); + } } diff --git a/das/src/main/java/com/das/modules/node/command/HisLowSpeedCommand.java b/das/src/main/java/com/das/modules/node/command/HisLowSpeedCommand.java index 5c7911d0..289117fc 100644 --- a/das/src/main/java/com/das/modules/node/command/HisLowSpeedCommand.java +++ b/das/src/main/java/com/das/modules/node/command/HisLowSpeedCommand.java @@ -1,27 +1,62 @@ package com.das.modules.node.command; +import com.das.modules.data.service.TDEngineService; +import com.das.modules.equipment.mapper.SysIotModelMapper; import com.das.modules.node.constant.NodeConstant; +import com.das.modules.node.domain.bo.RTData; import com.das.modules.node.domain.bo.TerminalMessage; -import com.das.modules.node.service.NodeMessageService; +import com.fasterxml.jackson.databind.JsonNode; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import java.util.*; + @Service(value = NodeConstant.HIS_LOW_SPEED_DATA) @Slf4j public class HisLowSpeedCommand implements BaseCommand { @Autowired - NodeMessageService nodeMessageService; + SysIotModelMapper sysIotModelMapper; + + @Autowired + TDEngineService tdEngineService; @Override public void doCommand(TerminalMessage data) { log.debug("收到[历史低频数据]"); try { //analogData值只存入redis - nodeMessageService.handleLowSpeed(data); + handleLowSpeed(data); } catch (Exception e) { log.error("解析数据异常", e); } } + + private void handleLowSpeed(TerminalMessage data) { + JsonNode jsonNode = data.getData(); + String deviceId = jsonNode.get("deviceId").asText(); + JsonNode values = jsonNode.get("values"); + Map keyValueMap = new HashMap<>(); + // 根据设备ID获取对应的物模型属性 + String iotModelCode = sysIotModelMapper.getIotModel(jsonNode.get("deviceId").asLong()); + + //Low数据 + Iterator keysLow = values.fieldNames(); + while (keysLow.hasNext()) { + String fieldName = keysLow.next(); + keyValueMap.put(fieldName, values.get(fieldName)); + + } + Long dataTime = jsonNode.get("dataTime").asLong(); + List highList = new ArrayList<>(); + RTData rtHighData = RTData.builder() + .dataTime(dataTime) + .deviceId(Long.valueOf(deviceId)) + .values(keyValueMap) + .build(); + highList.add(rtHighData); + tdEngineService.updateYCLowValues(highList, iotModelCode); + } + } diff --git a/das/src/main/java/com/das/modules/node/service/NodeMessageService.java b/das/src/main/java/com/das/modules/node/service/NodeMessageService.java index 22b78ca7..73c5e1f9 100644 --- a/das/src/main/java/com/das/modules/node/service/NodeMessageService.java +++ b/das/src/main/java/com/das/modules/node/service/NodeMessageService.java @@ -14,12 +14,6 @@ public interface NodeMessageService { JsonNode sendTerminalConfig(Long nodeId); - void handleHighSpeed(TerminalMessage data); - - void handleLowSpeed(TerminalMessage data); - - void handleDeviceEvent(TerminalMessage data); - /** * 向指定采集节点发送指令(无返回值) * @param nodeId 节点ID diff --git a/das/src/main/java/com/das/modules/node/service/impl/NodeMessageServiceImpl.java b/das/src/main/java/com/das/modules/node/service/impl/NodeMessageServiceImpl.java index 3d6f73b9..17216fb3 100644 --- a/das/src/main/java/com/das/modules/node/service/impl/NodeMessageServiceImpl.java +++ b/das/src/main/java/com/das/modules/node/service/impl/NodeMessageServiceImpl.java @@ -238,137 +238,6 @@ public class NodeMessageServiceImpl extends TextWebSocketHandler implements Node } - @Override - public void handleHighSpeed(TerminalMessage data) { - JsonNode jsonNode = data.getData(); - String deviceId = jsonNode.get("deviceId").asText(); - JsonNode values = jsonNode.get("values"); - Map keyValueMap = new HashMap<>(); - // 根据设备ID获取对应的物模型属性 - String iotModelCode = sysIotModelMapper.getIotModel(jsonNode.get("deviceId").asLong()); - - //High数据 - Iterator keysHigh = values.fieldNames(); - while (keysHigh.hasNext()) { - String fieldName = keysHigh.next(); - keyValueMap.put(fieldName, values.get(fieldName)); - - } - Long dataTime = jsonNode.get("dataTime").asLong(); - List highList = new ArrayList<>(); - RTData rtHighData = RTData.builder() - .dataTime(dataTime) - .deviceId(Long.valueOf(deviceId)) - .values(keyValueMap) - .build(); - highList.add(rtHighData); - tdEngineService.updateYCHighValues(highList, iotModelCode); - } - - @Override - public void handleLowSpeed(TerminalMessage data) { - JsonNode jsonNode = data.getData(); - String deviceId = jsonNode.get("deviceId").asText(); - JsonNode values = jsonNode.get("values"); - Map keyValueMap = new HashMap<>(); - // 根据设备ID获取对应的物模型属性 - String iotModelCode = sysIotModelMapper.getIotModel(jsonNode.get("deviceId").asLong()); - - //Low数据 - Iterator keysLow = values.fieldNames(); - while (keysLow.hasNext()) { - String fieldName = keysLow.next(); - keyValueMap.put(fieldName, values.get(fieldName)); - - } - Long dataTime = jsonNode.get("dataTime").asLong(); - List highList = new ArrayList<>(); - RTData rtHighData = RTData.builder() - .dataTime(dataTime) - .deviceId(Long.valueOf(deviceId)) - .values(keyValueMap) - .build(); - highList.add(rtHighData); - tdEngineService.updateYCLowValues(highList, iotModelCode); - } - - @Override - public void handleDeviceEvent(TerminalMessage data) { - - JsonNode dataEvent = data.getData(); - ObjectMapper objectMapper = new ObjectMapper(); - List valueList = new ArrayList<>(); - - // 使用 TypeReference 来指定转换目标类型 - List list = objectMapper.convertValue(dataEvent, new TypeReference>() { - }); - log.debug("消息data转化deviceVo,{}", list); - for (DeviceEventVo item : list) { - DeviceInfoCache deviceInfoCache = cacheService.getEquipmentCache().getDeviceInfoCacheById(Long.valueOf(item.getDeviceId())); - Integer firstTriggeredCode = adminRedisTemplate.get(String.format("RT:%s:%s", item.getDeviceId(), "FirstTriggeredCode".toLowerCase())); - DeviceEventInfo deviceEventInfo = new DeviceEventInfo(); - deviceEventInfo.setEventTime(item.getEventTime()); - deviceEventInfo.setEventId(IdWorker.getId()); - deviceEventInfo.setAttributeCode(item.getAttrCode()); - deviceEventInfo.setDeviceId(item.getDeviceId()); - deviceEventInfo.setDeviceName(deviceInfoCache.getDeviceName()); - deviceEventInfo.setDeviceCode(deviceInfoCache.getDeviceCode()); - deviceEventInfo.setFirstTriggeredCode(firstTriggeredCode); - String eventType = getEventType(item.getEventType()); - String model = dataService.deviceModelMap.get(item.getDeviceId()); - if (StringUtils.isEmpty(model)) { - log.debug("未查询到物模型code,设备id:{}", item.getDeviceId()); - } - String fieldName = dataService.fieldCodeNameMap.get(model).get(item.getAttrCode()); - if (StringUtils.isEmpty(fieldName)) { - log.debug("未查询到物模型属性code,设备id:{}", item.getDeviceId()); - } - deviceEventInfo.setEventType(item.getEventType()); - deviceEventInfo.setConfirmed(0); - if (!StringUtils.isEmpty(eventType) && eventType.equals("遥信变位")) { - String stateDesc = dataService.stateDescMap.get(model).get(item.getAttrCode()); - if (item.getAttrValue().equals(0)) { - deviceEventInfo.setEventText(item.getAttrCode() + fieldName + " 复归"); - if (StringUtils.isNotEmpty(stateDesc)) { - List descList = Arrays.stream(stateDesc.split("\\|")).toList(); - deviceEventInfo.setEventText(item.getAttrCode() + fieldName + descList.get(0)); - } - deviceEventInfo.setEventLevel(0); - } else { - deviceEventInfo.setEventText(item.getAttrCode() + fieldName + " 动作"); - if (StringUtils.isNotEmpty(stateDesc)) { - List descList = Arrays.stream(stateDesc.split("\\|")).toList(); - deviceEventInfo.setEventText(item.getAttrCode() + fieldName + descList.get(1)); - } - Integer level = dataService.eventLevelMap.get(model).get(item.getAttrCode()); - log.debug("level:{}", level); - log.debug("fieldname{}", fieldName); - deviceEventInfo.setEventLevel(level == null ? 0 : level); - } - } else { - deviceEventInfo.setEventText(item.getAttrCode() + fieldName + eventType + ",属性值为:" + item.getAttrValue() + ",越限值为:" + item.getLimitValue()); - deviceEventInfo.setEventLevel(1); - } - valueList.add(deviceEventInfo); - } - try { - tdEngineService.updateDeviceEventValues(valueList); - } catch (Exception e) { - log.error("事件信息存入Td失败,失败原因", e); - } - } - - private String getEventType(int eventType) { - return switch (eventType) { - case 0 -> "遥信变位"; - case 1 -> "越上限"; - case 2 -> "越下限"; - case 3 -> "越上上限"; - case 4 -> "越下下限"; - default -> ""; - }; - } - @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { String remoteIp = (String) session.getAttributes().getOrDefault(NodeConstant.REMOTE_IP, ""); From 8f337cb58919cabb639a752aaab9256862537f54 Mon Sep 17 00:00:00 2001 From: huguanghan Date: Fri, 3 Jan 2025 09:37:34 +0800 Subject: [PATCH 6/6] =?UTF-8?q?=E8=BF=81=E7=A7=BBdeviceEvent,hisHighSpeed,?= =?UTF-8?q?hisLowSpeed=E8=87=B3=E5=AF=B9=E5=BA=94command?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../modules/node/command/HisHighSpeedCommand.java | 13 +++++++------ .../modules/node/command/HisLowSpeedCommand.java | 13 ++++++++----- 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/das/src/main/java/com/das/modules/node/command/HisHighSpeedCommand.java b/das/src/main/java/com/das/modules/node/command/HisHighSpeedCommand.java index 37587dcb..4229cfe6 100644 --- a/das/src/main/java/com/das/modules/node/command/HisHighSpeedCommand.java +++ b/das/src/main/java/com/das/modules/node/command/HisHighSpeedCommand.java @@ -1,7 +1,8 @@ package com.das.modules.node.command; +import com.das.modules.cache.domain.DeviceInfoCache; +import com.das.modules.cache.service.CacheService; import com.das.modules.data.service.TDEngineService; -import com.das.modules.equipment.mapper.SysIotModelMapper; import com.das.modules.node.constant.NodeConstant; import com.das.modules.node.domain.bo.RTData; import com.das.modules.node.domain.bo.TerminalMessage; @@ -21,10 +22,10 @@ public class HisHighSpeedCommand implements BaseCommand { NodeMessageService nodeMessageService; @Autowired - SysIotModelMapper sysIotModelMapper; + TDEngineService tdEngineService; @Autowired - TDEngineService tdEngineService; + CacheService cacheService; @Override public void doCommand(TerminalMessage data) { @@ -43,8 +44,8 @@ public class HisHighSpeedCommand implements BaseCommand { String deviceId = jsonNode.get("deviceId").asText(); JsonNode values = jsonNode.get("values"); Map keyValueMap = new HashMap<>(); - // 根据设备ID获取对应的物模型属性 - String iotModelCode = sysIotModelMapper.getIotModel(jsonNode.get("deviceId").asLong()); + // 根据设备ID获取对应的物模型code + DeviceInfoCache dev = cacheService.getEquipmentCache().getDeviceInfoCacheById(jsonNode.get("deviceId").asLong()); //High数据 Iterator keysHigh = values.fieldNames(); @@ -61,6 +62,6 @@ public class HisHighSpeedCommand implements BaseCommand { .values(keyValueMap) .build(); highList.add(rtHighData); - tdEngineService.updateYCHighValues(highList, iotModelCode); + tdEngineService.updateYCHighValues(highList, dev.getIotModelCode()); } } diff --git a/das/src/main/java/com/das/modules/node/command/HisLowSpeedCommand.java b/das/src/main/java/com/das/modules/node/command/HisLowSpeedCommand.java index 289117fc..d21909b0 100644 --- a/das/src/main/java/com/das/modules/node/command/HisLowSpeedCommand.java +++ b/das/src/main/java/com/das/modules/node/command/HisLowSpeedCommand.java @@ -1,5 +1,7 @@ package com.das.modules.node.command; +import com.das.modules.cache.domain.DeviceInfoCache; +import com.das.modules.cache.service.CacheService; import com.das.modules.data.service.TDEngineService; import com.das.modules.equipment.mapper.SysIotModelMapper; import com.das.modules.node.constant.NodeConstant; @@ -17,10 +19,11 @@ import java.util.*; public class HisLowSpeedCommand implements BaseCommand { @Autowired - SysIotModelMapper sysIotModelMapper; + TDEngineService tdEngineService; + @Autowired - TDEngineService tdEngineService; + CacheService cacheService; @Override public void doCommand(TerminalMessage data) { log.debug("收到[历史低频数据]"); @@ -38,8 +41,8 @@ public class HisLowSpeedCommand implements BaseCommand { String deviceId = jsonNode.get("deviceId").asText(); JsonNode values = jsonNode.get("values"); Map keyValueMap = new HashMap<>(); - // 根据设备ID获取对应的物模型属性 - String iotModelCode = sysIotModelMapper.getIotModel(jsonNode.get("deviceId").asLong()); + // 根据设备ID获取对应的物模型code + DeviceInfoCache dev = cacheService.getEquipmentCache().getDeviceInfoCacheById(jsonNode.get("deviceId").asLong()); //Low数据 Iterator keysLow = values.fieldNames(); @@ -56,7 +59,7 @@ public class HisLowSpeedCommand implements BaseCommand { .values(keyValueMap) .build(); highList.add(rtHighData); - tdEngineService.updateYCLowValues(highList, iotModelCode); + tdEngineService.updateYCLowValues(highList, dev.getIotModelCode()); } }