调整节点,链路代码结构
This commit is contained in:
parent
b3792cc841
commit
efb2cce55f
@ -1,10 +1,9 @@
|
||||
package com.das.common.config;
|
||||
|
||||
import com.das.modules.node.handler.NodeHandshakeInterceptor;
|
||||
import com.das.modules.node.handler.NodeMessageHandler;
|
||||
import com.das.modules.node.handler.NodeWebsocketHandshakeInterceptor;
|
||||
import com.das.modules.node.service.impl.NodeMessageServiceImpl;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.web.socket.config.annotation.EnableWebSocket;
|
||||
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
|
||||
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
|
||||
@ -12,15 +11,13 @@ import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry
|
||||
@EnableWebSocket
|
||||
@Configuration
|
||||
public class WebsocketConfig implements WebSocketConfigurer {
|
||||
@Autowired
|
||||
NodeHandshakeInterceptor nodeHandshakeInterceptor;
|
||||
|
||||
@Autowired
|
||||
NodeMessageHandler nodeMessageHandler;
|
||||
NodeMessageServiceImpl nodeMessageService;
|
||||
@Override
|
||||
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
|
||||
registry.addHandler(nodeMessageHandler, "/node/{nodeId}/{version}")
|
||||
registry.addHandler(nodeMessageService, "/node/{nodeId}/{version}")
|
||||
.setAllowedOrigins("*")
|
||||
.addInterceptors(nodeHandshakeInterceptor);
|
||||
.addInterceptors(new NodeWebsocketHandshakeInterceptor());
|
||||
}
|
||||
}
|
||||
|
@ -1,11 +0,0 @@
|
||||
package com.das.modules.node.disruptor;
|
||||
|
||||
import com.das.modules.node.domain.bo.TerminalMessage;
|
||||
import com.lmax.disruptor.EventFactory;
|
||||
|
||||
public class MessageEventFactory implements EventFactory<TerminalMessage> {
|
||||
@Override
|
||||
public TerminalMessage newInstance() {
|
||||
return TerminalMessage.builder().build();
|
||||
}
|
||||
}
|
@ -1,57 +0,0 @@
|
||||
package com.das.modules.node.disruptor;
|
||||
|
||||
import com.das.common.utils.SpringUtils;
|
||||
import com.das.modules.node.command.BaseCommand;
|
||||
import com.das.modules.node.domain.bo.TerminalMessage;
|
||||
import com.das.modules.node.handler.NodeMessageHandler;
|
||||
import com.lmax.disruptor.EventHandler;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.concurrent.*;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
public class TerminalMessageEventHandler implements EventHandler<TerminalMessage> {
|
||||
|
||||
@Autowired
|
||||
NodeMessageHandler nodeMessageHandler;
|
||||
|
||||
private ConcurrentHashMap<String, CompletableFuture<TerminalMessage>> callbackMap = new ConcurrentHashMap<>(16);
|
||||
@Override
|
||||
public void onEvent(TerminalMessage terminalMessage, long sequence, boolean endOfBatch) throws Exception {
|
||||
// log.info("收到消息: {}", terminalMessage.toJsonString());
|
||||
if (callbackMap.containsKey(terminalMessage.getCmdId())){
|
||||
//如果是回调函数,推送到回调函数
|
||||
callbackMap.get(terminalMessage.getCmdId()).complete(terminalMessage);
|
||||
} else{
|
||||
String cmd = terminalMessage.getCmd();
|
||||
BaseCommand commander = null;
|
||||
try {
|
||||
commander = SpringUtils.getBean(cmd);
|
||||
} catch (Exception e) {
|
||||
log.debug("当前未找到执行command容器");
|
||||
}
|
||||
if (commander != null) {
|
||||
try {
|
||||
commander.doCommand(terminalMessage);
|
||||
} catch (Exception ex) {
|
||||
log.error(String.format("命令 - %s 处理失败", cmd), ex);
|
||||
}
|
||||
} else {
|
||||
log.error("命令[{}]无效, 未发现实现适配器!", cmd);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public void sendTerminalMessageWithResult(Long nodeId, TerminalMessage message) throws ExecutionException, InterruptedException, TimeoutException {
|
||||
nodeMessageHandler.sendActionMessage(nodeId, message);
|
||||
|
||||
CompletableFuture<TerminalMessage> future = new CompletableFuture<>();
|
||||
callbackMap.put(message.getCmdId(), future);
|
||||
|
||||
TerminalMessage result = future.get(10, TimeUnit.SECONDS);
|
||||
}
|
||||
}
|
@ -0,0 +1,32 @@
|
||||
package com.das.modules.node.disruptor;
|
||||
|
||||
import com.das.common.utils.SpringUtils;
|
||||
import com.das.modules.node.command.BaseCommand;
|
||||
import com.das.modules.node.domain.bo.TerminalMessage;
|
||||
import com.lmax.disruptor.WorkHandler;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@Slf4j
|
||||
public class TerminalMessageWorkerHandler implements WorkHandler<TerminalMessage> {
|
||||
|
||||
|
||||
@Override
|
||||
public void onEvent(TerminalMessage event) throws Exception {
|
||||
String cmd = event.getCmd();
|
||||
BaseCommand commander = null;
|
||||
try {
|
||||
commander = SpringUtils.getBean(cmd);
|
||||
} catch (Exception e) {
|
||||
log.debug("当前未找到执行command容器");
|
||||
}
|
||||
if (commander != null) {
|
||||
try {
|
||||
commander.doCommand(event);
|
||||
} catch (Exception ex) {
|
||||
log.error(String.format("命令 - %s 处理失败", cmd), ex);
|
||||
}
|
||||
} else {
|
||||
log.error("命令[{}]无效, 未发现实现适配器!", cmd);
|
||||
}
|
||||
}
|
||||
}
|
@ -6,8 +6,8 @@ import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
@Builder
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
@Data
|
||||
public class TerminalMessage {
|
||||
|
@ -1,150 +0,0 @@
|
||||
package com.das.modules.node.handler;
|
||||
|
||||
import com.das.common.utils.JsonUtils;
|
||||
import com.das.common.utils.SpringUtil;
|
||||
import com.das.modules.node.constant.NodeConstant;
|
||||
import com.das.modules.node.domain.bo.TerminalMessage;
|
||||
import com.das.modules.node.service.NodeMessageService;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.web.socket.*;
|
||||
import org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator;
|
||||
import org.springframework.web.socket.handler.TextWebSocketHandler;
|
||||
|
||||
import java.util.concurrent.*;
|
||||
|
||||
@Component
|
||||
@Slf4j
|
||||
public class NodeMessageHandler extends TextWebSocketHandler {
|
||||
|
||||
public static final Long HEARTBEAT_TIMEOUT = 1000 * 60 * 1L;
|
||||
private ConcurrentHashMap<Long, ConcurrentWebSocketSessionDecorator> onlineSessions = new ConcurrentHashMap<>(16);
|
||||
|
||||
private NodeMessageService nodeMessageService;
|
||||
@Override
|
||||
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
|
||||
String remoteIp = (String) session.getAttributes().getOrDefault(NodeConstant.REMOTE_IP, "");
|
||||
Long nodeId = (Long)session.getAttributes().get(NodeConstant.NODE_ID);
|
||||
Long version = (Long)session.getAttributes().get(NodeConstant.VERSION);
|
||||
long time = System.currentTimeMillis();
|
||||
log.debug("IP: {} 请求连接. sessionId: {}", remoteIp, session.getId());
|
||||
if (onlineSessions.containsKey(nodeId)){
|
||||
//如果终端节点已在线,则拒绝新的终端连接
|
||||
try {
|
||||
session.close(CloseStatus.NOT_ACCEPTABLE);
|
||||
}
|
||||
catch (Exception ignore){}
|
||||
}
|
||||
else {
|
||||
log.info("IP: {} 准许连接, NodeId:{}, Version: {}, sessionId: {}", remoteIp, nodeId, version, session.getId());
|
||||
onlineSessions.put(nodeId, new ConcurrentWebSocketSessionDecorator(session, 5 * 1000, 2 * 1024 * 1024));
|
||||
}
|
||||
|
||||
// 如果version是0,则需要调用一次configUpdate配置更新
|
||||
if (version == 0){
|
||||
if (nodeMessageService == null){
|
||||
nodeMessageService = SpringUtil.getBean(NodeMessageService.class);
|
||||
}
|
||||
nodeMessageService.sendTerminalConfig(Long.valueOf(nodeId));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
|
||||
TerminalMessage msg = JsonUtils.parseObject(message.getPayload(), TerminalMessage.class);
|
||||
String nodeId = session.getAttributes().get(NodeConstant.NODE_ID).toString();
|
||||
String cmd = msg.getCmd();
|
||||
JsonNode data = msg.getData();
|
||||
log.info("收到 Node:{} 命令: {}", nodeId, cmd);
|
||||
if (nodeMessageService == null){
|
||||
nodeMessageService = SpringUtil.getBean(NodeMessageService.class);
|
||||
}
|
||||
if (nodeMessageService != null){
|
||||
nodeMessageService.pushMessage(msg);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void handlePongMessage(WebSocketSession session, PongMessage message) throws Exception {
|
||||
Long nodeId = (Long)session.getAttributes().get(NodeConstant.NODE_ID);
|
||||
log.info("收到 Node:{} Pong Message", nodeId);
|
||||
session.getAttributes().put(NodeConstant.LAST_PONG_TIME, System.currentTimeMillis());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
|
||||
log.error(String.format("通讯异常: NodeId: %s", session.getAttributes().get(NodeConstant.NODE_ID)), exception);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
|
||||
log.info("IP: {} 已断开连接, NodeId:{}, Version: {}, sessionId:{}, 原因: {}",
|
||||
session.getAttributes().get(NodeConstant.REMOTE_IP),
|
||||
session.getAttributes().get(NodeConstant.NODE_ID),
|
||||
session.getAttributes().get(NodeConstant.VERSION),
|
||||
session.getId(),
|
||||
status.toString());
|
||||
Long nodeId = (Long)session.getAttributes().get(NodeConstant.NODE_ID);
|
||||
onlineSessions.remove(nodeId);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 定时发送心跳报文,并清理离线的终端
|
||||
*/
|
||||
@Scheduled(cron = "0/15 * * * * ?")
|
||||
public void sendHeartbeat(){
|
||||
for (ConcurrentWebSocketSessionDecorator session : onlineSessions.values()) {
|
||||
//判断心跳是否超时,超时则主动断开连接
|
||||
Long lastPongTime = (Long) session.getAttributes().get(NodeConstant.LAST_PONG_TIME);
|
||||
if (lastPongTime != null && ((System.currentTimeMillis() - lastPongTime) > HEARTBEAT_TIMEOUT)){
|
||||
closeSession(session);
|
||||
return;
|
||||
}
|
||||
SendPingMessage(session);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private void closeSession(WebSocketSession session){
|
||||
try{
|
||||
session.close(CloseStatus.NO_CLOSE_FRAME);
|
||||
}
|
||||
catch (Exception ignore){}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送ping消息
|
||||
* @param session
|
||||
*/
|
||||
private void SendPingMessage(ConcurrentWebSocketSessionDecorator session){
|
||||
try {
|
||||
session.sendMessage(new PingMessage());
|
||||
}
|
||||
catch (Exception ignore){}
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送无返回值消息
|
||||
* @param nodeId
|
||||
*/
|
||||
public void sendActionMessage(Long nodeId, TerminalMessage message){
|
||||
ConcurrentWebSocketSessionDecorator session = onlineSessions.get(nodeId);
|
||||
if (session != null){
|
||||
try {
|
||||
session.sendMessage(new TextMessage(message.toJsonString()));
|
||||
log.info("发送的消息为:{}", message.toJsonString());
|
||||
}
|
||||
catch (Exception exception){
|
||||
log.error(String.format("发送消息失败: NodeId: %s", nodeId), exception);
|
||||
closeSession(session);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -7,10 +7,8 @@ import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.http.server.ServerHttpRequest;
|
||||
import org.springframework.http.server.ServerHttpResponse;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.MultiValueMap;
|
||||
import org.springframework.web.socket.WebSocketHandler;
|
||||
import org.springframework.web.socket.server.HandshakeInterceptor;
|
||||
import org.springframework.web.util.UriComponentsBuilder;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
@ -22,8 +20,7 @@ import java.util.Map;
|
||||
* Websocket握手拦截器
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class NodeHandshakeInterceptor implements HandshakeInterceptor {
|
||||
public class NodeWebsocketHandshakeInterceptor implements HandshakeInterceptor {
|
||||
|
||||
public String getRealIp(ServerHttpRequest request) {
|
||||
|
@ -3,12 +3,19 @@ package com.das.modules.node.service;
|
||||
import com.das.modules.node.domain.bo.CalculateRTData;
|
||||
import com.das.modules.node.domain.bo.TerminalMessage;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import org.springframework.web.socket.CloseStatus;
|
||||
import org.springframework.web.socket.WebSocketSession;
|
||||
import org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
/**
|
||||
* 节点消息处理服务
|
||||
*/
|
||||
public interface NodeMessageService {
|
||||
|
||||
void pushMessage(TerminalMessage msg);
|
||||
|
||||
JsonNode sendTerminalConfig(Long nodeId);
|
||||
|
||||
@ -19,4 +26,7 @@ public interface NodeMessageService {
|
||||
void handleLowSpeed(TerminalMessage data);
|
||||
|
||||
void handleDeviceEvent(TerminalMessage data);
|
||||
|
||||
TerminalMessage sendTerminalMessageWithResult(Long nodeId, TerminalMessage message) throws ExecutionException, InterruptedException, TimeoutException;
|
||||
TerminalMessage sendTerminalMessageWithResult(Long nodeId, TerminalMessage message, long timeout) throws ExecutionException, InterruptedException, TimeoutException;
|
||||
}
|
||||
|
@ -1,64 +1,76 @@
|
||||
package com.das.modules.node.service.impl;
|
||||
|
||||
import cn.hutool.core.util.IdUtil;
|
||||
import com.baomidou.mybatisplus.core.toolkit.IdWorker;
|
||||
import com.das.common.constant.MeasType;
|
||||
import com.das.common.utils.AdminRedisTemplate;
|
||||
import com.das.common.utils.JsonUtils;
|
||||
import com.das.common.utils.StringUtils;
|
||||
import com.das.modules.cache.domain.DeviceInfoCache;
|
||||
import com.das.modules.cache.service.CacheService;
|
||||
import com.das.modules.data.domain.DeviceEventInfo;
|
||||
import com.das.modules.data.service.TDEngineService;
|
||||
import com.das.modules.data.service.impl.DataServiceImpl;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import cn.hutool.core.util.IdUtil;
|
||||
import com.das.common.constant.MeasType;
|
||||
import com.das.common.utils.AdminRedisTemplate;
|
||||
import com.das.modules.equipment.mapper.SysIotModelMapper;
|
||||
import com.das.modules.node.disruptor.MessageEventFactory;
|
||||
import com.das.modules.node.disruptor.TerminalMessageEventHandler;
|
||||
import com.das.modules.node.constant.NodeConstant;
|
||||
import com.das.modules.node.disruptor.TerminalMessageWorkerHandler;
|
||||
import com.das.modules.node.domain.bo.RTData;
|
||||
import com.das.modules.node.domain.bo.TerminalMessage;
|
||||
import com.das.modules.node.domain.vo.*;
|
||||
import com.das.modules.node.handler.NodeMessageHandler;
|
||||
import com.das.modules.node.mapper.SysCommunicationLinkMapper;
|
||||
import com.das.modules.node.mapper.SysImpTabMappingMapper;
|
||||
import com.das.modules.node.service.NodeMessageService;
|
||||
import com.das.modules.data.service.TDEngineService;
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||
import com.lmax.disruptor.RingBuffer;
|
||||
import com.lmax.disruptor.YieldingWaitStrategy;
|
||||
import com.lmax.disruptor.dsl.Disruptor;
|
||||
import com.lmax.disruptor.dsl.ProducerType;
|
||||
import com.lmax.disruptor.util.DaemonThreadFactory;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import jakarta.annotation.PreDestroy;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
import org.springframework.web.socket.*;
|
||||
import org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator;
|
||||
import org.springframework.web.socket.handler.TextWebSocketHandler;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.*;
|
||||
|
||||
@Slf4j
|
||||
@Service
|
||||
public class NodeMessageServiceImpl implements NodeMessageService {
|
||||
public class NodeMessageServiceImpl extends TextWebSocketHandler implements NodeMessageService {
|
||||
|
||||
public static final Long HEARTBEAT_TIMEOUT = 1000 * 60 * 1L;
|
||||
|
||||
/**
|
||||
* JSON 转换器
|
||||
*/
|
||||
private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
|
||||
|
||||
/**
|
||||
* websocket 会话集合
|
||||
*/
|
||||
private final ConcurrentHashMap<Long, ConcurrentWebSocketSessionDecorator> onlineWSSessions = new ConcurrentHashMap<>(16);
|
||||
|
||||
private ConcurrentHashMap<String, CompletableFuture<TerminalMessage>> responseCallback = new ConcurrentHashMap<>(16);
|
||||
|
||||
private Disruptor<TerminalMessage> disruptor = null;
|
||||
|
||||
private RingBuffer<TerminalMessage> ringBuffer = null;
|
||||
|
||||
@Resource
|
||||
TerminalMessageEventHandler terminalMessageEventHandler;
|
||||
|
||||
@Resource
|
||||
SysCommunicationLinkMapper sysCommunicationLinkMapper;
|
||||
|
||||
@Resource
|
||||
SysImpTabMappingMapper sysImptabmappingMapper;
|
||||
|
||||
@Resource
|
||||
private NodeMessageHandler nodeMessageHandler;
|
||||
|
||||
@Autowired
|
||||
AdminRedisTemplate adminRedisTemplate;
|
||||
|
||||
@ -79,27 +91,27 @@ public class NodeMessageServiceImpl implements NodeMessageService {
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
//初始化高性能队列
|
||||
Executor executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
|
||||
MessageEventFactory factory = new MessageEventFactory();
|
||||
Disruptor<TerminalMessage> disruptor = new Disruptor<>(factory, 1024 * 256, executor);
|
||||
disruptor.handleEventsWith(terminalMessageEventHandler);
|
||||
int cpu = Runtime.getRuntime().availableProcessors();
|
||||
int bufferSize = 1024 * 4;
|
||||
disruptor = new Disruptor<>(TerminalMessage::new, bufferSize, DaemonThreadFactory.INSTANCE, ProducerType.MULTI, new YieldingWaitStrategy());
|
||||
//
|
||||
TerminalMessageWorkerHandler[] workerHandlers = new TerminalMessageWorkerHandler[cpu];
|
||||
for (int i = 0; i < cpu; i++) {
|
||||
workerHandlers[i] = new TerminalMessageWorkerHandler();
|
||||
}
|
||||
disruptor.handleEventsWithWorkerPool(workerHandlers);
|
||||
disruptor.start();
|
||||
ringBuffer = disruptor.getRingBuffer();
|
||||
}
|
||||
|
||||
|
||||
@PreDestroy
|
||||
public void destroy() {
|
||||
if (ringBuffer != null) {
|
||||
ringBuffer = null;
|
||||
}
|
||||
if (disruptor != null) {
|
||||
disruptor.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void pushMessage(TerminalMessage msg) {
|
||||
RingBuffer<TerminalMessage> ringBuffer = disruptor.getRingBuffer();
|
||||
if (ringBuffer == null) {
|
||||
return;
|
||||
}
|
||||
@ -212,7 +224,7 @@ public class NodeMessageServiceImpl implements NodeMessageService {
|
||||
.time(time)
|
||||
.data(jsonNode)
|
||||
.build();
|
||||
nodeMessageHandler.sendActionMessage(nodeId, configUpdate);
|
||||
sendActionMessage(nodeId, configUpdate);
|
||||
return jsonNode;
|
||||
}
|
||||
|
||||
@ -347,4 +359,156 @@ public class NodeMessageServiceImpl implements NodeMessageService {
|
||||
default -> null;
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
|
||||
String remoteIp = (String) session.getAttributes().getOrDefault(NodeConstant.REMOTE_IP, "");
|
||||
Long nodeId = (Long)session.getAttributes().get(NodeConstant.NODE_ID);
|
||||
Long version = (Long)session.getAttributes().get(NodeConstant.VERSION);
|
||||
if (nodeId == null || version == null) {
|
||||
log.warn("检测到非法连接请求, IP: {}", remoteIp);
|
||||
try {
|
||||
session.close(CloseStatus.NOT_ACCEPTABLE);
|
||||
} catch (IOException ignored) {
|
||||
}
|
||||
return;
|
||||
}
|
||||
if (onlineWSSessions.contains(nodeId)){
|
||||
log.warn("检测到同一节点连接请求,已断开. NodeId: {}, IP:{}", nodeId, remoteIp);
|
||||
try {
|
||||
session.close(CloseStatus.NOT_ACCEPTABLE);
|
||||
} catch (IOException ignored) {
|
||||
}
|
||||
return;
|
||||
}
|
||||
ConcurrentWebSocketSessionDecorator concurrentWebSocketSessionDecorator = new ConcurrentWebSocketSessionDecorator(session, 5 * 1000, 2 * 1024 * 1024);
|
||||
onlineWSSessions.put(nodeId, concurrentWebSocketSessionDecorator);
|
||||
|
||||
//如果采集程序的版本是0,则直接下发当前配置。
|
||||
if (version == 0){
|
||||
sendTerminalConfig(nodeId);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 收到节点Websocket报文回调函数
|
||||
* @param session websocket session
|
||||
* @param message 报文内容
|
||||
* @throws Exception
|
||||
*/
|
||||
@Override
|
||||
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
|
||||
TerminalMessage msg = JsonUtils.parseObject(message.getPayload(), TerminalMessage.class);
|
||||
if (msg == null) {
|
||||
log.warn("收到非法报文:{}", message.getPayload());
|
||||
return;
|
||||
}
|
||||
//如果是应答报文,跳过队列,直接异步返回
|
||||
if (responseCallback.contains(msg.getCmdId())){
|
||||
responseCallback.get(msg.getCmdId()).complete(msg);
|
||||
}
|
||||
else{
|
||||
//如果是主动请求报文,加入队列,等待处理
|
||||
String nodeId = session.getAttributes().get(NodeConstant.NODE_ID).toString();
|
||||
String cmd = msg.getCmd();
|
||||
JsonNode data = msg.getData();
|
||||
log.debug("收到 Node:{} WS 报文: {}", nodeId, cmd);
|
||||
pushMessage(msg);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void handlePongMessage(WebSocketSession session, PongMessage message) throws Exception {
|
||||
Long nodeId = (Long)session.getAttributes().get(NodeConstant.NODE_ID);
|
||||
log.info("收到 Node:{} Pong Message", nodeId);
|
||||
session.getAttributes().put(NodeConstant.LAST_PONG_TIME, System.currentTimeMillis());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
|
||||
String remoteIp = (String) session.getAttributes().getOrDefault(NodeConstant.REMOTE_IP, "");
|
||||
Long nodeId = (Long)session.getAttributes().get(NodeConstant.NODE_ID);
|
||||
Long version = (Long)session.getAttributes().get(NodeConstant.VERSION);
|
||||
log.error(String.format("IP: %s 通讯异常, NodeId:%d, Version: %d", remoteIp, nodeId, version), exception);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
|
||||
String remoteIp = (String) session.getAttributes().getOrDefault(NodeConstant.REMOTE_IP, "");
|
||||
Long nodeId = (Long)session.getAttributes().get(NodeConstant.NODE_ID);
|
||||
Long version = (Long)session.getAttributes().get(NodeConstant.VERSION);
|
||||
log.info("IP: {} 已断开连接, NodeId:{}, Version: {}, sessionId:{}, 原因: {}",
|
||||
remoteIp,
|
||||
nodeId,
|
||||
version,
|
||||
session.getId(),
|
||||
status.toString());
|
||||
onlineWSSessions.remove(nodeId);
|
||||
}
|
||||
|
||||
/**
|
||||
* 定时发送心跳报文,并清理离线的终端
|
||||
*/
|
||||
@Scheduled(cron = "0/15 * * * * ?")
|
||||
public void sendHeartbeat(){
|
||||
for (ConcurrentWebSocketSessionDecorator session : onlineWSSessions.values()) {
|
||||
//判断心跳是否超时,超时则主动断开连接
|
||||
Long lastPongTime = (Long) session.getAttributes().get(NodeConstant.LAST_PONG_TIME);
|
||||
if (lastPongTime != null && ((System.currentTimeMillis() - lastPongTime) > HEARTBEAT_TIMEOUT)){
|
||||
closeSession(session);
|
||||
return;
|
||||
}
|
||||
SendPingMessage(session);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送ping消息
|
||||
* @param session
|
||||
*/
|
||||
private void SendPingMessage(ConcurrentWebSocketSessionDecorator session){
|
||||
try {
|
||||
session.sendMessage(new PingMessage());
|
||||
}
|
||||
catch (Exception ignore){}
|
||||
}
|
||||
|
||||
private void closeSession(WebSocketSession session){
|
||||
try{
|
||||
session.close(CloseStatus.NO_CLOSE_FRAME);
|
||||
}
|
||||
catch (Exception ignore){}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送无返回值消息
|
||||
* @param nodeId
|
||||
*/
|
||||
public void sendActionMessage(Long nodeId, TerminalMessage message){
|
||||
ConcurrentWebSocketSessionDecorator session = onlineWSSessions.get(nodeId);
|
||||
if (session != null){
|
||||
try {
|
||||
session.sendMessage(new TextMessage(message.toJsonString()));
|
||||
log.info("发送的消息为:{}", message.toJsonString());
|
||||
}
|
||||
catch (Exception exception){
|
||||
log.error(String.format("发送消息失败: NodeId: %s", nodeId), exception);
|
||||
closeSession(session);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public TerminalMessage sendTerminalMessageWithResult(Long nodeId, TerminalMessage message) throws ExecutionException, InterruptedException, TimeoutException {
|
||||
return sendTerminalMessageWithResult(nodeId,message, 10);
|
||||
}
|
||||
|
||||
@Override
|
||||
public TerminalMessage sendTerminalMessageWithResult(Long nodeId, TerminalMessage message, long timeout) throws ExecutionException, InterruptedException, TimeoutException {
|
||||
sendActionMessage(nodeId, message);
|
||||
CompletableFuture<TerminalMessage> future = new CompletableFuture<>();
|
||||
responseCallback.put(message.getCmdId(), future);
|
||||
return future.get(timeout, TimeUnit.SECONDS);
|
||||
}
|
||||
}
|
||||
|
@ -2,11 +2,9 @@ package com.das.modules.node.service.impl;
|
||||
|
||||
import cn.dev33.satoken.stp.StpUtil;
|
||||
import cn.hutool.core.io.IoUtil;
|
||||
import cn.hutool.core.util.IdUtil;
|
||||
import com.baomidou.mybatisplus.core.metadata.IPage;
|
||||
import com.das.common.config.SessionUtil;
|
||||
import com.das.common.constant.MeasType;
|
||||
import com.das.common.exceptions.ServiceException;
|
||||
import com.das.common.utils.BeanCopyUtils;
|
||||
import com.das.common.utils.PageDataInfo;
|
||||
import com.das.common.utils.PageQuery;
|
||||
@ -19,11 +17,14 @@ import com.das.modules.equipment.domain.vo.SysIotModelServiceVo;
|
||||
import com.das.modules.equipment.mapper.SysEquipmentMapper;
|
||||
import com.das.modules.equipment.mapper.SysIotModelFieldMapper;
|
||||
import com.das.modules.equipment.mapper.SysIotModelServiceMapper;
|
||||
import com.das.modules.node.constant.NodeConstant;
|
||||
import com.das.modules.node.disruptor.TerminalMessageEventHandler;
|
||||
import com.das.modules.node.domain.bo.TerminalMessage;
|
||||
import com.das.modules.node.domain.dto.*;
|
||||
import com.das.modules.node.domain.vo.*;
|
||||
import com.das.modules.node.domain.dto.BindEquipmentInfoDto;
|
||||
import com.das.modules.node.domain.dto.QueryTabMappingParamDto;
|
||||
import com.das.modules.node.domain.dto.SysCommunicationLinkDto;
|
||||
import com.das.modules.node.domain.dto.SysNodeDto;
|
||||
import com.das.modules.node.domain.vo.EquipmentVo;
|
||||
import com.das.modules.node.domain.vo.SysCommunicationLinkVo;
|
||||
import com.das.modules.node.domain.vo.SysNodeVo;
|
||||
import com.das.modules.node.domain.vo.SysTabMappingVo;
|
||||
import com.das.modules.node.entity.SysCommunicationLink;
|
||||
import com.das.modules.node.entity.SysNode;
|
||||
import com.das.modules.node.entity.SysTabMapping;
|
||||
@ -32,7 +33,6 @@ import com.das.modules.node.mapper.SysImpTabMappingMapper;
|
||||
import com.das.modules.node.mapper.SysNodeMapper;
|
||||
import com.das.modules.node.service.SysNodeService;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||
import jakarta.servlet.http.HttpServletRequest;
|
||||
import jakarta.servlet.http.HttpServletResponse;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@ -42,7 +42,10 @@ import org.springframework.transaction.annotation.Transactional;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
import org.springframework.web.multipart.MultipartFile;
|
||||
|
||||
import java.io.*;
|
||||
import java.io.BufferedReader;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStreamReader;
|
||||
import java.io.OutputStream;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.*;
|
||||
|
||||
@ -74,9 +77,6 @@ public class SysNodeServiceImpl implements SysNodeService {
|
||||
@Autowired
|
||||
SysIotModelServiceMapper iotModelServiceMapper;
|
||||
|
||||
@Autowired
|
||||
TerminalMessageEventHandler terminalMessageEventHandler;
|
||||
|
||||
|
||||
@Override
|
||||
public List<SysNodeVo> querySysNodeList() {
|
||||
|
@ -8,10 +8,10 @@ import com.das.common.exceptions.ServiceException;
|
||||
import com.das.common.utils.AdminRedisTemplate;
|
||||
import com.das.modules.auth.domain.vo.SysUserVo;
|
||||
import com.das.modules.node.constant.NodeConstant;
|
||||
import com.das.modules.node.disruptor.TerminalMessageEventHandler;
|
||||
import com.das.modules.node.domain.bo.TerminalMessage;
|
||||
import com.das.modules.node.domain.vo.SysNodeVo;
|
||||
import com.das.modules.node.mapper.SysNodeMapper;
|
||||
import com.das.modules.node.service.NodeMessageService;
|
||||
import com.das.modules.operation.domain.dto.CommandInfoDto;
|
||||
import com.das.modules.operation.entity.SysManualStatus;
|
||||
import com.das.modules.operation.entity.SysOperationLog;
|
||||
@ -49,10 +49,10 @@ public class OperationService {
|
||||
private SysOperationLogMapper sysOperationLogMapper;
|
||||
|
||||
@Autowired
|
||||
TerminalMessageEventHandler terminalMessageEventHandler;
|
||||
AdminRedisTemplate adminRedisTemplate;
|
||||
|
||||
@Autowired
|
||||
AdminRedisTemplate adminRedisTemplate;
|
||||
NodeMessageService nodeMessageService;
|
||||
|
||||
|
||||
|
||||
@ -127,7 +127,7 @@ public class OperationService {
|
||||
.time(time)
|
||||
.data(jsonNode)
|
||||
.build();
|
||||
terminalMessageEventHandler.sendTerminalMessageWithResult(activeNodeId, configUpdate);
|
||||
nodeMessageService.sendTerminalMessageWithResult(activeNodeId, configUpdate);
|
||||
} catch (Exception e) {
|
||||
throw new ServiceException("设备控制失败 "+ e);
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user