websocket并发发送消息

This commit is contained in:
谷成伟 2024-10-22 13:10:22 +08:00
parent 3779033bc9
commit ae484ed814
4 changed files with 11 additions and 31 deletions

View File

@ -14,15 +14,10 @@ import com.fasterxml.jackson.databind.JsonNode;
import jakarta.servlet.http.HttpServletRequest; import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse; import jakarta.servlet.http.HttpServletResponse;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*; import org.springframework.web.bind.annotation.*;
import org.springframework.web.multipart.MultipartFile; import org.springframework.web.multipart.MultipartFile;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -91,8 +86,8 @@ public class SysNodeController {
/** 配置下发 */ /** 配置下发 */
@PostMapping("/configUpdate") @PostMapping("/configUpdate")
public R<?> configUpdate() { public R<?> configUpdate() {
JsonNode configUpdateData = dataService.getConfigUpdateInfo(Long.valueOf(1)); dataService.sendTerminalConfig(Long.valueOf(1));
return R.success(configUpdateData); return R.success();
} }

View File

@ -1,8 +1,5 @@
package com.das.modules.node.handler; package com.das.modules.node.handler;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.StrUtil;
import com.das.common.utils.JsonUtils; import com.das.common.utils.JsonUtils;
import com.das.common.utils.SpringUtil; import com.das.common.utils.SpringUtil;
import com.das.modules.node.constant.NodeConstant; import com.das.modules.node.constant.NodeConstant;
@ -10,8 +7,6 @@ import com.das.modules.node.domain.bo.TerminalMessage;
import com.das.modules.node.service.DataService; import com.das.modules.node.service.DataService;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.BeanFactoryUtils;
import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.web.socket.*; import org.springframework.web.socket.*;
@ -47,16 +42,18 @@ public class NodeMessageHandler extends TextWebSocketHandler {
onlineSessions.put(nodeId, new ConcurrentWebSocketSessionDecorator(session, 5 * 1000, 2 * 1024 * 1024)); onlineSessions.put(nodeId, new ConcurrentWebSocketSessionDecorator(session, 5 * 1000, 2 * 1024 * 1024));
} }
// 如果version是0则需要调用一次configUpdate配置更新
if (version == 0){
dataService.sendTerminalConfig(Long.valueOf(nodeId));
}
} }
@Override @Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
TerminalMessage msg = JsonUtils.parseObject(message.getPayload(), TerminalMessage.class); TerminalMessage msg = JsonUtils.parseObject(message.getPayload(), TerminalMessage.class);
String nodeId = session.getAttributes().get(NodeConstant.NODE_ID).toString(); String nodeId = session.getAttributes().get(NodeConstant.NODE_ID).toString();
Long version = (Long)session.getAttributes().get(NodeConstant.VERSION);
String cmdId = msg.getCmdId();
String cmd = msg.getCmd(); String cmd = msg.getCmd();
long time = msg.getTime();
JsonNode data = msg.getData(); JsonNode data = msg.getData();
log.info("收到 Node:{} 命令: {}", nodeId, cmd); log.info("收到 Node:{} 命令: {}", nodeId, cmd);
log.debug("内容: {}", data.toString()); log.debug("内容: {}", data.toString());
@ -66,21 +63,8 @@ public class NodeMessageHandler extends TextWebSocketHandler {
if (dataService != null){ if (dataService != null){
dataService.pushMessage(msg); dataService.pushMessage(msg);
} }
// 如果version是0则需要调用一次configUpdate配置更新
if (version == 0){
JsonNode configUpdateData = dataService.getConfigUpdateInfo(Long.valueOf(nodeId));
TerminalMessage configUpdate = TerminalMessage.builder()
.cmd("configUpdate")
.cmdId(nodeId)
.time(time)
.data(configUpdateData)
.build();
ThreadUtil.execute(()->{
sendActionMessage(Long.valueOf(nodeId), configUpdate);
});
} }
}
@Override @Override
protected void handlePongMessage(WebSocketSession session, PongMessage message) throws Exception { protected void handlePongMessage(WebSocketSession session, PongMessage message) throws Exception {

View File

@ -7,7 +7,7 @@ public interface DataService {
void pushMessage(TerminalMessage msg); void pushMessage(TerminalMessage msg);
JsonNode getConfigUpdateInfo(Long nodeId); JsonNode sendTerminalConfig(Long nodeId);
void createTdStable(); void createTdStable();
void handleData(TerminalMessage data); void handleData(TerminalMessage data);

View File

@ -1,5 +1,6 @@
package com.das.modules.node.service.impl; package com.das.modules.node.service.impl;
import cn.hutool.core.util.IdUtil;
import com.das.common.constant.BaseIotModelType; import com.das.common.constant.BaseIotModelType;
import com.das.common.utils.AdminRedisTemplate; import com.das.common.utils.AdminRedisTemplate;
import com.das.modules.equipment.domain.vo.IotModelFieldVo; import com.das.modules.equipment.domain.vo.IotModelFieldVo;
@ -109,7 +110,7 @@ public class DataServiceImpl implements DataService {
} }
@Override @Override
public JsonNode getConfigUpdateInfo(Long nodeId) { public JsonNode sendTerminalConfig(Long nodeId) {
ConfigUpdateVo configUpdateVo = new ConfigUpdateVo(); ConfigUpdateVo configUpdateVo = new ConfigUpdateVo();
List<LinkVo> links = new ArrayList<>(); List<LinkVo> links = new ArrayList<>();
List<EquipmentVo> equipments = new ArrayList<>(); List<EquipmentVo> equipments = new ArrayList<>();
@ -203,7 +204,7 @@ public class DataServiceImpl implements DataService {
TerminalMessage configUpdate = TerminalMessage.builder() TerminalMessage configUpdate = TerminalMessage.builder()
.cmd("configUpdate") .cmd("configUpdate")
.cmdId(String.valueOf(nodeId)) .cmdId(IdUtil.nanoId())
.time(time) .time(time)
.data(jsonNode) .data(jsonNode)
.build(); .build();