更新心跳逻辑
This commit is contained in:
parent
3eb790acdb
commit
6f3ed755f2
@ -0,0 +1,59 @@
|
|||||||
|
package com.das.modules.calc.functions;
|
||||||
|
|
||||||
|
import com.das.common.utils.AdminRedisTemplate;
|
||||||
|
import com.das.modules.cache.domain.DeviceInfoCache;
|
||||||
|
import com.das.modules.cache.service.CacheService;
|
||||||
|
import com.das.modules.data.service.DataService;
|
||||||
|
import com.googlecode.aviator.runtime.function.AbstractFunction;
|
||||||
|
import com.googlecode.aviator.runtime.type.AviatorBoolean;
|
||||||
|
import com.googlecode.aviator.runtime.type.AviatorNil;
|
||||||
|
import com.googlecode.aviator.runtime.type.AviatorObject;
|
||||||
|
import com.googlecode.aviator.runtime.type.AviatorRuntimeJavaType;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.checkerframework.checker.units.qual.C;
|
||||||
|
|
||||||
|
import java.util.Date;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
public class FunctionIsOnline extends AbstractFunction {
|
||||||
|
private CacheService cacheService;
|
||||||
|
private AdminRedisTemplate redis;
|
||||||
|
public FunctionIsOnline(AdminRedisTemplate redis, CacheService cacheService) {
|
||||||
|
this.redis = redis;
|
||||||
|
this.cacheService = cacheService;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getName() {
|
||||||
|
return "isOnline";
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 根据设备Code判断设备是否在线
|
||||||
|
*
|
||||||
|
* @param env 环境变量映射,包含调用此方法时所需的上下文信息
|
||||||
|
* @param deviceCode 设备Code的AviatorObject表示
|
||||||
|
* @return 如果设备在线,则返回AviatorBoolean.TRUE;否则返回AviatorBoolean.FALSE
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public AviatorObject call(Map<String, Object> env, AviatorObject deviceCode) {
|
||||||
|
//设备Code
|
||||||
|
String code = (String)deviceCode.getValue(env);
|
||||||
|
|
||||||
|
DeviceInfoCache deviceInfoCache = cacheService.getEquipmentCache().getDeviceInfoCacheByCode(code);
|
||||||
|
if (deviceInfoCache == null) {
|
||||||
|
return AviatorBoolean.FALSE;
|
||||||
|
}
|
||||||
|
|
||||||
|
String key = String.format("device:%d:online", deviceInfoCache.getDeviceId());
|
||||||
|
Integer online = redis.get(key);
|
||||||
|
if (online == null || online == 0) {
|
||||||
|
return AviatorBoolean.FALSE;
|
||||||
|
}
|
||||||
|
//未找到缓存,查询时序API获取数据
|
||||||
|
return AviatorBoolean.TRUE;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -18,7 +18,7 @@ import java.text.MessageFormat;
|
|||||||
@Service(value = NodeConstant.HEARTBEAT)
|
@Service(value = NodeConstant.HEARTBEAT)
|
||||||
public class HeartbeatCommand implements BaseCommand{
|
public class HeartbeatCommand implements BaseCommand{
|
||||||
|
|
||||||
public static final String HEART_BEAT = "heartBeat:{0}";
|
public static final long HEARTBEAT_TTL = 60L;
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
AdminRedisTemplate adminRedisTemplate;
|
AdminRedisTemplate adminRedisTemplate;
|
||||||
@ -29,44 +29,39 @@ public class HeartbeatCommand implements BaseCommand{
|
|||||||
public void doCommand(TerminalMessage data) {
|
public void doCommand(TerminalMessage data) {
|
||||||
JsonNode dataInfo = data.getData();
|
JsonNode dataInfo = data.getData();
|
||||||
if (!dataInfo.isEmpty()) {
|
if (!dataInfo.isEmpty()) {
|
||||||
JsonNode linkNode = data.getData().get("links");
|
JsonNode links = data.getData().get("links");
|
||||||
if (linkNode != null && linkNode.isArray()) {
|
if (links != null && links.isArray()) {
|
||||||
for (JsonNode fruitNode : linkNode) {
|
for (JsonNode linkNode : links) {
|
||||||
String linkId = fruitNode.get("linkId").asText();
|
String linkId = linkNode.get("linkId").asText();
|
||||||
String online = fruitNode.get("online").asText();
|
boolean online = linkNode.get("online").asBoolean();
|
||||||
String key = MessageFormat.format(HEART_BEAT, linkId);
|
String key = String.format("link:%d:online", linkId, online);
|
||||||
if (StringUtils.isEmpty(online)) {
|
adminRedisTemplate.set(key, online ? 1 : 0);
|
||||||
adminRedisTemplate.set(key, 0);
|
adminRedisTemplate.expire(key, HEARTBEAT_TTL);
|
||||||
} else {
|
|
||||||
if ("true".equals(online)) {
|
|
||||||
adminRedisTemplate.set(key, 1);
|
|
||||||
} else {
|
|
||||||
adminRedisTemplate.set(key, 0);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
adminRedisTemplate.expire(key, 300L);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
JsonNode devices = data.getData().get("devices");
|
JsonNode devices = data.getData().get("devices");
|
||||||
if (devices != null && devices.isArray()) {
|
if (devices != null && devices.isArray()) {
|
||||||
for (JsonNode device : devices) {
|
for (JsonNode device : devices) {
|
||||||
Long deviceId = device.get("deviceId").asLong();
|
long deviceId = device.get("deviceId").asLong();
|
||||||
int online = device.get("online").asInt();
|
boolean online = device.get("online").asBoolean();
|
||||||
DeviceInfoCache deviceInfoCacheById = cacheService.getEquipmentCache().getDeviceInfoCacheById(deviceId);
|
DeviceInfoCache deviceInfoCacheById = cacheService.getEquipmentCache().getDeviceInfoCacheById(deviceId);
|
||||||
if (deviceInfoCacheById == null || !deviceInfoCacheById.getObjectType().equals(EquipmentTypeIds.EQUIPMENT_TYPE_STATION_WTG)) {
|
if (deviceInfoCacheById == null || !deviceInfoCacheById.getObjectType().equals(EquipmentTypeIds.EQUIPMENT_TYPE_STATION_WTG)) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
//判断是不是风机
|
//判断是不是风机
|
||||||
|
String keyDeviceOnline = String.format("device:%d:online", deviceId);
|
||||||
|
adminRedisTemplate.set(keyDeviceOnline, online ? 1 : 0);
|
||||||
|
adminRedisTemplate.expire(keyDeviceOnline, HEARTBEAT_TTL);
|
||||||
|
|
||||||
String keyPLCDeviceStatus = String.format("RT:%d:iturbineoperationmode", deviceId);
|
String keyPLCDeviceStatus = String.format("RT:%d:iturbineoperationmode", deviceId);
|
||||||
String keyCommFaultState = String.format("RT:%d:commfaultstate",deviceId);
|
String keyCommFaultState = String.format("RT:%d:commfaultstate",deviceId);
|
||||||
Integer plcDeviceStatus = adminRedisTemplate.get(keyPLCDeviceStatus);
|
Integer plcDeviceStatus = adminRedisTemplate.get(keyPLCDeviceStatus);
|
||||||
log.debug("设备ID:{},在线状态:{},通讯状态: {}", deviceId, online, plcDeviceStatus);
|
log.debug("设备ID:{},在线状态:{},通讯状态: {}", deviceId, online, plcDeviceStatus);
|
||||||
if (plcDeviceStatus == null){
|
if (plcDeviceStatus == null){
|
||||||
adminRedisTemplate.set(keyCommFaultState, (online == 1) ? 0 : 1);
|
adminRedisTemplate.set(keyCommFaultState, online ? 0 : 1);
|
||||||
}
|
}
|
||||||
else{
|
else{
|
||||||
adminRedisTemplate.set(keyCommFaultState, (online == 1) && (plcDeviceStatus != 0) ? 0 : 1);
|
adminRedisTemplate.set(keyCommFaultState, online && (plcDeviceStatus != 0) ? 0 : 1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -33,6 +33,11 @@ public class SysCommunicationLinkDto implements Serializable {
|
|||||||
*/
|
*/
|
||||||
private String params;
|
private String params;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 结果是否包含状态
|
||||||
|
*/
|
||||||
|
private Integer withStatus;
|
||||||
|
|
||||||
private Integer revision;
|
private Integer revision;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -37,6 +37,11 @@ public class SysCommunicationLinkVo {
|
|||||||
*/
|
*/
|
||||||
private String nodeName;
|
private String nodeName;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 状态 0 - 正常 1 - 通讯中断
|
||||||
|
*/
|
||||||
|
private Integer status;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 所属系统节点id
|
* 所属系统节点id
|
||||||
*/
|
*/
|
||||||
|
@ -465,7 +465,6 @@ public class NodeMessageServiceImpl extends TextWebSocketHandler implements Node
|
|||||||
//如果是主动请求报文,加入队列,等待处理
|
//如果是主动请求报文,加入队列,等待处理
|
||||||
String nodeId = session.getAttributes().get(NodeConstant.NODE_ID).toString();
|
String nodeId = session.getAttributes().get(NodeConstant.NODE_ID).toString();
|
||||||
String cmd = msg.getCmd();
|
String cmd = msg.getCmd();
|
||||||
JsonNode data = msg.getData();
|
|
||||||
log.debug("收到 Node:{} WS 报文: {}", nodeId, cmd);
|
log.debug("收到 Node:{} WS 报文: {}", nodeId, cmd);
|
||||||
pushMessage(msg);
|
pushMessage(msg);
|
||||||
}
|
}
|
||||||
@ -487,6 +486,13 @@ public class NodeMessageServiceImpl extends TextWebSocketHandler implements Node
|
|||||||
log.error(String.format("IP: %s 通讯异常, NodeId:%d, Version: %d", remoteIp, nodeId, version), exception);
|
log.error(String.format("IP: %s 通讯异常, NodeId:%d, Version: %d", remoteIp, nodeId, version), exception);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 当WebSocket连接关闭后,执行该方法。
|
||||||
|
*
|
||||||
|
* @param session 关闭的WebSocket会话对象
|
||||||
|
* @param status 连接关闭的状态信息
|
||||||
|
* @throws Exception 如果在关闭连接的过程中出现异常,则抛出该异常
|
||||||
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
|
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
|
||||||
String remoteIp = (String) session.getAttributes().getOrDefault(NodeConstant.REMOTE_IP, "");
|
String remoteIp = (String) session.getAttributes().getOrDefault(NodeConstant.REMOTE_IP, "");
|
||||||
|
@ -2,14 +2,12 @@ package com.das.modules.node.service.impl;
|
|||||||
|
|
||||||
import cn.dev33.satoken.stp.StpUtil;
|
import cn.dev33.satoken.stp.StpUtil;
|
||||||
import cn.hutool.core.io.IoUtil;
|
import cn.hutool.core.io.IoUtil;
|
||||||
|
import cn.hutool.core.util.NumberUtil;
|
||||||
import com.baomidou.mybatisplus.core.metadata.IPage;
|
import com.baomidou.mybatisplus.core.metadata.IPage;
|
||||||
import com.das.common.config.SessionUtil;
|
import com.das.common.config.SessionUtil;
|
||||||
import com.das.common.constant.MeasType;
|
import com.das.common.constant.MeasType;
|
||||||
import com.das.common.exceptions.ServiceException;
|
import com.das.common.exceptions.ServiceException;
|
||||||
import com.das.common.utils.BeanCopyUtils;
|
import com.das.common.utils.*;
|
||||||
import com.das.common.utils.PageDataInfo;
|
|
||||||
import com.das.common.utils.PageQuery;
|
|
||||||
import com.das.common.utils.SequenceUtils;
|
|
||||||
import com.das.modules.auth.domain.vo.SysUserVo;
|
import com.das.modules.auth.domain.vo.SysUserVo;
|
||||||
import com.das.modules.auth.entity.SysOrg;
|
import com.das.modules.auth.entity.SysOrg;
|
||||||
import com.das.modules.auth.mapper.SysOrgMapper;
|
import com.das.modules.auth.mapper.SysOrgMapper;
|
||||||
@ -40,6 +38,7 @@ import jakarta.servlet.http.HttpServletRequest;
|
|||||||
import jakarta.servlet.http.HttpServletResponse;
|
import jakarta.servlet.http.HttpServletResponse;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.data.redis.core.RedisTemplate;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
import org.springframework.transaction.annotation.Transactional;
|
import org.springframework.transaction.annotation.Transactional;
|
||||||
import org.springframework.util.CollectionUtils;
|
import org.springframework.util.CollectionUtils;
|
||||||
@ -81,6 +80,8 @@ public class SysNodeServiceImpl implements SysNodeService {
|
|||||||
@Autowired
|
@Autowired
|
||||||
SysIotModelServiceMapper iotModelServiceMapper;
|
SysIotModelServiceMapper iotModelServiceMapper;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
AdminRedisTemplate adminRedisTemplate;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<SysNodeVo> querySysNodeList() {
|
public List<SysNodeVo> querySysNodeList() {
|
||||||
@ -135,13 +136,60 @@ public class SysNodeServiceImpl implements SysNodeService {
|
|||||||
sysNodeMapper.deleteById(id);
|
sysNodeMapper.deleteById(id);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 查询系统通信链路列表
|
||||||
|
*
|
||||||
|
* @param sysCommunicationLinkDto 通信链路查询DTO,包含分页信息和查询条件
|
||||||
|
* @return 返回包含通信链路列表的PageDataInfo对象
|
||||||
|
*/
|
||||||
@Override
|
@Override
|
||||||
public PageDataInfo<SysCommunicationLinkVo> querySysCommunicationLinkList(SysCommunicationLinkDto sysCommunicationLinkDto) {
|
public PageDataInfo<SysCommunicationLinkVo> querySysCommunicationLinkList(SysCommunicationLinkDto sysCommunicationLinkDto) {
|
||||||
|
// 初始化分页查询对象
|
||||||
PageQuery pageQuery = new PageQuery();
|
PageQuery pageQuery = new PageQuery();
|
||||||
pageQuery.setPageNum(sysCommunicationLinkDto.getPageNum());
|
pageQuery.setPageNum(sysCommunicationLinkDto.getPageNum());
|
||||||
pageQuery.setPageSize(sysCommunicationLinkDto.getPageSize());
|
pageQuery.setPageSize(sysCommunicationLinkDto.getPageSize());
|
||||||
|
|
||||||
|
// 调用Mapper方法查询通信链路列表
|
||||||
IPage<SysCommunicationLinkVo> iPage = sysCommunicationLinkMapper.querySysCommunicationLinkList(pageQuery.build(), sysCommunicationLinkDto);
|
IPage<SysCommunicationLinkVo> iPage = sysCommunicationLinkMapper.querySysCommunicationLinkList(pageQuery.build(), sysCommunicationLinkDto);
|
||||||
return PageDataInfo.build(iPage.getRecords(), iPage.getTotal());
|
|
||||||
|
// 获取查询结果列表
|
||||||
|
List<SysCommunicationLinkVo> records = iPage.getRecords();
|
||||||
|
|
||||||
|
// 根据条件判断是否需要查询在线状态
|
||||||
|
if (sysCommunicationLinkDto.getWithStatus() != null && sysCommunicationLinkDto.getWithStatus() == 1) {
|
||||||
|
// 初始化用于存储在线状态键的列表
|
||||||
|
List<String> keys = new ArrayList<>(records.size());
|
||||||
|
|
||||||
|
// 遍历查询结果,构造每个通信链路的在线状态键
|
||||||
|
for (int i = 0; i < records.size(); i++) {
|
||||||
|
SysCommunicationLinkVo sysCommunicationLinkVo = records.get(i);
|
||||||
|
String onlineKey = String.format("link:%d:online", sysCommunicationLinkVo.getId());
|
||||||
|
keys.add(onlineKey);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 批量获取在线状态值
|
||||||
|
List<Object> values = adminRedisTemplate.mGet(keys);
|
||||||
|
|
||||||
|
// 遍历在线状态值,更新通信链路的在线状态
|
||||||
|
for (int i = 0; i < values.size(); i++) {
|
||||||
|
Object val = values.get(i);
|
||||||
|
if (val == null) {
|
||||||
|
records.get(i).setStatus(1);
|
||||||
|
}
|
||||||
|
else{
|
||||||
|
Integer status = NumberUtil.parseInt(val.toString(), null);
|
||||||
|
if (status == null || status == 0) {
|
||||||
|
records.get(i).setStatus(1);
|
||||||
|
}
|
||||||
|
else{
|
||||||
|
records.get(i).setStatus(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 构建并返回分页数据信息
|
||||||
|
return PageDataInfo.build(records, iPage.getTotal());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
Loading…
Reference in New Issue
Block a user