From 73f48a4c96aabe8bedd7c0151fdd9e4e1ac70ad2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B0=B7=E6=88=90=E4=BC=9F?= Date: Tue, 16 Jul 2024 17:29:11 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E5=86=85=E5=AD=98=E9=98=9F?= =?UTF-8?q?=E5=88=97=E7=94=A8=E4=BA=8E=E7=BB=88=E7=AB=AF=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E9=80=9A=E8=AE=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- das/pom.xml | 6 ++ .../node/constant/TerminalCommand.java | 5 ++ .../node/disruptor/MessageEventFactory.java | 11 ++++ .../TerminalMessageEventHandler.java | 27 ++++++++ .../node/domain/bo/TerminalMessage.java | 28 ++++++++ .../node/handler/NodeMessageHandler.java | 53 +++++++++++++-- .../das/modules/node/service/DataService.java | 8 +++ .../node/service/impl/DataServiceImpl.java | 66 +++++++++++++++++++ das/src/main/resources/application.yml | 5 ++ 9 files changed, 203 insertions(+), 6 deletions(-) create mode 100644 das/src/main/java/com/das/modules/node/constant/TerminalCommand.java create mode 100644 das/src/main/java/com/das/modules/node/disruptor/MessageEventFactory.java create mode 100644 das/src/main/java/com/das/modules/node/disruptor/TerminalMessageEventHandler.java create mode 100644 das/src/main/java/com/das/modules/node/domain/bo/TerminalMessage.java create mode 100644 das/src/main/java/com/das/modules/node/service/DataService.java create mode 100644 das/src/main/java/com/das/modules/node/service/impl/DataServiceImpl.java diff --git a/das/pom.xml b/das/pom.xml index 3fc9df87..75a72cb3 100644 --- a/das/pom.xml +++ b/das/pom.xml @@ -151,6 +151,12 @@ spring-boot-starter-data-redis + + com.lmax + disruptor + 3.3.4 + + diff --git a/das/src/main/java/com/das/modules/node/constant/TerminalCommand.java b/das/src/main/java/com/das/modules/node/constant/TerminalCommand.java new file mode 100644 index 00000000..9a46abf9 --- /dev/null +++ b/das/src/main/java/com/das/modules/node/constant/TerminalCommand.java @@ -0,0 +1,5 @@ +package com.das.modules.node.constant; + +public interface TerminalCommand { + String CMD_HEARTBEAT = "heartbeat"; +} diff --git a/das/src/main/java/com/das/modules/node/disruptor/MessageEventFactory.java b/das/src/main/java/com/das/modules/node/disruptor/MessageEventFactory.java new file mode 100644 index 00000000..9a3a65f2 --- /dev/null +++ b/das/src/main/java/com/das/modules/node/disruptor/MessageEventFactory.java @@ -0,0 +1,11 @@ +package com.das.modules.node.disruptor; + +import com.das.modules.node.domain.bo.TerminalMessage; +import com.lmax.disruptor.EventFactory; + +public class MessageEventFactory implements EventFactory { + @Override + public TerminalMessage newInstance() { + return TerminalMessage.builder().build(); + } +} diff --git a/das/src/main/java/com/das/modules/node/disruptor/TerminalMessageEventHandler.java b/das/src/main/java/com/das/modules/node/disruptor/TerminalMessageEventHandler.java new file mode 100644 index 00000000..2b72f5b6 --- /dev/null +++ b/das/src/main/java/com/das/modules/node/disruptor/TerminalMessageEventHandler.java @@ -0,0 +1,27 @@ +package com.das.modules.node.disruptor; + +import com.das.modules.node.domain.bo.TerminalMessage; +import com.lmax.disruptor.EventHandler; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; + +@Slf4j +@Component +public class TerminalMessageEventHandler implements EventHandler { + + private ConcurrentHashMap> callbackMap = new ConcurrentHashMap<>(16); + @Override + public void onEvent(TerminalMessage terminalMessage, long sequence, boolean endOfBatch) throws Exception { + log.info("收到消息: {}", terminalMessage.toJsonString()); + if (callbackMap.containsKey(terminalMessage.getCmdId())){ + //如果是回调函数,推送到回调函数 + callbackMap.get(terminalMessage.getCmdId()).complete(terminalMessage); + } + else{ + + } + } +} diff --git a/das/src/main/java/com/das/modules/node/domain/bo/TerminalMessage.java b/das/src/main/java/com/das/modules/node/domain/bo/TerminalMessage.java new file mode 100644 index 00000000..e90ae54f --- /dev/null +++ b/das/src/main/java/com/das/modules/node/domain/bo/TerminalMessage.java @@ -0,0 +1,28 @@ +package com.das.modules.node.domain.bo; + +import com.das.common.utils.JsonUtils; +import com.fasterxml.jackson.databind.JsonNode; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Builder +@AllArgsConstructor +@Data +public class TerminalMessage { + private String cmd; + private String cmdId; + private Long time; + private JsonNode data; + + public void from(TerminalMessage other){ + this.cmd = other.cmd; + this.cmdId = other.cmdId; + this.time = other.time; + this.data = other.data; + } + public String toJsonString() { + return JsonUtils.toJsonString(this); + } +} diff --git a/das/src/main/java/com/das/modules/node/handler/NodeMessageHandler.java b/das/src/main/java/com/das/modules/node/handler/NodeMessageHandler.java index 8a590c59..2948d497 100644 --- a/das/src/main/java/com/das/modules/node/handler/NodeMessageHandler.java +++ b/das/src/main/java/com/das/modules/node/handler/NodeMessageHandler.java @@ -1,13 +1,23 @@ package com.das.modules.node.handler; +import cn.hutool.core.bean.BeanUtil; +import cn.hutool.core.thread.ThreadUtil; +import cn.hutool.core.util.StrUtil; +import com.das.common.utils.JsonUtils; +import com.das.common.utils.SpringUtil; import com.das.modules.node.constant.NodeConstant; +import com.das.modules.node.domain.bo.TerminalMessage; +import com.das.modules.node.service.DataService; +import com.fasterxml.jackson.databind.JsonNode; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.BeanUtils; +import org.springframework.beans.factory.BeanFactoryUtils; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springframework.web.socket.*; import org.springframework.web.socket.handler.TextWebSocketHandler; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.*; @Component @Slf4j @@ -15,6 +25,8 @@ public class NodeMessageHandler extends TextWebSocketHandler { public static final Long HEARTBEAT_TIMEOUT = 1000 * 60 * 1L; private ConcurrentHashMap onlineSessions = new ConcurrentHashMap<>(16); + + private DataService dataService; @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { String remoteIp = (String) session.getAttributes().getOrDefault(NodeConstant.REMOTE_IP, ""); @@ -25,7 +37,6 @@ public class NodeMessageHandler extends TextWebSocketHandler { //如果终端节点已在线,则拒绝新的终端连接 try { session.close(CloseStatus.NOT_ACCEPTABLE); - } catch (Exception ignore){} } @@ -38,7 +49,21 @@ public class NodeMessageHandler extends TextWebSocketHandler { @Override protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { - super.handleTextMessage(session, message); + TerminalMessage msg = JsonUtils.parseObject(message.getPayload(), TerminalMessage.class); + String nodeId = session.getAttributes().get(NodeConstant.NODE_ID).toString(); + String cmdId = msg.getCmdId(); + String cmd = msg.getCmd(); + long time = msg.getTime(); + JsonNode data = msg.getData(); + + log.info("收到 Node:{} 命令: {}", nodeId, cmd); + log.debug("内容: {}", data.toString()); + if (dataService == null){ + dataService = SpringUtil.getBean(DataService.class); + } + if (dataService != null){ + dataService.pushMessage(msg); + } } @Override @@ -62,9 +87,7 @@ public class NodeMessageHandler extends TextWebSocketHandler { session.getId(), status.toString()); Long nodeId = (Long)session.getAttributes().get(NodeConstant.NODE_ID); - if (onlineSessions.containsKey(nodeId)){ - onlineSessions.remove(nodeId); - } + onlineSessions.remove(nodeId); } @@ -103,4 +126,22 @@ public class NodeMessageHandler extends TextWebSocketHandler { } catch (Exception ignore){} } + + /** + * 发送无返回值消息 + * @param nodeId + */ + public void sendActionMessage(Long nodeId, TerminalMessage message){ + WebSocketSession session = onlineSessions.get(nodeId); + if (session != null){ + try { + session.sendMessage(new TextMessage(message.toString())); + } + catch (Exception exception){ + log.error(String.format("发送消息失败: NodeId: %s", nodeId), exception); + closeSession(session); + } + } + } + } diff --git a/das/src/main/java/com/das/modules/node/service/DataService.java b/das/src/main/java/com/das/modules/node/service/DataService.java new file mode 100644 index 00000000..ee55d8d3 --- /dev/null +++ b/das/src/main/java/com/das/modules/node/service/DataService.java @@ -0,0 +1,8 @@ +package com.das.modules.node.service; + +import com.das.modules.node.domain.bo.TerminalMessage; + +public interface DataService { + + void pushMessage(TerminalMessage msg); +} diff --git a/das/src/main/java/com/das/modules/node/service/impl/DataServiceImpl.java b/das/src/main/java/com/das/modules/node/service/impl/DataServiceImpl.java new file mode 100644 index 00000000..68018577 --- /dev/null +++ b/das/src/main/java/com/das/modules/node/service/impl/DataServiceImpl.java @@ -0,0 +1,66 @@ +package com.das.modules.node.service.impl; + +import com.das.modules.node.disruptor.MessageEventFactory; +import com.das.modules.node.disruptor.TerminalMessageEventHandler; +import com.das.modules.node.domain.bo.TerminalMessage; +import com.das.modules.node.service.DataService; +import com.lmax.disruptor.RingBuffer; +import com.lmax.disruptor.dsl.Disruptor; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; + +@Slf4j +@Service +public class DataServiceImpl implements DataService { + private Disruptor disruptor = null; + + private RingBuffer ringBuffer = null; + + @Resource + TerminalMessageEventHandler terminalMessageEventHandler; + + @PostConstruct + public void init() { + //初始化高性能队列 + Executor executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); + MessageEventFactory factory = new MessageEventFactory(); + Disruptor disruptor = new Disruptor<>(factory, 1024 * 256, executor); + disruptor.handleEventsWith(terminalMessageEventHandler); + disruptor.start(); + ringBuffer = disruptor.getRingBuffer(); + } + + + + @PreDestroy + public void destroy() { + if (disruptor != null){ + disruptor.shutdown(); + } + } + + @Override + public void pushMessage(TerminalMessage msg) { + if (ringBuffer == null){ + return; + } + long seq = ringBuffer.next(); + try{ + TerminalMessage terminalMessage = ringBuffer.get(seq); + terminalMessage.from(msg); + } + catch (Exception e){ + log.error("发送消息失败",e); + } + finally { + ringBuffer.publish(seq); + } + + } +} diff --git a/das/src/main/resources/application.yml b/das/src/main/resources/application.yml index bb752d77..bb25f22a 100644 --- a/das/src/main/resources/application.yml +++ b/das/src/main/resources/application.yml @@ -88,3 +88,8 @@ captcha: das: aes: key: b6967ee87b86d85a + +logging: + level: + com: + das: DEBUG \ No newline at end of file