diff --git a/das/src/main/java/com/das/modules/node/command/DeviceEventCommand.java b/das/src/main/java/com/das/modules/node/command/DeviceEventCommand.java index 4a61029c..0e4223e1 100644 --- a/das/src/main/java/com/das/modules/node/command/DeviceEventCommand.java +++ b/das/src/main/java/com/das/modules/node/command/DeviceEventCommand.java @@ -1,26 +1,128 @@ package com.das.modules.node.command; +import com.baomidou.mybatisplus.core.toolkit.IdWorker; +import com.das.common.utils.AdminRedisTemplate; +import com.das.common.utils.StringUtils; +import com.das.modules.cache.domain.DeviceInfoCache; +import com.das.modules.cache.service.CacheService; +import com.das.modules.data.domain.DeviceEventInfo; +import com.das.modules.data.service.TDEngineService; +import com.das.modules.data.service.impl.DataServiceImpl; import com.das.modules.node.constant.NodeConstant; import com.das.modules.node.domain.bo.TerminalMessage; +import com.das.modules.node.domain.vo.DeviceEventVo; import com.das.modules.node.service.NodeMessageService; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + @Service(value = NodeConstant.DEVICE_EVENT) @Slf4j public class DeviceEventCommand implements BaseCommand{ @Autowired - private NodeMessageService nodeMessageService; + AdminRedisTemplate adminRedisTemplate; + + @Autowired + private DataServiceImpl dataService; + + @Autowired + TDEngineService tdEngineService; + + + @Autowired + CacheService cacheService; @Override public void doCommand(TerminalMessage data) { log.debug("收到实时[事件告警]"); try { - nodeMessageService.handleDeviceEvent(data); + handleDeviceEvent(data); } catch (Exception e) { log.error("解析数据异常", e); } } + + private void handleDeviceEvent(TerminalMessage data) { + + JsonNode dataEvent = data.getData(); + ObjectMapper objectMapper = new ObjectMapper(); + List valueList = new ArrayList<>(); + + // 使用 TypeReference 来指定转换目标类型 + List list = objectMapper.convertValue(dataEvent, new TypeReference>() { + }); + log.debug("消息data转化deviceVo,{}", list); + for (DeviceEventVo item : list) { + DeviceInfoCache deviceInfoCache = cacheService.getEquipmentCache().getDeviceInfoCacheById(Long.valueOf(item.getDeviceId())); + Integer firstTriggeredCode = adminRedisTemplate.get(String.format("RT:%s:%s", item.getDeviceId(), "FirstTriggeredCode".toLowerCase())); + DeviceEventInfo deviceEventInfo = new DeviceEventInfo(); + deviceEventInfo.setEventTime(item.getEventTime()); + deviceEventInfo.setEventId(IdWorker.getId()); + deviceEventInfo.setAttributeCode(item.getAttrCode()); + deviceEventInfo.setDeviceId(item.getDeviceId()); + deviceEventInfo.setDeviceName(deviceInfoCache.getDeviceName()); + deviceEventInfo.setDeviceCode(deviceInfoCache.getDeviceCode()); + deviceEventInfo.setFirstTriggeredCode(firstTriggeredCode); + String eventType = getEventType(item.getEventType()); + String model = dataService.deviceModelMap.get(item.getDeviceId()); + if (StringUtils.isEmpty(model)) { + log.debug("未查询到物模型code,设备id:{}", item.getDeviceId()); + } + String fieldName = dataService.fieldCodeNameMap.get(model).get(item.getAttrCode()); + if (StringUtils.isEmpty(fieldName)) { + log.debug("未查询到物模型属性code,设备id:{}", item.getDeviceId()); + } + deviceEventInfo.setEventType(item.getEventType()); + deviceEventInfo.setConfirmed(0); + if (!StringUtils.isEmpty(eventType) && eventType.equals("遥信变位")) { + String stateDesc = dataService.stateDescMap.get(model).get(item.getAttrCode()); + if (item.getAttrValue().equals(0)) { + deviceEventInfo.setEventText(item.getAttrCode() + fieldName + " 复归"); + if (StringUtils.isNotEmpty(stateDesc)) { + List descList = Arrays.stream(stateDesc.split("\\|")).toList(); + deviceEventInfo.setEventText(item.getAttrCode() + fieldName + descList.get(0)); + } + deviceEventInfo.setEventLevel(0); + } else { + deviceEventInfo.setEventText(item.getAttrCode() + fieldName + " 动作"); + if (StringUtils.isNotEmpty(stateDesc)) { + List descList = Arrays.stream(stateDesc.split("\\|")).toList(); + deviceEventInfo.setEventText(item.getAttrCode() + fieldName + descList.get(1)); + } + Integer level = dataService.eventLevelMap.get(model).get(item.getAttrCode()); + log.debug("level:{}", level); + log.debug("fieldname{}", fieldName); + deviceEventInfo.setEventLevel(level == null ? 0 : level); + } + } else { + deviceEventInfo.setEventText(item.getAttrCode() + fieldName + eventType + ",属性值为:" + item.getAttrValue() + ",越限值为:" + item.getLimitValue()); + deviceEventInfo.setEventLevel(1); + } + valueList.add(deviceEventInfo); + } + try { + tdEngineService.updateDeviceEventValues(valueList); + } catch (Exception e) { + log.error("事件信息存入Td失败,失败原因", e); + } + } + + private String getEventType(int eventType) { + return switch (eventType) { + case 0 -> "遥信变位"; + case 1 -> "越上限"; + case 2 -> "越下限"; + case 3 -> "越上上限"; + case 4 -> "越下下限"; + default -> ""; + }; + } } diff --git a/das/src/main/java/com/das/modules/node/command/HisHighSpeedCommand.java b/das/src/main/java/com/das/modules/node/command/HisHighSpeedCommand.java index 81bfa8cc..37587dcb 100644 --- a/das/src/main/java/com/das/modules/node/command/HisHighSpeedCommand.java +++ b/das/src/main/java/com/das/modules/node/command/HisHighSpeedCommand.java @@ -1,27 +1,66 @@ package com.das.modules.node.command; +import com.das.modules.data.service.TDEngineService; +import com.das.modules.equipment.mapper.SysIotModelMapper; import com.das.modules.node.constant.NodeConstant; +import com.das.modules.node.domain.bo.RTData; import com.das.modules.node.domain.bo.TerminalMessage; import com.das.modules.node.service.NodeMessageService; +import com.fasterxml.jackson.databind.JsonNode; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import java.util.*; + @Service(value = NodeConstant.HIS_HIGH_SPEED_DATA) @Slf4j public class HisHighSpeedCommand implements BaseCommand { @Autowired NodeMessageService nodeMessageService; + + @Autowired + SysIotModelMapper sysIotModelMapper; + + @Autowired + TDEngineService tdEngineService; + @Override public void doCommand(TerminalMessage data) { log.debug("收到历史高频数据"); try { //analogData值只存入redis - nodeMessageService.handleHighSpeed(data); + handleHighSpeed(data); } catch (Exception e) { log.error("解析数据异常", e); } } + + private void handleHighSpeed(TerminalMessage data) { + JsonNode jsonNode = data.getData(); + String deviceId = jsonNode.get("deviceId").asText(); + JsonNode values = jsonNode.get("values"); + Map keyValueMap = new HashMap<>(); + // 根据设备ID获取对应的物模型属性 + String iotModelCode = sysIotModelMapper.getIotModel(jsonNode.get("deviceId").asLong()); + + //High数据 + Iterator keysHigh = values.fieldNames(); + while (keysHigh.hasNext()) { + String fieldName = keysHigh.next(); + keyValueMap.put(fieldName, values.get(fieldName)); + + } + Long dataTime = jsonNode.get("dataTime").asLong(); + List highList = new ArrayList<>(); + RTData rtHighData = RTData.builder() + .dataTime(dataTime) + .deviceId(Long.valueOf(deviceId)) + .values(keyValueMap) + .build(); + highList.add(rtHighData); + tdEngineService.updateYCHighValues(highList, iotModelCode); + } } diff --git a/das/src/main/java/com/das/modules/node/command/HisLowSpeedCommand.java b/das/src/main/java/com/das/modules/node/command/HisLowSpeedCommand.java index 5c7911d0..289117fc 100644 --- a/das/src/main/java/com/das/modules/node/command/HisLowSpeedCommand.java +++ b/das/src/main/java/com/das/modules/node/command/HisLowSpeedCommand.java @@ -1,27 +1,62 @@ package com.das.modules.node.command; +import com.das.modules.data.service.TDEngineService; +import com.das.modules.equipment.mapper.SysIotModelMapper; import com.das.modules.node.constant.NodeConstant; +import com.das.modules.node.domain.bo.RTData; import com.das.modules.node.domain.bo.TerminalMessage; -import com.das.modules.node.service.NodeMessageService; +import com.fasterxml.jackson.databind.JsonNode; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import java.util.*; + @Service(value = NodeConstant.HIS_LOW_SPEED_DATA) @Slf4j public class HisLowSpeedCommand implements BaseCommand { @Autowired - NodeMessageService nodeMessageService; + SysIotModelMapper sysIotModelMapper; + + @Autowired + TDEngineService tdEngineService; @Override public void doCommand(TerminalMessage data) { log.debug("收到[历史低频数据]"); try { //analogData值只存入redis - nodeMessageService.handleLowSpeed(data); + handleLowSpeed(data); } catch (Exception e) { log.error("解析数据异常", e); } } + + private void handleLowSpeed(TerminalMessage data) { + JsonNode jsonNode = data.getData(); + String deviceId = jsonNode.get("deviceId").asText(); + JsonNode values = jsonNode.get("values"); + Map keyValueMap = new HashMap<>(); + // 根据设备ID获取对应的物模型属性 + String iotModelCode = sysIotModelMapper.getIotModel(jsonNode.get("deviceId").asLong()); + + //Low数据 + Iterator keysLow = values.fieldNames(); + while (keysLow.hasNext()) { + String fieldName = keysLow.next(); + keyValueMap.put(fieldName, values.get(fieldName)); + + } + Long dataTime = jsonNode.get("dataTime").asLong(); + List highList = new ArrayList<>(); + RTData rtHighData = RTData.builder() + .dataTime(dataTime) + .deviceId(Long.valueOf(deviceId)) + .values(keyValueMap) + .build(); + highList.add(rtHighData); + tdEngineService.updateYCLowValues(highList, iotModelCode); + } + } diff --git a/das/src/main/java/com/das/modules/node/service/NodeMessageService.java b/das/src/main/java/com/das/modules/node/service/NodeMessageService.java index 22b78ca7..73c5e1f9 100644 --- a/das/src/main/java/com/das/modules/node/service/NodeMessageService.java +++ b/das/src/main/java/com/das/modules/node/service/NodeMessageService.java @@ -14,12 +14,6 @@ public interface NodeMessageService { JsonNode sendTerminalConfig(Long nodeId); - void handleHighSpeed(TerminalMessage data); - - void handleLowSpeed(TerminalMessage data); - - void handleDeviceEvent(TerminalMessage data); - /** * 向指定采集节点发送指令(无返回值) * @param nodeId 节点ID diff --git a/das/src/main/java/com/das/modules/node/service/impl/NodeMessageServiceImpl.java b/das/src/main/java/com/das/modules/node/service/impl/NodeMessageServiceImpl.java index 3d6f73b9..17216fb3 100644 --- a/das/src/main/java/com/das/modules/node/service/impl/NodeMessageServiceImpl.java +++ b/das/src/main/java/com/das/modules/node/service/impl/NodeMessageServiceImpl.java @@ -238,137 +238,6 @@ public class NodeMessageServiceImpl extends TextWebSocketHandler implements Node } - @Override - public void handleHighSpeed(TerminalMessage data) { - JsonNode jsonNode = data.getData(); - String deviceId = jsonNode.get("deviceId").asText(); - JsonNode values = jsonNode.get("values"); - Map keyValueMap = new HashMap<>(); - // 根据设备ID获取对应的物模型属性 - String iotModelCode = sysIotModelMapper.getIotModel(jsonNode.get("deviceId").asLong()); - - //High数据 - Iterator keysHigh = values.fieldNames(); - while (keysHigh.hasNext()) { - String fieldName = keysHigh.next(); - keyValueMap.put(fieldName, values.get(fieldName)); - - } - Long dataTime = jsonNode.get("dataTime").asLong(); - List highList = new ArrayList<>(); - RTData rtHighData = RTData.builder() - .dataTime(dataTime) - .deviceId(Long.valueOf(deviceId)) - .values(keyValueMap) - .build(); - highList.add(rtHighData); - tdEngineService.updateYCHighValues(highList, iotModelCode); - } - - @Override - public void handleLowSpeed(TerminalMessage data) { - JsonNode jsonNode = data.getData(); - String deviceId = jsonNode.get("deviceId").asText(); - JsonNode values = jsonNode.get("values"); - Map keyValueMap = new HashMap<>(); - // 根据设备ID获取对应的物模型属性 - String iotModelCode = sysIotModelMapper.getIotModel(jsonNode.get("deviceId").asLong()); - - //Low数据 - Iterator keysLow = values.fieldNames(); - while (keysLow.hasNext()) { - String fieldName = keysLow.next(); - keyValueMap.put(fieldName, values.get(fieldName)); - - } - Long dataTime = jsonNode.get("dataTime").asLong(); - List highList = new ArrayList<>(); - RTData rtHighData = RTData.builder() - .dataTime(dataTime) - .deviceId(Long.valueOf(deviceId)) - .values(keyValueMap) - .build(); - highList.add(rtHighData); - tdEngineService.updateYCLowValues(highList, iotModelCode); - } - - @Override - public void handleDeviceEvent(TerminalMessage data) { - - JsonNode dataEvent = data.getData(); - ObjectMapper objectMapper = new ObjectMapper(); - List valueList = new ArrayList<>(); - - // 使用 TypeReference 来指定转换目标类型 - List list = objectMapper.convertValue(dataEvent, new TypeReference>() { - }); - log.debug("消息data转化deviceVo,{}", list); - for (DeviceEventVo item : list) { - DeviceInfoCache deviceInfoCache = cacheService.getEquipmentCache().getDeviceInfoCacheById(Long.valueOf(item.getDeviceId())); - Integer firstTriggeredCode = adminRedisTemplate.get(String.format("RT:%s:%s", item.getDeviceId(), "FirstTriggeredCode".toLowerCase())); - DeviceEventInfo deviceEventInfo = new DeviceEventInfo(); - deviceEventInfo.setEventTime(item.getEventTime()); - deviceEventInfo.setEventId(IdWorker.getId()); - deviceEventInfo.setAttributeCode(item.getAttrCode()); - deviceEventInfo.setDeviceId(item.getDeviceId()); - deviceEventInfo.setDeviceName(deviceInfoCache.getDeviceName()); - deviceEventInfo.setDeviceCode(deviceInfoCache.getDeviceCode()); - deviceEventInfo.setFirstTriggeredCode(firstTriggeredCode); - String eventType = getEventType(item.getEventType()); - String model = dataService.deviceModelMap.get(item.getDeviceId()); - if (StringUtils.isEmpty(model)) { - log.debug("未查询到物模型code,设备id:{}", item.getDeviceId()); - } - String fieldName = dataService.fieldCodeNameMap.get(model).get(item.getAttrCode()); - if (StringUtils.isEmpty(fieldName)) { - log.debug("未查询到物模型属性code,设备id:{}", item.getDeviceId()); - } - deviceEventInfo.setEventType(item.getEventType()); - deviceEventInfo.setConfirmed(0); - if (!StringUtils.isEmpty(eventType) && eventType.equals("遥信变位")) { - String stateDesc = dataService.stateDescMap.get(model).get(item.getAttrCode()); - if (item.getAttrValue().equals(0)) { - deviceEventInfo.setEventText(item.getAttrCode() + fieldName + " 复归"); - if (StringUtils.isNotEmpty(stateDesc)) { - List descList = Arrays.stream(stateDesc.split("\\|")).toList(); - deviceEventInfo.setEventText(item.getAttrCode() + fieldName + descList.get(0)); - } - deviceEventInfo.setEventLevel(0); - } else { - deviceEventInfo.setEventText(item.getAttrCode() + fieldName + " 动作"); - if (StringUtils.isNotEmpty(stateDesc)) { - List descList = Arrays.stream(stateDesc.split("\\|")).toList(); - deviceEventInfo.setEventText(item.getAttrCode() + fieldName + descList.get(1)); - } - Integer level = dataService.eventLevelMap.get(model).get(item.getAttrCode()); - log.debug("level:{}", level); - log.debug("fieldname{}", fieldName); - deviceEventInfo.setEventLevel(level == null ? 0 : level); - } - } else { - deviceEventInfo.setEventText(item.getAttrCode() + fieldName + eventType + ",属性值为:" + item.getAttrValue() + ",越限值为:" + item.getLimitValue()); - deviceEventInfo.setEventLevel(1); - } - valueList.add(deviceEventInfo); - } - try { - tdEngineService.updateDeviceEventValues(valueList); - } catch (Exception e) { - log.error("事件信息存入Td失败,失败原因", e); - } - } - - private String getEventType(int eventType) { - return switch (eventType) { - case 0 -> "遥信变位"; - case 1 -> "越上限"; - case 2 -> "越下限"; - case 3 -> "越上上限"; - case 4 -> "越下下限"; - default -> ""; - }; - } - @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { String remoteIp = (String) session.getAttributes().getOrDefault(NodeConstant.REMOTE_IP, "");