迁移deviceEvent,hisHighSpeed,hisLowSpeed至对应command

This commit is contained in:
huguanghan 2025-01-03 09:19:08 +08:00
parent 2a1a768d18
commit adcc19deb3
5 changed files with 182 additions and 143 deletions

View File

@ -1,26 +1,128 @@
package com.das.modules.node.command; package com.das.modules.node.command;
import com.baomidou.mybatisplus.core.toolkit.IdWorker;
import com.das.common.utils.AdminRedisTemplate;
import com.das.common.utils.StringUtils;
import com.das.modules.cache.domain.DeviceInfoCache;
import com.das.modules.cache.service.CacheService;
import com.das.modules.data.domain.DeviceEventInfo;
import com.das.modules.data.service.TDEngineService;
import com.das.modules.data.service.impl.DataServiceImpl;
import com.das.modules.node.constant.NodeConstant; import com.das.modules.node.constant.NodeConstant;
import com.das.modules.node.domain.bo.TerminalMessage; import com.das.modules.node.domain.bo.TerminalMessage;
import com.das.modules.node.domain.vo.DeviceEventVo;
import com.das.modules.node.service.NodeMessageService; import com.das.modules.node.service.NodeMessageService;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@Service(value = NodeConstant.DEVICE_EVENT) @Service(value = NodeConstant.DEVICE_EVENT)
@Slf4j @Slf4j
public class DeviceEventCommand implements BaseCommand{ public class DeviceEventCommand implements BaseCommand{
@Autowired @Autowired
private NodeMessageService nodeMessageService; AdminRedisTemplate adminRedisTemplate;
@Autowired
private DataServiceImpl dataService;
@Autowired
TDEngineService tdEngineService;
@Autowired
CacheService cacheService;
@Override @Override
public void doCommand(TerminalMessage data) { public void doCommand(TerminalMessage data) {
log.debug("收到实时[事件告警]"); log.debug("收到实时[事件告警]");
try { try {
nodeMessageService.handleDeviceEvent(data); handleDeviceEvent(data);
} catch (Exception e) { } catch (Exception e) {
log.error("解析数据异常", e); log.error("解析数据异常", e);
} }
} }
private void handleDeviceEvent(TerminalMessage data) {
JsonNode dataEvent = data.getData();
ObjectMapper objectMapper = new ObjectMapper();
List<DeviceEventInfo> valueList = new ArrayList<>();
// 使用 TypeReference 来指定转换目标类型
List<DeviceEventVo> list = objectMapper.convertValue(dataEvent, new TypeReference<List<DeviceEventVo>>() {
});
log.debug("消息data转化deviceVo,{}", list);
for (DeviceEventVo item : list) {
DeviceInfoCache deviceInfoCache = cacheService.getEquipmentCache().getDeviceInfoCacheById(Long.valueOf(item.getDeviceId()));
Integer firstTriggeredCode = adminRedisTemplate.get(String.format("RT:%s:%s", item.getDeviceId(), "FirstTriggeredCode".toLowerCase()));
DeviceEventInfo deviceEventInfo = new DeviceEventInfo();
deviceEventInfo.setEventTime(item.getEventTime());
deviceEventInfo.setEventId(IdWorker.getId());
deviceEventInfo.setAttributeCode(item.getAttrCode());
deviceEventInfo.setDeviceId(item.getDeviceId());
deviceEventInfo.setDeviceName(deviceInfoCache.getDeviceName());
deviceEventInfo.setDeviceCode(deviceInfoCache.getDeviceCode());
deviceEventInfo.setFirstTriggeredCode(firstTriggeredCode);
String eventType = getEventType(item.getEventType());
String model = dataService.deviceModelMap.get(item.getDeviceId());
if (StringUtils.isEmpty(model)) {
log.debug("未查询到物模型code设备id{}", item.getDeviceId());
}
String fieldName = dataService.fieldCodeNameMap.get(model).get(item.getAttrCode());
if (StringUtils.isEmpty(fieldName)) {
log.debug("未查询到物模型属性code设备id{}", item.getDeviceId());
}
deviceEventInfo.setEventType(item.getEventType());
deviceEventInfo.setConfirmed(0);
if (!StringUtils.isEmpty(eventType) && eventType.equals("遥信变位")) {
String stateDesc = dataService.stateDescMap.get(model).get(item.getAttrCode());
if (item.getAttrValue().equals(0)) {
deviceEventInfo.setEventText(item.getAttrCode() + fieldName + " 复归");
if (StringUtils.isNotEmpty(stateDesc)) {
List<String> descList = Arrays.stream(stateDesc.split("\\|")).toList();
deviceEventInfo.setEventText(item.getAttrCode() + fieldName + descList.get(0));
}
deviceEventInfo.setEventLevel(0);
} else {
deviceEventInfo.setEventText(item.getAttrCode() + fieldName + " 动作");
if (StringUtils.isNotEmpty(stateDesc)) {
List<String> descList = Arrays.stream(stateDesc.split("\\|")).toList();
deviceEventInfo.setEventText(item.getAttrCode() + fieldName + descList.get(1));
}
Integer level = dataService.eventLevelMap.get(model).get(item.getAttrCode());
log.debug("level:{}", level);
log.debug("fieldname{}", fieldName);
deviceEventInfo.setEventLevel(level == null ? 0 : level);
}
} else {
deviceEventInfo.setEventText(item.getAttrCode() + fieldName + eventType + ",属性值为:" + item.getAttrValue() + ",越限值为:" + item.getLimitValue());
deviceEventInfo.setEventLevel(1);
}
valueList.add(deviceEventInfo);
}
try {
tdEngineService.updateDeviceEventValues(valueList);
} catch (Exception e) {
log.error("事件信息存入Td失败,失败原因", e);
}
}
private String getEventType(int eventType) {
return switch (eventType) {
case 0 -> "遥信变位";
case 1 -> "越上限";
case 2 -> "越下限";
case 3 -> "越上上限";
case 4 -> "越下下限";
default -> "";
};
}
} }

View File

@ -1,27 +1,66 @@
package com.das.modules.node.command; package com.das.modules.node.command;
import com.das.modules.data.service.TDEngineService;
import com.das.modules.equipment.mapper.SysIotModelMapper;
import com.das.modules.node.constant.NodeConstant; import com.das.modules.node.constant.NodeConstant;
import com.das.modules.node.domain.bo.RTData;
import com.das.modules.node.domain.bo.TerminalMessage; import com.das.modules.node.domain.bo.TerminalMessage;
import com.das.modules.node.service.NodeMessageService; import com.das.modules.node.service.NodeMessageService;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.util.*;
@Service(value = NodeConstant.HIS_HIGH_SPEED_DATA) @Service(value = NodeConstant.HIS_HIGH_SPEED_DATA)
@Slf4j @Slf4j
public class HisHighSpeedCommand implements BaseCommand { public class HisHighSpeedCommand implements BaseCommand {
@Autowired @Autowired
NodeMessageService nodeMessageService; NodeMessageService nodeMessageService;
@Autowired
SysIotModelMapper sysIotModelMapper;
@Autowired
TDEngineService tdEngineService;
@Override @Override
public void doCommand(TerminalMessage data) { public void doCommand(TerminalMessage data) {
log.debug("收到历史高频数据"); log.debug("收到历史高频数据");
try { try {
//analogData值只存入redis //analogData值只存入redis
nodeMessageService.handleHighSpeed(data); handleHighSpeed(data);
} catch (Exception e) { } catch (Exception e) {
log.error("解析数据异常", e); log.error("解析数据异常", e);
} }
} }
private void handleHighSpeed(TerminalMessage data) {
JsonNode jsonNode = data.getData();
String deviceId = jsonNode.get("deviceId").asText();
JsonNode values = jsonNode.get("values");
Map<String, Object> keyValueMap = new HashMap<>();
// 根据设备ID获取对应的物模型属性
String iotModelCode = sysIotModelMapper.getIotModel(jsonNode.get("deviceId").asLong());
//High数据
Iterator<String> keysHigh = values.fieldNames();
while (keysHigh.hasNext()) {
String fieldName = keysHigh.next();
keyValueMap.put(fieldName, values.get(fieldName));
}
Long dataTime = jsonNode.get("dataTime").asLong();
List<RTData> highList = new ArrayList<>();
RTData rtHighData = RTData.builder()
.dataTime(dataTime)
.deviceId(Long.valueOf(deviceId))
.values(keyValueMap)
.build();
highList.add(rtHighData);
tdEngineService.updateYCHighValues(highList, iotModelCode);
}
} }

View File

@ -1,27 +1,62 @@
package com.das.modules.node.command; package com.das.modules.node.command;
import com.das.modules.data.service.TDEngineService;
import com.das.modules.equipment.mapper.SysIotModelMapper;
import com.das.modules.node.constant.NodeConstant; import com.das.modules.node.constant.NodeConstant;
import com.das.modules.node.domain.bo.RTData;
import com.das.modules.node.domain.bo.TerminalMessage; import com.das.modules.node.domain.bo.TerminalMessage;
import com.das.modules.node.service.NodeMessageService; import com.fasterxml.jackson.databind.JsonNode;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.util.*;
@Service(value = NodeConstant.HIS_LOW_SPEED_DATA) @Service(value = NodeConstant.HIS_LOW_SPEED_DATA)
@Slf4j @Slf4j
public class HisLowSpeedCommand implements BaseCommand { public class HisLowSpeedCommand implements BaseCommand {
@Autowired @Autowired
NodeMessageService nodeMessageService; SysIotModelMapper sysIotModelMapper;
@Autowired
TDEngineService tdEngineService;
@Override @Override
public void doCommand(TerminalMessage data) { public void doCommand(TerminalMessage data) {
log.debug("收到[历史低频数据]"); log.debug("收到[历史低频数据]");
try { try {
//analogData值只存入redis //analogData值只存入redis
nodeMessageService.handleLowSpeed(data); handleLowSpeed(data);
} catch (Exception e) { } catch (Exception e) {
log.error("解析数据异常", e); log.error("解析数据异常", e);
} }
} }
private void handleLowSpeed(TerminalMessage data) {
JsonNode jsonNode = data.getData();
String deviceId = jsonNode.get("deviceId").asText();
JsonNode values = jsonNode.get("values");
Map<String, Object> keyValueMap = new HashMap<>();
// 根据设备ID获取对应的物模型属性
String iotModelCode = sysIotModelMapper.getIotModel(jsonNode.get("deviceId").asLong());
//Low数据
Iterator<String> keysLow = values.fieldNames();
while (keysLow.hasNext()) {
String fieldName = keysLow.next();
keyValueMap.put(fieldName, values.get(fieldName));
}
Long dataTime = jsonNode.get("dataTime").asLong();
List<RTData> highList = new ArrayList<>();
RTData rtHighData = RTData.builder()
.dataTime(dataTime)
.deviceId(Long.valueOf(deviceId))
.values(keyValueMap)
.build();
highList.add(rtHighData);
tdEngineService.updateYCLowValues(highList, iotModelCode);
}
} }

View File

@ -14,12 +14,6 @@ public interface NodeMessageService {
JsonNode sendTerminalConfig(Long nodeId); JsonNode sendTerminalConfig(Long nodeId);
void handleHighSpeed(TerminalMessage data);
void handleLowSpeed(TerminalMessage data);
void handleDeviceEvent(TerminalMessage data);
/** /**
* 向指定采集节点发送指令(无返回值) * 向指定采集节点发送指令(无返回值)
* @param nodeId 节点ID * @param nodeId 节点ID

View File

@ -238,137 +238,6 @@ public class NodeMessageServiceImpl extends TextWebSocketHandler implements Node
} }
@Override
public void handleHighSpeed(TerminalMessage data) {
JsonNode jsonNode = data.getData();
String deviceId = jsonNode.get("deviceId").asText();
JsonNode values = jsonNode.get("values");
Map<String, Object> keyValueMap = new HashMap<>();
// 根据设备ID获取对应的物模型属性
String iotModelCode = sysIotModelMapper.getIotModel(jsonNode.get("deviceId").asLong());
//High数据
Iterator<String> keysHigh = values.fieldNames();
while (keysHigh.hasNext()) {
String fieldName = keysHigh.next();
keyValueMap.put(fieldName, values.get(fieldName));
}
Long dataTime = jsonNode.get("dataTime").asLong();
List<RTData> highList = new ArrayList<>();
RTData rtHighData = RTData.builder()
.dataTime(dataTime)
.deviceId(Long.valueOf(deviceId))
.values(keyValueMap)
.build();
highList.add(rtHighData);
tdEngineService.updateYCHighValues(highList, iotModelCode);
}
@Override
public void handleLowSpeed(TerminalMessage data) {
JsonNode jsonNode = data.getData();
String deviceId = jsonNode.get("deviceId").asText();
JsonNode values = jsonNode.get("values");
Map<String, Object> keyValueMap = new HashMap<>();
// 根据设备ID获取对应的物模型属性
String iotModelCode = sysIotModelMapper.getIotModel(jsonNode.get("deviceId").asLong());
//Low数据
Iterator<String> keysLow = values.fieldNames();
while (keysLow.hasNext()) {
String fieldName = keysLow.next();
keyValueMap.put(fieldName, values.get(fieldName));
}
Long dataTime = jsonNode.get("dataTime").asLong();
List<RTData> highList = new ArrayList<>();
RTData rtHighData = RTData.builder()
.dataTime(dataTime)
.deviceId(Long.valueOf(deviceId))
.values(keyValueMap)
.build();
highList.add(rtHighData);
tdEngineService.updateYCLowValues(highList, iotModelCode);
}
@Override
public void handleDeviceEvent(TerminalMessage data) {
JsonNode dataEvent = data.getData();
ObjectMapper objectMapper = new ObjectMapper();
List<DeviceEventInfo> valueList = new ArrayList<>();
// 使用 TypeReference 来指定转换目标类型
List<DeviceEventVo> list = objectMapper.convertValue(dataEvent, new TypeReference<List<DeviceEventVo>>() {
});
log.debug("消息data转化deviceVo,{}", list);
for (DeviceEventVo item : list) {
DeviceInfoCache deviceInfoCache = cacheService.getEquipmentCache().getDeviceInfoCacheById(Long.valueOf(item.getDeviceId()));
Integer firstTriggeredCode = adminRedisTemplate.get(String.format("RT:%s:%s", item.getDeviceId(), "FirstTriggeredCode".toLowerCase()));
DeviceEventInfo deviceEventInfo = new DeviceEventInfo();
deviceEventInfo.setEventTime(item.getEventTime());
deviceEventInfo.setEventId(IdWorker.getId());
deviceEventInfo.setAttributeCode(item.getAttrCode());
deviceEventInfo.setDeviceId(item.getDeviceId());
deviceEventInfo.setDeviceName(deviceInfoCache.getDeviceName());
deviceEventInfo.setDeviceCode(deviceInfoCache.getDeviceCode());
deviceEventInfo.setFirstTriggeredCode(firstTriggeredCode);
String eventType = getEventType(item.getEventType());
String model = dataService.deviceModelMap.get(item.getDeviceId());
if (StringUtils.isEmpty(model)) {
log.debug("未查询到物模型code设备id{}", item.getDeviceId());
}
String fieldName = dataService.fieldCodeNameMap.get(model).get(item.getAttrCode());
if (StringUtils.isEmpty(fieldName)) {
log.debug("未查询到物模型属性code设备id{}", item.getDeviceId());
}
deviceEventInfo.setEventType(item.getEventType());
deviceEventInfo.setConfirmed(0);
if (!StringUtils.isEmpty(eventType) && eventType.equals("遥信变位")) {
String stateDesc = dataService.stateDescMap.get(model).get(item.getAttrCode());
if (item.getAttrValue().equals(0)) {
deviceEventInfo.setEventText(item.getAttrCode() + fieldName + " 复归");
if (StringUtils.isNotEmpty(stateDesc)) {
List<String> descList = Arrays.stream(stateDesc.split("\\|")).toList();
deviceEventInfo.setEventText(item.getAttrCode() + fieldName + descList.get(0));
}
deviceEventInfo.setEventLevel(0);
} else {
deviceEventInfo.setEventText(item.getAttrCode() + fieldName + " 动作");
if (StringUtils.isNotEmpty(stateDesc)) {
List<String> descList = Arrays.stream(stateDesc.split("\\|")).toList();
deviceEventInfo.setEventText(item.getAttrCode() + fieldName + descList.get(1));
}
Integer level = dataService.eventLevelMap.get(model).get(item.getAttrCode());
log.debug("level:{}", level);
log.debug("fieldname{}", fieldName);
deviceEventInfo.setEventLevel(level == null ? 0 : level);
}
} else {
deviceEventInfo.setEventText(item.getAttrCode() + fieldName + eventType + ",属性值为:" + item.getAttrValue() + ",越限值为:" + item.getLimitValue());
deviceEventInfo.setEventLevel(1);
}
valueList.add(deviceEventInfo);
}
try {
tdEngineService.updateDeviceEventValues(valueList);
} catch (Exception e) {
log.error("事件信息存入Td失败,失败原因", e);
}
}
private String getEventType(int eventType) {
return switch (eventType) {
case 0 -> "遥信变位";
case 1 -> "越上限";
case 2 -> "越下限";
case 3 -> "越上上限";
case 4 -> "越下下限";
default -> "";
};
}
@Override @Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception { public void afterConnectionEstablished(WebSocketSession session) throws Exception {
String remoteIp = (String) session.getAttributes().getOrDefault(NodeConstant.REMOTE_IP, ""); String remoteIp = (String) session.getAttributes().getOrDefault(NodeConstant.REMOTE_IP, "");