diff --git a/das/src/main/java/com/das/modules/node/service/impl/NodeMessageServiceImpl.java b/das/src/main/java/com/das/modules/node/service/impl/NodeMessageServiceImpl.java index 17c06659..7f8b0865 100644 --- a/das/src/main/java/com/das/modules/node/service/impl/NodeMessageServiceImpl.java +++ b/das/src/main/java/com/das/modules/node/service/impl/NodeMessageServiceImpl.java @@ -87,7 +87,6 @@ public class NodeMessageServiceImpl extends TextWebSocketHandler implements Node private DataServiceImpl dataService; - @PostConstruct public void init() { //初始化高性能队列 @@ -110,6 +109,7 @@ public class NodeMessageServiceImpl extends TextWebSocketHandler implements Node disruptor.shutdown(); } } + public void pushMessage(TerminalMessage msg) { RingBuffer ringBuffer = disruptor.getRingBuffer(); if (ringBuffer == null) { @@ -233,7 +233,13 @@ public class NodeMessageServiceImpl extends TextWebSocketHandler implements Node JsonNode jsonNode = data.getData(); String deviceId = jsonNode.get("deviceId").asText(); JsonNode values = jsonNode.get("values"); + Long dataTime = jsonNode.get("dataTime").asLong(); Map keyValueMap = new HashMap<>(); + DeviceInfoCache deviceInfoCacheById = cacheService.getEquipmentCache().getDeviceInfoCacheById(Long.valueOf(deviceId)); + Set highKey = dataService.highIotFieldMap.get(deviceInfoCacheById.getDeviceCode()).keySet(); + Set lowKey = dataService.lowIotFieldMap.get(deviceInfoCacheById.getDeviceCode()).keySet(); + Map highSpeedValueMap = new HashMap<>(); + Map lowSpeedValueMap = new HashMap<>(); //数据入redis Iterator keysHigh = values.fieldNames(); @@ -241,8 +247,30 @@ public class NodeMessageServiceImpl extends TextWebSocketHandler implements Node String fieldName = keysHigh.next(); String key = String.format("RT:%s:%s", deviceId, fieldName.toLowerCase()); keyValueMap.put(key, values.get(fieldName)); + if (highKey.contains(fieldName)){ + highSpeedValueMap.put(fieldName,values.get(fieldName)); + } + if (lowKey.contains(fieldName)){ + lowSpeedValueMap.put(fieldName,values.get(fieldName)); + } } adminRedisTemplate.mSet(keyValueMap); + if (jsonNode.get("isStore").asBoolean()) { + //更新td + if (!highSpeedValueMap.isEmpty()){ + List highSpeedData = new ArrayList<>(); + RTData rtHighData = RTData.builder().dataTime(dataTime).deviceId(Long.valueOf(deviceId)).values(highSpeedValueMap).build(); + highSpeedData.add(rtHighData); + tdEngineService.updateYCHighValues(highSpeedData,deviceInfoCacheById.getDeviceCode()); + } + + if (!lowSpeedValueMap.isEmpty()){ + List lowSpeedData = new ArrayList<>(); + RTData rtLowData = RTData.builder().dataTime(dataTime).deviceId(Long.valueOf(deviceId)).values(lowSpeedValueMap).build(); + lowSpeedData.add(rtLowData); + tdEngineService.updateYCLowValues(lowSpeedData,deviceInfoCacheById.getDeviceCode()); + } + } } @Override @@ -309,8 +337,8 @@ public class NodeMessageServiceImpl extends TextWebSocketHandler implements Node // 使用 TypeReference 来指定转换目标类型 List list = objectMapper.convertValue(dataEvent, new TypeReference>() { }); - log.info("消息data转化deviceVo,{}",list); - for (DeviceEventVo item : list){ + log.info("消息data转化deviceVo,{}", list); + for (DeviceEventVo item : list) { DeviceInfoCache deviceInfoCache = cacheService.getEquipmentCache().getDeviceInfoCacheById(Long.valueOf(item.getDeviceId())); DeviceEventInfo deviceEventInfo = new DeviceEventInfo(); deviceEventInfo.setEventTime(item.getEventTime()); @@ -320,32 +348,31 @@ public class NodeMessageServiceImpl extends TextWebSocketHandler implements Node deviceEventInfo.setDeviceCode(deviceInfoCache.getDeviceCode()); String eventType = getEventType(item.getEventType()); String model = dataService.deviceModelMap.get(item.getDeviceId()); - if (StringUtils.isEmpty(model)){ - log.debug("未查询到物模型code,设备id:{}",item.getDeviceId()); + if (StringUtils.isEmpty(model)) { + log.debug("未查询到物模型code,设备id:{}", item.getDeviceId()); } String fieldName = dataService.fieldCodeNameMap.get(model).get(item.getAttrCode()); - if (StringUtils.isEmpty(fieldName)){ - log.debug("未查询到物模型属性code,设备id:{}",item.getDeviceId()); + if (StringUtils.isEmpty(fieldName)) { + log.debug("未查询到物模型属性code,设备id:{}", item.getDeviceId()); } deviceEventInfo.setEventType(item.getEventType()); deviceEventInfo.setEventLevel(0); deviceEventInfo.setConfirmed(0); - if (!StringUtils.isEmpty(eventType) && eventType.equals("遥信变位")){ - if (item.getAttrValue().equals(0)){ + if (!StringUtils.isEmpty(eventType) && eventType.equals("遥信变位")) { + if (item.getAttrValue().equals(0)) { deviceEventInfo.setEventText(fieldName + " 复归"); - }else { + } else { deviceEventInfo.setEventText(fieldName + " 动作"); } - } - else{ - deviceEventInfo.setEventText(fieldName + eventType + ",属性值为:"+item.getAttrValue()+",越限值为:"+item.getLimitValue()); + } else { + deviceEventInfo.setEventText(fieldName + eventType + ",属性值为:" + item.getAttrValue() + ",越限值为:" + item.getLimitValue()); } valueList.add(deviceEventInfo); } try { tdEngineService.updateDeviceEventValues(valueList); - }catch (Exception e){ - log.error("事件信息存入Td失败,失败原因{}",e); + } catch (Exception e) { + log.error("事件信息存入Td失败,失败原因{}", e); } } @@ -363,8 +390,8 @@ public class NodeMessageServiceImpl extends TextWebSocketHandler implements Node @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { String remoteIp = (String) session.getAttributes().getOrDefault(NodeConstant.REMOTE_IP, ""); - Long nodeId = (Long)session.getAttributes().get(NodeConstant.NODE_ID); - Long version = (Long)session.getAttributes().get(NodeConstant.VERSION); + Long nodeId = (Long) session.getAttributes().get(NodeConstant.NODE_ID); + Long version = (Long) session.getAttributes().get(NodeConstant.VERSION); if (nodeId == null || version == null) { log.warn("检测到非法连接请求, IP: {}", remoteIp); try { @@ -373,7 +400,7 @@ public class NodeMessageServiceImpl extends TextWebSocketHandler implements Node } return; } - if (onlineWSSessions.contains(nodeId)){ + if (onlineWSSessions.contains(nodeId)) { log.warn("检测到同一节点连接请求,已断开. NodeId: {}, IP:{}", nodeId, remoteIp); try { session.close(CloseStatus.NOT_ACCEPTABLE); @@ -385,13 +412,14 @@ public class NodeMessageServiceImpl extends TextWebSocketHandler implements Node onlineWSSessions.put(nodeId, concurrentWebSocketSessionDecorator); //如果采集程序的版本是0,则直接下发当前配置。 - if (version == 0){ + if (version == 0) { sendTerminalConfig(nodeId); } } /** * 收到节点Websocket报文回调函数 + * * @param session websocket session * @param message 报文内容 * @throws Exception @@ -404,10 +432,9 @@ public class NodeMessageServiceImpl extends TextWebSocketHandler implements Node return; } //如果是应答报文,跳过队列,直接异步返回 - if (responseCallback.contains(msg.getCmdId())){ + if (responseCallback.contains(msg.getCmdId())) { responseCallback.get(msg.getCmdId()).complete(msg); - } - else{ + } else { //如果是主动请求报文,加入队列,等待处理 String nodeId = session.getAttributes().get(NodeConstant.NODE_ID).toString(); String cmd = msg.getCmd(); @@ -420,7 +447,7 @@ public class NodeMessageServiceImpl extends TextWebSocketHandler implements Node @Override protected void handlePongMessage(WebSocketSession session, PongMessage message) throws Exception { - Long nodeId = (Long)session.getAttributes().get(NodeConstant.NODE_ID); + Long nodeId = (Long) session.getAttributes().get(NodeConstant.NODE_ID); log.info("收到 Node:{} Pong Message", nodeId); session.getAttributes().put(NodeConstant.LAST_PONG_TIME, System.currentTimeMillis()); } @@ -428,16 +455,16 @@ public class NodeMessageServiceImpl extends TextWebSocketHandler implements Node @Override public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { String remoteIp = (String) session.getAttributes().getOrDefault(NodeConstant.REMOTE_IP, ""); - Long nodeId = (Long)session.getAttributes().get(NodeConstant.NODE_ID); - Long version = (Long)session.getAttributes().get(NodeConstant.VERSION); + Long nodeId = (Long) session.getAttributes().get(NodeConstant.NODE_ID); + Long version = (Long) session.getAttributes().get(NodeConstant.VERSION); log.error(String.format("IP: %s 通讯异常, NodeId:%d, Version: %d", remoteIp, nodeId, version), exception); } @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception { String remoteIp = (String) session.getAttributes().getOrDefault(NodeConstant.REMOTE_IP, ""); - Long nodeId = (Long)session.getAttributes().get(NodeConstant.NODE_ID); - Long version = (Long)session.getAttributes().get(NodeConstant.VERSION); + Long nodeId = (Long) session.getAttributes().get(NodeConstant.NODE_ID); + Long version = (Long) session.getAttributes().get(NodeConstant.VERSION); log.info("IP: {} 已断开连接, NodeId:{}, Version: {}, sessionId:{}, 原因: {}", remoteIp, nodeId, @@ -451,11 +478,11 @@ public class NodeMessageServiceImpl extends TextWebSocketHandler implements Node * 定时发送心跳报文,并清理离线的终端 */ @Scheduled(cron = "0/15 * * * * ?") - public void sendHeartbeat(){ + public void sendHeartbeat() { for (ConcurrentWebSocketSessionDecorator session : onlineWSSessions.values()) { //判断心跳是否超时,超时则主动断开连接 Long lastPongTime = (Long) session.getAttributes().get(NodeConstant.LAST_PONG_TIME); - if (lastPongTime != null && ((System.currentTimeMillis() - lastPongTime) > HEARTBEAT_TIMEOUT)){ + if (lastPongTime != null && ((System.currentTimeMillis() - lastPongTime) > HEARTBEAT_TIMEOUT)) { closeSession(session); return; } @@ -465,36 +492,37 @@ public class NodeMessageServiceImpl extends TextWebSocketHandler implements Node /** * 发送ping消息 + * * @param session */ - private void SendPingMessage(ConcurrentWebSocketSessionDecorator session){ + private void SendPingMessage(ConcurrentWebSocketSessionDecorator session) { try { session.sendMessage(new PingMessage()); + } catch (Exception ignore) { } - catch (Exception ignore){} } - private void closeSession(WebSocketSession session){ - try{ + private void closeSession(WebSocketSession session) { + try { session.close(CloseStatus.NO_CLOSE_FRAME); + } catch (Exception ignore) { } - catch (Exception ignore){} } /** * 向指定采集节点发送指令(无返回值) - * @param nodeId 节点ID + * + * @param nodeId 节点ID * @param message 指令 */ - public void sendActionMessage(Long nodeId, TerminalMessage message){ + public void sendActionMessage(Long nodeId, TerminalMessage message) { ConcurrentWebSocketSessionDecorator session = onlineWSSessions.get(nodeId); - if (session != null){ + if (session != null) { try { session.sendMessage(new TextMessage(message.toJsonString())); log.info("发送的消息为:{}", message.toJsonString()); - } - catch (Exception exception){ + } catch (Exception exception) { log.error(String.format("发送消息失败: NodeId: %s", nodeId), exception); closeSession(session); } @@ -503,6 +531,7 @@ public class NodeMessageServiceImpl extends TextWebSocketHandler implements Node /** * 向指定采集节点发送指令(无返回值) + * * @param nodeId 节点ID * @param message 指令 * @return @@ -512,11 +541,12 @@ public class NodeMessageServiceImpl extends TextWebSocketHandler implements Node */ @Override public TerminalMessage sendTerminalMessageWithResult(Long nodeId, TerminalMessage message) throws ExecutionException, InterruptedException, TimeoutException { - return sendTerminalMessageWithResult(nodeId,message, 10); + return sendTerminalMessageWithResult(nodeId, message, 10); } /** * 向指定采集节点发送指令(有返回值) + * * @param nodeId 节点ID * @param message 指令 * @param timeout 超时时间(秒)