From efb2cce55f20c125a17bf5bec9eadd4d5e97e94f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B0=B7=E6=88=90=E4=BC=9F?= Date: Tue, 12 Nov 2024 15:31:02 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4=E8=8A=82=E7=82=B9=EF=BC=8C?= =?UTF-8?q?=E9=93=BE=E8=B7=AF=E4=BB=A3=E7=A0=81=E7=BB=93=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../das/common/config/WebsocketConfig.java | 13 +- .../node/disruptor/MessageEventFactory.java | 11 - .../TerminalMessageEventHandler.java | 57 ----- .../TerminalMessageWorkerHandler.java | 32 +++ .../node/domain/bo/TerminalMessage.java | 2 +- .../node/handler/NodeMessageHandler.java | 150 ------------ ...=> NodeWebsocketHandshakeInterceptor.java} | 5 +- .../node/service/NodeMessageService.java | 12 +- .../service/impl/NodeMessageServiceImpl.java | 228 +++++++++++++++--- .../node/service/impl/SysNodeServiceImpl.java | 24 +- .../operation/service/OperationService.java | 8 +- 11 files changed, 262 insertions(+), 280 deletions(-) delete mode 100644 das/src/main/java/com/das/modules/node/disruptor/MessageEventFactory.java delete mode 100644 das/src/main/java/com/das/modules/node/disruptor/TerminalMessageEventHandler.java create mode 100644 das/src/main/java/com/das/modules/node/disruptor/TerminalMessageWorkerHandler.java delete mode 100644 das/src/main/java/com/das/modules/node/handler/NodeMessageHandler.java rename das/src/main/java/com/das/modules/node/handler/{NodeHandshakeInterceptor.java => NodeWebsocketHandshakeInterceptor.java} (93%) diff --git a/das/src/main/java/com/das/common/config/WebsocketConfig.java b/das/src/main/java/com/das/common/config/WebsocketConfig.java index f28aee31..95361632 100644 --- a/das/src/main/java/com/das/common/config/WebsocketConfig.java +++ b/das/src/main/java/com/das/common/config/WebsocketConfig.java @@ -1,10 +1,9 @@ package com.das.common.config; -import com.das.modules.node.handler.NodeHandshakeInterceptor; -import com.das.modules.node.handler.NodeMessageHandler; +import com.das.modules.node.handler.NodeWebsocketHandshakeInterceptor; +import com.das.modules.node.service.impl.NodeMessageServiceImpl; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; -import org.springframework.stereotype.Component; import org.springframework.web.socket.config.annotation.EnableWebSocket; import org.springframework.web.socket.config.annotation.WebSocketConfigurer; import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry; @@ -12,15 +11,13 @@ import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry @EnableWebSocket @Configuration public class WebsocketConfig implements WebSocketConfigurer { - @Autowired - NodeHandshakeInterceptor nodeHandshakeInterceptor; @Autowired - NodeMessageHandler nodeMessageHandler; + NodeMessageServiceImpl nodeMessageService; @Override public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { - registry.addHandler(nodeMessageHandler, "/node/{nodeId}/{version}") + registry.addHandler(nodeMessageService, "/node/{nodeId}/{version}") .setAllowedOrigins("*") - .addInterceptors(nodeHandshakeInterceptor); + .addInterceptors(new NodeWebsocketHandshakeInterceptor()); } } diff --git a/das/src/main/java/com/das/modules/node/disruptor/MessageEventFactory.java b/das/src/main/java/com/das/modules/node/disruptor/MessageEventFactory.java deleted file mode 100644 index 9a3a65f2..00000000 --- a/das/src/main/java/com/das/modules/node/disruptor/MessageEventFactory.java +++ /dev/null @@ -1,11 +0,0 @@ -package com.das.modules.node.disruptor; - -import com.das.modules.node.domain.bo.TerminalMessage; -import com.lmax.disruptor.EventFactory; - -public class MessageEventFactory implements EventFactory { - @Override - public TerminalMessage newInstance() { - return TerminalMessage.builder().build(); - } -} diff --git a/das/src/main/java/com/das/modules/node/disruptor/TerminalMessageEventHandler.java b/das/src/main/java/com/das/modules/node/disruptor/TerminalMessageEventHandler.java deleted file mode 100644 index 0234d9d0..00000000 --- a/das/src/main/java/com/das/modules/node/disruptor/TerminalMessageEventHandler.java +++ /dev/null @@ -1,57 +0,0 @@ -package com.das.modules.node.disruptor; - -import com.das.common.utils.SpringUtils; -import com.das.modules.node.command.BaseCommand; -import com.das.modules.node.domain.bo.TerminalMessage; -import com.das.modules.node.handler.NodeMessageHandler; -import com.lmax.disruptor.EventHandler; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -import java.util.concurrent.*; - -@Slf4j -@Component -public class TerminalMessageEventHandler implements EventHandler { - - @Autowired - NodeMessageHandler nodeMessageHandler; - - private ConcurrentHashMap> callbackMap = new ConcurrentHashMap<>(16); - @Override - public void onEvent(TerminalMessage terminalMessage, long sequence, boolean endOfBatch) throws Exception { -// log.info("收到消息: {}", terminalMessage.toJsonString()); - if (callbackMap.containsKey(terminalMessage.getCmdId())){ - //如果是回调函数,推送到回调函数 - callbackMap.get(terminalMessage.getCmdId()).complete(terminalMessage); - } else{ - String cmd = terminalMessage.getCmd(); - BaseCommand commander = null; - try { - commander = SpringUtils.getBean(cmd); - } catch (Exception e) { - log.debug("当前未找到执行command容器"); - } - if (commander != null) { - try { - commander.doCommand(terminalMessage); - } catch (Exception ex) { - log.error(String.format("命令 - %s 处理失败", cmd), ex); - } - } else { - log.error("命令[{}]无效, 未发现实现适配器!", cmd); - } - } - } - - - public void sendTerminalMessageWithResult(Long nodeId, TerminalMessage message) throws ExecutionException, InterruptedException, TimeoutException { - nodeMessageHandler.sendActionMessage(nodeId, message); - - CompletableFuture future = new CompletableFuture<>(); - callbackMap.put(message.getCmdId(), future); - - TerminalMessage result = future.get(10, TimeUnit.SECONDS); - } -} diff --git a/das/src/main/java/com/das/modules/node/disruptor/TerminalMessageWorkerHandler.java b/das/src/main/java/com/das/modules/node/disruptor/TerminalMessageWorkerHandler.java new file mode 100644 index 00000000..2ea1a071 --- /dev/null +++ b/das/src/main/java/com/das/modules/node/disruptor/TerminalMessageWorkerHandler.java @@ -0,0 +1,32 @@ +package com.das.modules.node.disruptor; + +import com.das.common.utils.SpringUtils; +import com.das.modules.node.command.BaseCommand; +import com.das.modules.node.domain.bo.TerminalMessage; +import com.lmax.disruptor.WorkHandler; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class TerminalMessageWorkerHandler implements WorkHandler { + + + @Override + public void onEvent(TerminalMessage event) throws Exception { + String cmd = event.getCmd(); + BaseCommand commander = null; + try { + commander = SpringUtils.getBean(cmd); + } catch (Exception e) { + log.debug("当前未找到执行command容器"); + } + if (commander != null) { + try { + commander.doCommand(event); + } catch (Exception ex) { + log.error(String.format("命令 - %s 处理失败", cmd), ex); + } + } else { + log.error("命令[{}]无效, 未发现实现适配器!", cmd); + } + } +} diff --git a/das/src/main/java/com/das/modules/node/domain/bo/TerminalMessage.java b/das/src/main/java/com/das/modules/node/domain/bo/TerminalMessage.java index e90ae54f..16951875 100644 --- a/das/src/main/java/com/das/modules/node/domain/bo/TerminalMessage.java +++ b/das/src/main/java/com/das/modules/node/domain/bo/TerminalMessage.java @@ -6,8 +6,8 @@ import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; - @Builder +@NoArgsConstructor @AllArgsConstructor @Data public class TerminalMessage { diff --git a/das/src/main/java/com/das/modules/node/handler/NodeMessageHandler.java b/das/src/main/java/com/das/modules/node/handler/NodeMessageHandler.java deleted file mode 100644 index 1c7c2262..00000000 --- a/das/src/main/java/com/das/modules/node/handler/NodeMessageHandler.java +++ /dev/null @@ -1,150 +0,0 @@ -package com.das.modules.node.handler; - -import com.das.common.utils.JsonUtils; -import com.das.common.utils.SpringUtil; -import com.das.modules.node.constant.NodeConstant; -import com.das.modules.node.domain.bo.TerminalMessage; -import com.das.modules.node.service.NodeMessageService; -import com.fasterxml.jackson.databind.JsonNode; -import lombok.extern.slf4j.Slf4j; -import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.stereotype.Component; -import org.springframework.web.socket.*; -import org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator; -import org.springframework.web.socket.handler.TextWebSocketHandler; - -import java.util.concurrent.*; - -@Component -@Slf4j -public class NodeMessageHandler extends TextWebSocketHandler { - - public static final Long HEARTBEAT_TIMEOUT = 1000 * 60 * 1L; - private ConcurrentHashMap onlineSessions = new ConcurrentHashMap<>(16); - - private NodeMessageService nodeMessageService; - @Override - public void afterConnectionEstablished(WebSocketSession session) throws Exception { - String remoteIp = (String) session.getAttributes().getOrDefault(NodeConstant.REMOTE_IP, ""); - Long nodeId = (Long)session.getAttributes().get(NodeConstant.NODE_ID); - Long version = (Long)session.getAttributes().get(NodeConstant.VERSION); - long time = System.currentTimeMillis(); - log.debug("IP: {} 请求连接. sessionId: {}", remoteIp, session.getId()); - if (onlineSessions.containsKey(nodeId)){ - //如果终端节点已在线,则拒绝新的终端连接 - try { - session.close(CloseStatus.NOT_ACCEPTABLE); - } - catch (Exception ignore){} - } - else { - log.info("IP: {} 准许连接, NodeId:{}, Version: {}, sessionId: {}", remoteIp, nodeId, version, session.getId()); - onlineSessions.put(nodeId, new ConcurrentWebSocketSessionDecorator(session, 5 * 1000, 2 * 1024 * 1024)); - } - - // 如果version是0,则需要调用一次configUpdate配置更新 - if (version == 0){ - if (nodeMessageService == null){ - nodeMessageService = SpringUtil.getBean(NodeMessageService.class); - } - nodeMessageService.sendTerminalConfig(Long.valueOf(nodeId)); - } - - } - - @Override - protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { - TerminalMessage msg = JsonUtils.parseObject(message.getPayload(), TerminalMessage.class); - String nodeId = session.getAttributes().get(NodeConstant.NODE_ID).toString(); - String cmd = msg.getCmd(); - JsonNode data = msg.getData(); - log.info("收到 Node:{} 命令: {}", nodeId, cmd); - if (nodeMessageService == null){ - nodeMessageService = SpringUtil.getBean(NodeMessageService.class); - } - if (nodeMessageService != null){ - nodeMessageService.pushMessage(msg); - } - - } - - @Override - protected void handlePongMessage(WebSocketSession session, PongMessage message) throws Exception { - Long nodeId = (Long)session.getAttributes().get(NodeConstant.NODE_ID); - log.info("收到 Node:{} Pong Message", nodeId); - session.getAttributes().put(NodeConstant.LAST_PONG_TIME, System.currentTimeMillis()); - } - - @Override - public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { - log.error(String.format("通讯异常: NodeId: %s", session.getAttributes().get(NodeConstant.NODE_ID)), exception); - } - - @Override - public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception { - log.info("IP: {} 已断开连接, NodeId:{}, Version: {}, sessionId:{}, 原因: {}", - session.getAttributes().get(NodeConstant.REMOTE_IP), - session.getAttributes().get(NodeConstant.NODE_ID), - session.getAttributes().get(NodeConstant.VERSION), - session.getId(), - status.toString()); - Long nodeId = (Long)session.getAttributes().get(NodeConstant.NODE_ID); - onlineSessions.remove(nodeId); - } - - - /** - * 定时发送心跳报文,并清理离线的终端 - */ - @Scheduled(cron = "0/15 * * * * ?") - public void sendHeartbeat(){ - for (ConcurrentWebSocketSessionDecorator session : onlineSessions.values()) { - //判断心跳是否超时,超时则主动断开连接 - Long lastPongTime = (Long) session.getAttributes().get(NodeConstant.LAST_PONG_TIME); - if (lastPongTime != null && ((System.currentTimeMillis() - lastPongTime) > HEARTBEAT_TIMEOUT)){ - closeSession(session); - return; - } - SendPingMessage(session); - } - } - - - private void closeSession(WebSocketSession session){ - try{ - session.close(CloseStatus.NO_CLOSE_FRAME); - } - catch (Exception ignore){} - - } - - /** - * 发送ping消息 - * @param session - */ - private void SendPingMessage(ConcurrentWebSocketSessionDecorator session){ - try { - session.sendMessage(new PingMessage()); - } - catch (Exception ignore){} - } - - /** - * 发送无返回值消息 - * @param nodeId - */ - public void sendActionMessage(Long nodeId, TerminalMessage message){ - ConcurrentWebSocketSessionDecorator session = onlineSessions.get(nodeId); - if (session != null){ - try { - session.sendMessage(new TextMessage(message.toJsonString())); - log.info("发送的消息为:{}", message.toJsonString()); - } - catch (Exception exception){ - log.error(String.format("发送消息失败: NodeId: %s", nodeId), exception); - closeSession(session); - } - } - } - -} diff --git a/das/src/main/java/com/das/modules/node/handler/NodeHandshakeInterceptor.java b/das/src/main/java/com/das/modules/node/handler/NodeWebsocketHandshakeInterceptor.java similarity index 93% rename from das/src/main/java/com/das/modules/node/handler/NodeHandshakeInterceptor.java rename to das/src/main/java/com/das/modules/node/handler/NodeWebsocketHandshakeInterceptor.java index 1cc543b2..f3c77ba6 100644 --- a/das/src/main/java/com/das/modules/node/handler/NodeHandshakeInterceptor.java +++ b/das/src/main/java/com/das/modules/node/handler/NodeWebsocketHandshakeInterceptor.java @@ -7,10 +7,8 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.http.server.ServerHttpRequest; import org.springframework.http.server.ServerHttpResponse; import org.springframework.stereotype.Component; -import org.springframework.util.MultiValueMap; import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.server.HandshakeInterceptor; -import org.springframework.web.util.UriComponentsBuilder; import java.net.InetSocketAddress; import java.nio.charset.StandardCharsets; @@ -22,8 +20,7 @@ import java.util.Map; * Websocket握手拦截器 */ @Slf4j -@Component -public class NodeHandshakeInterceptor implements HandshakeInterceptor { +public class NodeWebsocketHandshakeInterceptor implements HandshakeInterceptor { public String getRealIp(ServerHttpRequest request) { diff --git a/das/src/main/java/com/das/modules/node/service/NodeMessageService.java b/das/src/main/java/com/das/modules/node/service/NodeMessageService.java index ec1abb92..c23de923 100644 --- a/das/src/main/java/com/das/modules/node/service/NodeMessageService.java +++ b/das/src/main/java/com/das/modules/node/service/NodeMessageService.java @@ -3,12 +3,19 @@ package com.das.modules.node.service; import com.das.modules.node.domain.bo.CalculateRTData; import com.das.modules.node.domain.bo.TerminalMessage; import com.fasterxml.jackson.databind.JsonNode; +import org.springframework.web.socket.CloseStatus; +import org.springframework.web.socket.WebSocketSession; +import org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator; import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; +/** + * 节点消息处理服务 + */ public interface NodeMessageService { - void pushMessage(TerminalMessage msg); JsonNode sendTerminalConfig(Long nodeId); @@ -19,4 +26,7 @@ public interface NodeMessageService { void handleLowSpeed(TerminalMessage data); void handleDeviceEvent(TerminalMessage data); + + TerminalMessage sendTerminalMessageWithResult(Long nodeId, TerminalMessage message) throws ExecutionException, InterruptedException, TimeoutException; + TerminalMessage sendTerminalMessageWithResult(Long nodeId, TerminalMessage message, long timeout) throws ExecutionException, InterruptedException, TimeoutException; } diff --git a/das/src/main/java/com/das/modules/node/service/impl/NodeMessageServiceImpl.java b/das/src/main/java/com/das/modules/node/service/impl/NodeMessageServiceImpl.java index b71c256b..257736e0 100644 --- a/das/src/main/java/com/das/modules/node/service/impl/NodeMessageServiceImpl.java +++ b/das/src/main/java/com/das/modules/node/service/impl/NodeMessageServiceImpl.java @@ -1,64 +1,76 @@ package com.das.modules.node.service.impl; +import cn.hutool.core.util.IdUtil; import com.baomidou.mybatisplus.core.toolkit.IdWorker; +import com.das.common.constant.MeasType; +import com.das.common.utils.AdminRedisTemplate; +import com.das.common.utils.JsonUtils; import com.das.common.utils.StringUtils; import com.das.modules.cache.domain.DeviceInfoCache; import com.das.modules.cache.service.CacheService; import com.das.modules.data.domain.DeviceEventInfo; +import com.das.modules.data.service.TDEngineService; import com.das.modules.data.service.impl.DataServiceImpl; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.core.type.TypeReference; -import cn.hutool.core.util.IdUtil; -import com.das.common.constant.MeasType; -import com.das.common.utils.AdminRedisTemplate; import com.das.modules.equipment.mapper.SysIotModelMapper; -import com.das.modules.node.disruptor.MessageEventFactory; -import com.das.modules.node.disruptor.TerminalMessageEventHandler; +import com.das.modules.node.constant.NodeConstant; +import com.das.modules.node.disruptor.TerminalMessageWorkerHandler; import com.das.modules.node.domain.bo.RTData; import com.das.modules.node.domain.bo.TerminalMessage; import com.das.modules.node.domain.vo.*; -import com.das.modules.node.handler.NodeMessageHandler; import com.das.modules.node.mapper.SysCommunicationLinkMapper; import com.das.modules.node.mapper.SysImpTabMappingMapper; import com.das.modules.node.service.NodeMessageService; -import com.das.modules.data.service.TDEngineService; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import com.lmax.disruptor.RingBuffer; +import com.lmax.disruptor.YieldingWaitStrategy; import com.lmax.disruptor.dsl.Disruptor; +import com.lmax.disruptor.dsl.ProducerType; +import com.lmax.disruptor.util.DaemonThreadFactory; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; +import org.springframework.web.socket.*; +import org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator; +import org.springframework.web.socket.handler.TextWebSocketHandler; +import java.io.IOException; import java.util.*; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; +import java.util.concurrent.*; @Slf4j @Service -public class NodeMessageServiceImpl implements NodeMessageService { +public class NodeMessageServiceImpl extends TextWebSocketHandler implements NodeMessageService { + + public static final Long HEARTBEAT_TIMEOUT = 1000 * 60 * 1L; + + /** + * JSON 转换器 + */ private static final ObjectMapper JSON_MAPPER = new ObjectMapper(); + /** + * websocket 会话集合 + */ + private final ConcurrentHashMap onlineWSSessions = new ConcurrentHashMap<>(16); + + private ConcurrentHashMap> responseCallback = new ConcurrentHashMap<>(16); + private Disruptor disruptor = null; - private RingBuffer ringBuffer = null; - - @Resource - TerminalMessageEventHandler terminalMessageEventHandler; - @Resource SysCommunicationLinkMapper sysCommunicationLinkMapper; @Resource SysImpTabMappingMapper sysImptabmappingMapper; - @Resource - private NodeMessageHandler nodeMessageHandler; - @Autowired AdminRedisTemplate adminRedisTemplate; @@ -79,27 +91,27 @@ public class NodeMessageServiceImpl implements NodeMessageService { @PostConstruct public void init() { //初始化高性能队列 - Executor executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); - MessageEventFactory factory = new MessageEventFactory(); - Disruptor disruptor = new Disruptor<>(factory, 1024 * 256, executor); - disruptor.handleEventsWith(terminalMessageEventHandler); + int cpu = Runtime.getRuntime().availableProcessors(); + int bufferSize = 1024 * 4; + disruptor = new Disruptor<>(TerminalMessage::new, bufferSize, DaemonThreadFactory.INSTANCE, ProducerType.MULTI, new YieldingWaitStrategy()); + // + TerminalMessageWorkerHandler[] workerHandlers = new TerminalMessageWorkerHandler[cpu]; + for (int i = 0; i < cpu; i++) { + workerHandlers[i] = new TerminalMessageWorkerHandler(); + } + disruptor.handleEventsWithWorkerPool(workerHandlers); disruptor.start(); - ringBuffer = disruptor.getRingBuffer(); } @PreDestroy public void destroy() { - if (ringBuffer != null) { - ringBuffer = null; - } if (disruptor != null) { disruptor.shutdown(); } } - - @Override public void pushMessage(TerminalMessage msg) { + RingBuffer ringBuffer = disruptor.getRingBuffer(); if (ringBuffer == null) { return; } @@ -212,7 +224,7 @@ public class NodeMessageServiceImpl implements NodeMessageService { .time(time) .data(jsonNode) .build(); - nodeMessageHandler.sendActionMessage(nodeId, configUpdate); + sendActionMessage(nodeId, configUpdate); return jsonNode; } @@ -347,4 +359,156 @@ public class NodeMessageServiceImpl implements NodeMessageService { default -> null; }; } + + @Override + public void afterConnectionEstablished(WebSocketSession session) throws Exception { + String remoteIp = (String) session.getAttributes().getOrDefault(NodeConstant.REMOTE_IP, ""); + Long nodeId = (Long)session.getAttributes().get(NodeConstant.NODE_ID); + Long version = (Long)session.getAttributes().get(NodeConstant.VERSION); + if (nodeId == null || version == null) { + log.warn("检测到非法连接请求, IP: {}", remoteIp); + try { + session.close(CloseStatus.NOT_ACCEPTABLE); + } catch (IOException ignored) { + } + return; + } + if (onlineWSSessions.contains(nodeId)){ + log.warn("检测到同一节点连接请求,已断开. NodeId: {}, IP:{}", nodeId, remoteIp); + try { + session.close(CloseStatus.NOT_ACCEPTABLE); + } catch (IOException ignored) { + } + return; + } + ConcurrentWebSocketSessionDecorator concurrentWebSocketSessionDecorator = new ConcurrentWebSocketSessionDecorator(session, 5 * 1000, 2 * 1024 * 1024); + onlineWSSessions.put(nodeId, concurrentWebSocketSessionDecorator); + + //如果采集程序的版本是0,则直接下发当前配置。 + if (version == 0){ + sendTerminalConfig(nodeId); + } + } + + /** + * 收到节点Websocket报文回调函数 + * @param session websocket session + * @param message 报文内容 + * @throws Exception + */ + @Override + protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { + TerminalMessage msg = JsonUtils.parseObject(message.getPayload(), TerminalMessage.class); + if (msg == null) { + log.warn("收到非法报文:{}", message.getPayload()); + return; + } + //如果是应答报文,跳过队列,直接异步返回 + if (responseCallback.contains(msg.getCmdId())){ + responseCallback.get(msg.getCmdId()).complete(msg); + } + else{ + //如果是主动请求报文,加入队列,等待处理 + String nodeId = session.getAttributes().get(NodeConstant.NODE_ID).toString(); + String cmd = msg.getCmd(); + JsonNode data = msg.getData(); + log.debug("收到 Node:{} WS 报文: {}", nodeId, cmd); + pushMessage(msg); + } + + } + + @Override + protected void handlePongMessage(WebSocketSession session, PongMessage message) throws Exception { + Long nodeId = (Long)session.getAttributes().get(NodeConstant.NODE_ID); + log.info("收到 Node:{} Pong Message", nodeId); + session.getAttributes().put(NodeConstant.LAST_PONG_TIME, System.currentTimeMillis()); + } + + @Override + public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { + String remoteIp = (String) session.getAttributes().getOrDefault(NodeConstant.REMOTE_IP, ""); + Long nodeId = (Long)session.getAttributes().get(NodeConstant.NODE_ID); + Long version = (Long)session.getAttributes().get(NodeConstant.VERSION); + log.error(String.format("IP: %s 通讯异常, NodeId:%d, Version: %d", remoteIp, nodeId, version), exception); + } + + @Override + public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception { + String remoteIp = (String) session.getAttributes().getOrDefault(NodeConstant.REMOTE_IP, ""); + Long nodeId = (Long)session.getAttributes().get(NodeConstant.NODE_ID); + Long version = (Long)session.getAttributes().get(NodeConstant.VERSION); + log.info("IP: {} 已断开连接, NodeId:{}, Version: {}, sessionId:{}, 原因: {}", + remoteIp, + nodeId, + version, + session.getId(), + status.toString()); + onlineWSSessions.remove(nodeId); + } + + /** + * 定时发送心跳报文,并清理离线的终端 + */ + @Scheduled(cron = "0/15 * * * * ?") + public void sendHeartbeat(){ + for (ConcurrentWebSocketSessionDecorator session : onlineWSSessions.values()) { + //判断心跳是否超时,超时则主动断开连接 + Long lastPongTime = (Long) session.getAttributes().get(NodeConstant.LAST_PONG_TIME); + if (lastPongTime != null && ((System.currentTimeMillis() - lastPongTime) > HEARTBEAT_TIMEOUT)){ + closeSession(session); + return; + } + SendPingMessage(session); + } + } + + /** + * 发送ping消息 + * @param session + */ + private void SendPingMessage(ConcurrentWebSocketSessionDecorator session){ + try { + session.sendMessage(new PingMessage()); + } + catch (Exception ignore){} + } + + private void closeSession(WebSocketSession session){ + try{ + session.close(CloseStatus.NO_CLOSE_FRAME); + } + catch (Exception ignore){} + + } + + /** + * 发送无返回值消息 + * @param nodeId + */ + public void sendActionMessage(Long nodeId, TerminalMessage message){ + ConcurrentWebSocketSessionDecorator session = onlineWSSessions.get(nodeId); + if (session != null){ + try { + session.sendMessage(new TextMessage(message.toJsonString())); + log.info("发送的消息为:{}", message.toJsonString()); + } + catch (Exception exception){ + log.error(String.format("发送消息失败: NodeId: %s", nodeId), exception); + closeSession(session); + } + } + } + + public TerminalMessage sendTerminalMessageWithResult(Long nodeId, TerminalMessage message) throws ExecutionException, InterruptedException, TimeoutException { + return sendTerminalMessageWithResult(nodeId,message, 10); + } + + @Override + public TerminalMessage sendTerminalMessageWithResult(Long nodeId, TerminalMessage message, long timeout) throws ExecutionException, InterruptedException, TimeoutException { + sendActionMessage(nodeId, message); + CompletableFuture future = new CompletableFuture<>(); + responseCallback.put(message.getCmdId(), future); + return future.get(timeout, TimeUnit.SECONDS); + } } diff --git a/das/src/main/java/com/das/modules/node/service/impl/SysNodeServiceImpl.java b/das/src/main/java/com/das/modules/node/service/impl/SysNodeServiceImpl.java index 031bc7e5..6be4c0fa 100644 --- a/das/src/main/java/com/das/modules/node/service/impl/SysNodeServiceImpl.java +++ b/das/src/main/java/com/das/modules/node/service/impl/SysNodeServiceImpl.java @@ -2,11 +2,9 @@ package com.das.modules.node.service.impl; import cn.dev33.satoken.stp.StpUtil; import cn.hutool.core.io.IoUtil; -import cn.hutool.core.util.IdUtil; import com.baomidou.mybatisplus.core.metadata.IPage; import com.das.common.config.SessionUtil; import com.das.common.constant.MeasType; -import com.das.common.exceptions.ServiceException; import com.das.common.utils.BeanCopyUtils; import com.das.common.utils.PageDataInfo; import com.das.common.utils.PageQuery; @@ -19,11 +17,14 @@ import com.das.modules.equipment.domain.vo.SysIotModelServiceVo; import com.das.modules.equipment.mapper.SysEquipmentMapper; import com.das.modules.equipment.mapper.SysIotModelFieldMapper; import com.das.modules.equipment.mapper.SysIotModelServiceMapper; -import com.das.modules.node.constant.NodeConstant; -import com.das.modules.node.disruptor.TerminalMessageEventHandler; -import com.das.modules.node.domain.bo.TerminalMessage; -import com.das.modules.node.domain.dto.*; -import com.das.modules.node.domain.vo.*; +import com.das.modules.node.domain.dto.BindEquipmentInfoDto; +import com.das.modules.node.domain.dto.QueryTabMappingParamDto; +import com.das.modules.node.domain.dto.SysCommunicationLinkDto; +import com.das.modules.node.domain.dto.SysNodeDto; +import com.das.modules.node.domain.vo.EquipmentVo; +import com.das.modules.node.domain.vo.SysCommunicationLinkVo; +import com.das.modules.node.domain.vo.SysNodeVo; +import com.das.modules.node.domain.vo.SysTabMappingVo; import com.das.modules.node.entity.SysCommunicationLink; import com.das.modules.node.entity.SysNode; import com.das.modules.node.entity.SysTabMapping; @@ -32,7 +33,6 @@ import com.das.modules.node.mapper.SysImpTabMappingMapper; import com.das.modules.node.mapper.SysNodeMapper; import com.das.modules.node.service.SysNodeService; import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; import jakarta.servlet.http.HttpServletRequest; import jakarta.servlet.http.HttpServletResponse; import lombok.extern.slf4j.Slf4j; @@ -42,7 +42,10 @@ import org.springframework.transaction.annotation.Transactional; import org.springframework.util.CollectionUtils; import org.springframework.web.multipart.MultipartFile; -import java.io.*; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStream; import java.nio.charset.StandardCharsets; import java.util.*; @@ -74,9 +77,6 @@ public class SysNodeServiceImpl implements SysNodeService { @Autowired SysIotModelServiceMapper iotModelServiceMapper; - @Autowired - TerminalMessageEventHandler terminalMessageEventHandler; - @Override public List querySysNodeList() { diff --git a/das/src/main/java/com/das/modules/operation/service/OperationService.java b/das/src/main/java/com/das/modules/operation/service/OperationService.java index 4aaac176..7252e0ec 100644 --- a/das/src/main/java/com/das/modules/operation/service/OperationService.java +++ b/das/src/main/java/com/das/modules/operation/service/OperationService.java @@ -8,10 +8,10 @@ import com.das.common.exceptions.ServiceException; import com.das.common.utils.AdminRedisTemplate; import com.das.modules.auth.domain.vo.SysUserVo; import com.das.modules.node.constant.NodeConstant; -import com.das.modules.node.disruptor.TerminalMessageEventHandler; import com.das.modules.node.domain.bo.TerminalMessage; import com.das.modules.node.domain.vo.SysNodeVo; import com.das.modules.node.mapper.SysNodeMapper; +import com.das.modules.node.service.NodeMessageService; import com.das.modules.operation.domain.dto.CommandInfoDto; import com.das.modules.operation.entity.SysManualStatus; import com.das.modules.operation.entity.SysOperationLog; @@ -49,10 +49,10 @@ public class OperationService { private SysOperationLogMapper sysOperationLogMapper; @Autowired - TerminalMessageEventHandler terminalMessageEventHandler; + AdminRedisTemplate adminRedisTemplate; @Autowired - AdminRedisTemplate adminRedisTemplate; + NodeMessageService nodeMessageService; @@ -127,7 +127,7 @@ public class OperationService { .time(time) .data(jsonNode) .build(); - terminalMessageEventHandler.sendTerminalMessageWithResult(activeNodeId, configUpdate); + nodeMessageService.sendTerminalMessageWithResult(activeNodeId, configUpdate); } catch (Exception e) { throw new ServiceException("设备控制失败 "+ e); }