模拟量数据上报更新tdengine

This commit is contained in:
huguanghan 2024-11-18 11:50:06 +08:00
parent 5c3b2e95a5
commit 364500bcb8

View File

@ -87,7 +87,6 @@ public class NodeMessageServiceImpl extends TextWebSocketHandler implements Node
private DataServiceImpl dataService; private DataServiceImpl dataService;
@PostConstruct @PostConstruct
public void init() { public void init() {
//初始化高性能队列 //初始化高性能队列
@ -110,6 +109,7 @@ public class NodeMessageServiceImpl extends TextWebSocketHandler implements Node
disruptor.shutdown(); disruptor.shutdown();
} }
} }
public void pushMessage(TerminalMessage msg) { public void pushMessage(TerminalMessage msg) {
RingBuffer<TerminalMessage> ringBuffer = disruptor.getRingBuffer(); RingBuffer<TerminalMessage> ringBuffer = disruptor.getRingBuffer();
if (ringBuffer == null) { if (ringBuffer == null) {
@ -233,7 +233,13 @@ public class NodeMessageServiceImpl extends TextWebSocketHandler implements Node
JsonNode jsonNode = data.getData(); JsonNode jsonNode = data.getData();
String deviceId = jsonNode.get("deviceId").asText(); String deviceId = jsonNode.get("deviceId").asText();
JsonNode values = jsonNode.get("values"); JsonNode values = jsonNode.get("values");
Long dataTime = jsonNode.get("dataTime").asLong();
Map<String, Object> keyValueMap = new HashMap<>(); Map<String, Object> keyValueMap = new HashMap<>();
DeviceInfoCache deviceInfoCacheById = cacheService.getEquipmentCache().getDeviceInfoCacheById(Long.valueOf(deviceId));
Set<String> highKey = dataService.highIotFieldMap.get(deviceInfoCacheById.getDeviceCode()).keySet();
Set<String> lowKey = dataService.lowIotFieldMap.get(deviceInfoCacheById.getDeviceCode()).keySet();
Map<String, Object> highSpeedValueMap = new HashMap<>();
Map<String, Object> lowSpeedValueMap = new HashMap<>();
//数据入redis //数据入redis
Iterator<String> keysHigh = values.fieldNames(); Iterator<String> keysHigh = values.fieldNames();
@ -241,8 +247,30 @@ public class NodeMessageServiceImpl extends TextWebSocketHandler implements Node
String fieldName = keysHigh.next(); String fieldName = keysHigh.next();
String key = String.format("RT:%s:%s", deviceId, fieldName.toLowerCase()); String key = String.format("RT:%s:%s", deviceId, fieldName.toLowerCase());
keyValueMap.put(key, values.get(fieldName)); keyValueMap.put(key, values.get(fieldName));
if (highKey.contains(fieldName)){
highSpeedValueMap.put(fieldName,values.get(fieldName));
}
if (lowKey.contains(fieldName)){
lowSpeedValueMap.put(fieldName,values.get(fieldName));
}
} }
adminRedisTemplate.mSet(keyValueMap); adminRedisTemplate.mSet(keyValueMap);
if (jsonNode.get("isStore").asBoolean()) {
//更新td
if (!highSpeedValueMap.isEmpty()){
List<RTData> highSpeedData = new ArrayList<>();
RTData rtHighData = RTData.builder().dataTime(dataTime).deviceId(Long.valueOf(deviceId)).values(highSpeedValueMap).build();
highSpeedData.add(rtHighData);
tdEngineService.updateYCHighValues(highSpeedData,deviceInfoCacheById.getDeviceCode());
}
if (!lowSpeedValueMap.isEmpty()){
List<RTData> lowSpeedData = new ArrayList<>();
RTData rtLowData = RTData.builder().dataTime(dataTime).deviceId(Long.valueOf(deviceId)).values(lowSpeedValueMap).build();
lowSpeedData.add(rtLowData);
tdEngineService.updateYCLowValues(lowSpeedData,deviceInfoCacheById.getDeviceCode());
}
}
} }
@Override @Override
@ -309,8 +337,8 @@ public class NodeMessageServiceImpl extends TextWebSocketHandler implements Node
// 使用 TypeReference 来指定转换目标类型 // 使用 TypeReference 来指定转换目标类型
List<DeviceEventVo> list = objectMapper.convertValue(dataEvent, new TypeReference<List<DeviceEventVo>>() { List<DeviceEventVo> list = objectMapper.convertValue(dataEvent, new TypeReference<List<DeviceEventVo>>() {
}); });
log.info("消息data转化deviceVo,{}",list); log.info("消息data转化deviceVo,{}", list);
for (DeviceEventVo item : list){ for (DeviceEventVo item : list) {
DeviceInfoCache deviceInfoCache = cacheService.getEquipmentCache().getDeviceInfoCacheById(Long.valueOf(item.getDeviceId())); DeviceInfoCache deviceInfoCache = cacheService.getEquipmentCache().getDeviceInfoCacheById(Long.valueOf(item.getDeviceId()));
DeviceEventInfo deviceEventInfo = new DeviceEventInfo(); DeviceEventInfo deviceEventInfo = new DeviceEventInfo();
deviceEventInfo.setEventTime(item.getEventTime()); deviceEventInfo.setEventTime(item.getEventTime());
@ -320,32 +348,31 @@ public class NodeMessageServiceImpl extends TextWebSocketHandler implements Node
deviceEventInfo.setDeviceCode(deviceInfoCache.getDeviceCode()); deviceEventInfo.setDeviceCode(deviceInfoCache.getDeviceCode());
String eventType = getEventType(item.getEventType()); String eventType = getEventType(item.getEventType());
String model = dataService.deviceModelMap.get(item.getDeviceId()); String model = dataService.deviceModelMap.get(item.getDeviceId());
if (StringUtils.isEmpty(model)){ if (StringUtils.isEmpty(model)) {
log.debug("未查询到物模型code设备id{}",item.getDeviceId()); log.debug("未查询到物模型code设备id{}", item.getDeviceId());
} }
String fieldName = dataService.fieldCodeNameMap.get(model).get(item.getAttrCode()); String fieldName = dataService.fieldCodeNameMap.get(model).get(item.getAttrCode());
if (StringUtils.isEmpty(fieldName)){ if (StringUtils.isEmpty(fieldName)) {
log.debug("未查询到物模型属性code设备id{}",item.getDeviceId()); log.debug("未查询到物模型属性code设备id{}", item.getDeviceId());
} }
deviceEventInfo.setEventType(item.getEventType()); deviceEventInfo.setEventType(item.getEventType());
deviceEventInfo.setEventLevel(0); deviceEventInfo.setEventLevel(0);
deviceEventInfo.setConfirmed(0); deviceEventInfo.setConfirmed(0);
if (!StringUtils.isEmpty(eventType) && eventType.equals("遥信变位")){ if (!StringUtils.isEmpty(eventType) && eventType.equals("遥信变位")) {
if (item.getAttrValue().equals(0)){ if (item.getAttrValue().equals(0)) {
deviceEventInfo.setEventText(fieldName + " 复归"); deviceEventInfo.setEventText(fieldName + " 复归");
}else { } else {
deviceEventInfo.setEventText(fieldName + " 动作"); deviceEventInfo.setEventText(fieldName + " 动作");
} }
} } else {
else{ deviceEventInfo.setEventText(fieldName + eventType + ",属性值为:" + item.getAttrValue() + ",越限值为:" + item.getLimitValue());
deviceEventInfo.setEventText(fieldName + eventType + ",属性值为:"+item.getAttrValue()+",越限值为:"+item.getLimitValue());
} }
valueList.add(deviceEventInfo); valueList.add(deviceEventInfo);
} }
try { try {
tdEngineService.updateDeviceEventValues(valueList); tdEngineService.updateDeviceEventValues(valueList);
}catch (Exception e){ } catch (Exception e) {
log.error("事件信息存入Td失败,失败原因{}",e); log.error("事件信息存入Td失败,失败原因{}", e);
} }
} }
@ -363,8 +390,8 @@ public class NodeMessageServiceImpl extends TextWebSocketHandler implements Node
@Override @Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception { public void afterConnectionEstablished(WebSocketSession session) throws Exception {
String remoteIp = (String) session.getAttributes().getOrDefault(NodeConstant.REMOTE_IP, ""); String remoteIp = (String) session.getAttributes().getOrDefault(NodeConstant.REMOTE_IP, "");
Long nodeId = (Long)session.getAttributes().get(NodeConstant.NODE_ID); Long nodeId = (Long) session.getAttributes().get(NodeConstant.NODE_ID);
Long version = (Long)session.getAttributes().get(NodeConstant.VERSION); Long version = (Long) session.getAttributes().get(NodeConstant.VERSION);
if (nodeId == null || version == null) { if (nodeId == null || version == null) {
log.warn("检测到非法连接请求, IP: {}", remoteIp); log.warn("检测到非法连接请求, IP: {}", remoteIp);
try { try {
@ -373,7 +400,7 @@ public class NodeMessageServiceImpl extends TextWebSocketHandler implements Node
} }
return; return;
} }
if (onlineWSSessions.contains(nodeId)){ if (onlineWSSessions.contains(nodeId)) {
log.warn("检测到同一节点连接请求,已断开. NodeId: {}, IP:{}", nodeId, remoteIp); log.warn("检测到同一节点连接请求,已断开. NodeId: {}, IP:{}", nodeId, remoteIp);
try { try {
session.close(CloseStatus.NOT_ACCEPTABLE); session.close(CloseStatus.NOT_ACCEPTABLE);
@ -385,13 +412,14 @@ public class NodeMessageServiceImpl extends TextWebSocketHandler implements Node
onlineWSSessions.put(nodeId, concurrentWebSocketSessionDecorator); onlineWSSessions.put(nodeId, concurrentWebSocketSessionDecorator);
//如果采集程序的版本是0则直接下发当前配置 //如果采集程序的版本是0则直接下发当前配置
if (version == 0){ if (version == 0) {
sendTerminalConfig(nodeId); sendTerminalConfig(nodeId);
} }
} }
/** /**
* 收到节点Websocket报文回调函数 * 收到节点Websocket报文回调函数
*
* @param session websocket session * @param session websocket session
* @param message 报文内容 * @param message 报文内容
* @throws Exception * @throws Exception
@ -404,10 +432,9 @@ public class NodeMessageServiceImpl extends TextWebSocketHandler implements Node
return; return;
} }
//如果是应答报文跳过队列直接异步返回 //如果是应答报文跳过队列直接异步返回
if (responseCallback.contains(msg.getCmdId())){ if (responseCallback.contains(msg.getCmdId())) {
responseCallback.get(msg.getCmdId()).complete(msg); responseCallback.get(msg.getCmdId()).complete(msg);
} } else {
else{
//如果是主动请求报文加入队列等待处理 //如果是主动请求报文加入队列等待处理
String nodeId = session.getAttributes().get(NodeConstant.NODE_ID).toString(); String nodeId = session.getAttributes().get(NodeConstant.NODE_ID).toString();
String cmd = msg.getCmd(); String cmd = msg.getCmd();
@ -420,7 +447,7 @@ public class NodeMessageServiceImpl extends TextWebSocketHandler implements Node
@Override @Override
protected void handlePongMessage(WebSocketSession session, PongMessage message) throws Exception { protected void handlePongMessage(WebSocketSession session, PongMessage message) throws Exception {
Long nodeId = (Long)session.getAttributes().get(NodeConstant.NODE_ID); Long nodeId = (Long) session.getAttributes().get(NodeConstant.NODE_ID);
log.info("收到 Node:{} Pong Message", nodeId); log.info("收到 Node:{} Pong Message", nodeId);
session.getAttributes().put(NodeConstant.LAST_PONG_TIME, System.currentTimeMillis()); session.getAttributes().put(NodeConstant.LAST_PONG_TIME, System.currentTimeMillis());
} }
@ -428,16 +455,16 @@ public class NodeMessageServiceImpl extends TextWebSocketHandler implements Node
@Override @Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
String remoteIp = (String) session.getAttributes().getOrDefault(NodeConstant.REMOTE_IP, ""); String remoteIp = (String) session.getAttributes().getOrDefault(NodeConstant.REMOTE_IP, "");
Long nodeId = (Long)session.getAttributes().get(NodeConstant.NODE_ID); Long nodeId = (Long) session.getAttributes().get(NodeConstant.NODE_ID);
Long version = (Long)session.getAttributes().get(NodeConstant.VERSION); Long version = (Long) session.getAttributes().get(NodeConstant.VERSION);
log.error(String.format("IP: %s 通讯异常, NodeId:%d, Version: %d", remoteIp, nodeId, version), exception); log.error(String.format("IP: %s 通讯异常, NodeId:%d, Version: %d", remoteIp, nodeId, version), exception);
} }
@Override @Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception { public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
String remoteIp = (String) session.getAttributes().getOrDefault(NodeConstant.REMOTE_IP, ""); String remoteIp = (String) session.getAttributes().getOrDefault(NodeConstant.REMOTE_IP, "");
Long nodeId = (Long)session.getAttributes().get(NodeConstant.NODE_ID); Long nodeId = (Long) session.getAttributes().get(NodeConstant.NODE_ID);
Long version = (Long)session.getAttributes().get(NodeConstant.VERSION); Long version = (Long) session.getAttributes().get(NodeConstant.VERSION);
log.info("IP: {} 已断开连接, NodeId:{}, Version: {}, sessionId:{}, 原因: {}", log.info("IP: {} 已断开连接, NodeId:{}, Version: {}, sessionId:{}, 原因: {}",
remoteIp, remoteIp,
nodeId, nodeId,
@ -451,11 +478,11 @@ public class NodeMessageServiceImpl extends TextWebSocketHandler implements Node
* 定时发送心跳报文并清理离线的终端 * 定时发送心跳报文并清理离线的终端
*/ */
@Scheduled(cron = "0/15 * * * * ?") @Scheduled(cron = "0/15 * * * * ?")
public void sendHeartbeat(){ public void sendHeartbeat() {
for (ConcurrentWebSocketSessionDecorator session : onlineWSSessions.values()) { for (ConcurrentWebSocketSessionDecorator session : onlineWSSessions.values()) {
//判断心跳是否超时超时则主动断开连接 //判断心跳是否超时超时则主动断开连接
Long lastPongTime = (Long) session.getAttributes().get(NodeConstant.LAST_PONG_TIME); Long lastPongTime = (Long) session.getAttributes().get(NodeConstant.LAST_PONG_TIME);
if (lastPongTime != null && ((System.currentTimeMillis() - lastPongTime) > HEARTBEAT_TIMEOUT)){ if (lastPongTime != null && ((System.currentTimeMillis() - lastPongTime) > HEARTBEAT_TIMEOUT)) {
closeSession(session); closeSession(session);
return; return;
} }
@ -465,36 +492,37 @@ public class NodeMessageServiceImpl extends TextWebSocketHandler implements Node
/** /**
* 发送ping消息 * 发送ping消息
*
* @param session * @param session
*/ */
private void SendPingMessage(ConcurrentWebSocketSessionDecorator session){ private void SendPingMessage(ConcurrentWebSocketSessionDecorator session) {
try { try {
session.sendMessage(new PingMessage()); session.sendMessage(new PingMessage());
} catch (Exception ignore) {
} }
catch (Exception ignore){}
} }
private void closeSession(WebSocketSession session){ private void closeSession(WebSocketSession session) {
try{ try {
session.close(CloseStatus.NO_CLOSE_FRAME); session.close(CloseStatus.NO_CLOSE_FRAME);
} catch (Exception ignore) {
} }
catch (Exception ignore){}
} }
/** /**
* 向指定采集节点发送指令(无返回值) * 向指定采集节点发送指令(无返回值)
* @param nodeId 节点ID *
* @param nodeId 节点ID
* @param message 指令 * @param message 指令
*/ */
public void sendActionMessage(Long nodeId, TerminalMessage message){ public void sendActionMessage(Long nodeId, TerminalMessage message) {
ConcurrentWebSocketSessionDecorator session = onlineWSSessions.get(nodeId); ConcurrentWebSocketSessionDecorator session = onlineWSSessions.get(nodeId);
if (session != null){ if (session != null) {
try { try {
session.sendMessage(new TextMessage(message.toJsonString())); session.sendMessage(new TextMessage(message.toJsonString()));
log.info("发送的消息为:{}", message.toJsonString()); log.info("发送的消息为:{}", message.toJsonString());
} } catch (Exception exception) {
catch (Exception exception){
log.error(String.format("发送消息失败: NodeId: %s", nodeId), exception); log.error(String.format("发送消息失败: NodeId: %s", nodeId), exception);
closeSession(session); closeSession(session);
} }
@ -503,6 +531,7 @@ public class NodeMessageServiceImpl extends TextWebSocketHandler implements Node
/** /**
* 向指定采集节点发送指令(无返回值) * 向指定采集节点发送指令(无返回值)
*
* @param nodeId 节点ID * @param nodeId 节点ID
* @param message 指令 * @param message 指令
* @return * @return
@ -512,11 +541,12 @@ public class NodeMessageServiceImpl extends TextWebSocketHandler implements Node
*/ */
@Override @Override
public TerminalMessage sendTerminalMessageWithResult(Long nodeId, TerminalMessage message) throws ExecutionException, InterruptedException, TimeoutException { public TerminalMessage sendTerminalMessageWithResult(Long nodeId, TerminalMessage message) throws ExecutionException, InterruptedException, TimeoutException {
return sendTerminalMessageWithResult(nodeId,message, 10); return sendTerminalMessageWithResult(nodeId, message, 10);
} }
/** /**
* 向指定采集节点发送指令(有返回值) * 向指定采集节点发送指令(有返回值)
*
* @param nodeId 节点ID * @param nodeId 节点ID
* @param message 指令 * @param message 指令
* @param timeout 超时时间 * @param timeout 超时时间