添加内存队列用于终端消息通讯

This commit is contained in:
谷成伟 2024-07-16 17:29:11 +08:00
parent b0511d7ef7
commit 73f48a4c96
9 changed files with 203 additions and 6 deletions

View File

@ -151,6 +151,12 @@
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.3.4</version>
</dependency>
<!--aop切面-->
<dependency>

View File

@ -0,0 +1,5 @@
package com.das.modules.node.constant;
public interface TerminalCommand {
String CMD_HEARTBEAT = "heartbeat";
}

View File

@ -0,0 +1,11 @@
package com.das.modules.node.disruptor;
import com.das.modules.node.domain.bo.TerminalMessage;
import com.lmax.disruptor.EventFactory;
public class MessageEventFactory implements EventFactory<TerminalMessage> {
@Override
public TerminalMessage newInstance() {
return TerminalMessage.builder().build();
}
}

View File

@ -0,0 +1,27 @@
package com.das.modules.node.disruptor;
import com.das.modules.node.domain.bo.TerminalMessage;
import com.lmax.disruptor.EventHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
@Component
public class TerminalMessageEventHandler implements EventHandler<TerminalMessage> {
private ConcurrentHashMap<String, CompletableFuture<TerminalMessage>> callbackMap = new ConcurrentHashMap<>(16);
@Override
public void onEvent(TerminalMessage terminalMessage, long sequence, boolean endOfBatch) throws Exception {
log.info("收到消息: {}", terminalMessage.toJsonString());
if (callbackMap.containsKey(terminalMessage.getCmdId())){
//如果是回调函数推送到回调函数
callbackMap.get(terminalMessage.getCmdId()).complete(terminalMessage);
}
else{
}
}
}

View File

@ -0,0 +1,28 @@
package com.das.modules.node.domain.bo;
import com.das.common.utils.JsonUtils;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@Builder
@AllArgsConstructor
@Data
public class TerminalMessage {
private String cmd;
private String cmdId;
private Long time;
private JsonNode data;
public void from(TerminalMessage other){
this.cmd = other.cmd;
this.cmdId = other.cmdId;
this.time = other.time;
this.data = other.data;
}
public String toJsonString() {
return JsonUtils.toJsonString(this);
}
}

View File

@ -1,13 +1,23 @@
package com.das.modules.node.handler;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.StrUtil;
import com.das.common.utils.JsonUtils;
import com.das.common.utils.SpringUtil;
import com.das.modules.node.constant.NodeConstant;
import com.das.modules.node.domain.bo.TerminalMessage;
import com.das.modules.node.service.DataService;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.BeanFactoryUtils;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.*;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.*;
@Component
@Slf4j
@ -15,6 +25,8 @@ public class NodeMessageHandler extends TextWebSocketHandler {
public static final Long HEARTBEAT_TIMEOUT = 1000 * 60 * 1L;
private ConcurrentHashMap<Long, WebSocketSession> onlineSessions = new ConcurrentHashMap<>(16);
private DataService dataService;
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
String remoteIp = (String) session.getAttributes().getOrDefault(NodeConstant.REMOTE_IP, "");
@ -25,7 +37,6 @@ public class NodeMessageHandler extends TextWebSocketHandler {
//如果终端节点已在线则拒绝新的终端连接
try {
session.close(CloseStatus.NOT_ACCEPTABLE);
}
catch (Exception ignore){}
}
@ -38,7 +49,21 @@ public class NodeMessageHandler extends TextWebSocketHandler {
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
super.handleTextMessage(session, message);
TerminalMessage msg = JsonUtils.parseObject(message.getPayload(), TerminalMessage.class);
String nodeId = session.getAttributes().get(NodeConstant.NODE_ID).toString();
String cmdId = msg.getCmdId();
String cmd = msg.getCmd();
long time = msg.getTime();
JsonNode data = msg.getData();
log.info("收到 Node:{} 命令: {}", nodeId, cmd);
log.debug("内容: {}", data.toString());
if (dataService == null){
dataService = SpringUtil.getBean(DataService.class);
}
if (dataService != null){
dataService.pushMessage(msg);
}
}
@Override
@ -62,9 +87,7 @@ public class NodeMessageHandler extends TextWebSocketHandler {
session.getId(),
status.toString());
Long nodeId = (Long)session.getAttributes().get(NodeConstant.NODE_ID);
if (onlineSessions.containsKey(nodeId)){
onlineSessions.remove(nodeId);
}
onlineSessions.remove(nodeId);
}
@ -103,4 +126,22 @@ public class NodeMessageHandler extends TextWebSocketHandler {
}
catch (Exception ignore){}
}
/**
* 发送无返回值消息
* @param nodeId
*/
public void sendActionMessage(Long nodeId, TerminalMessage message){
WebSocketSession session = onlineSessions.get(nodeId);
if (session != null){
try {
session.sendMessage(new TextMessage(message.toString()));
}
catch (Exception exception){
log.error(String.format("发送消息失败: NodeId: %s", nodeId), exception);
closeSession(session);
}
}
}
}

View File

@ -0,0 +1,8 @@
package com.das.modules.node.service;
import com.das.modules.node.domain.bo.TerminalMessage;
public interface DataService {
void pushMessage(TerminalMessage msg);
}

View File

@ -0,0 +1,66 @@
package com.das.modules.node.service.impl;
import com.das.modules.node.disruptor.MessageEventFactory;
import com.das.modules.node.disruptor.TerminalMessageEventHandler;
import com.das.modules.node.domain.bo.TerminalMessage;
import com.das.modules.node.service.DataService;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
@Slf4j
@Service
public class DataServiceImpl implements DataService {
private Disruptor<TerminalMessage> disruptor = null;
private RingBuffer<TerminalMessage> ringBuffer = null;
@Resource
TerminalMessageEventHandler terminalMessageEventHandler;
@PostConstruct
public void init() {
//初始化高性能队列
Executor executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
MessageEventFactory factory = new MessageEventFactory();
Disruptor<TerminalMessage> disruptor = new Disruptor<>(factory, 1024 * 256, executor);
disruptor.handleEventsWith(terminalMessageEventHandler);
disruptor.start();
ringBuffer = disruptor.getRingBuffer();
}
@PreDestroy
public void destroy() {
if (disruptor != null){
disruptor.shutdown();
}
}
@Override
public void pushMessage(TerminalMessage msg) {
if (ringBuffer == null){
return;
}
long seq = ringBuffer.next();
try{
TerminalMessage terminalMessage = ringBuffer.get(seq);
terminalMessage.from(msg);
}
catch (Exception e){
log.error("发送消息失败",e);
}
finally {
ringBuffer.publish(seq);
}
}
}

View File

@ -88,3 +88,8 @@ captcha:
das:
aes:
key: b6967ee87b86d85a
logging:
level:
com:
das: DEBUG