This commit is contained in:
高云鹏 2025-01-08 15:50:37 +08:00
commit 1baa3f77c2
8 changed files with 50 additions and 62 deletions

View File

@ -58,7 +58,7 @@ public class AnalogDataCommand implements BaseCommand {
*/ */
private void processAnalogData(TerminalMessage data){ private void processAnalogData(TerminalMessage data){
JsonNode dataNode = data.getData(); JsonNode dataNode = Optional.of(data.getData()).orElseThrow(() -> new ServiceException("data字段缺失"));
String deviceId = Optional.of(dataNode.get("deviceId")).orElseThrow(() -> new ServiceException("deviceId字段缺失")).asText(); String deviceId = Optional.of(dataNode.get("deviceId")).orElseThrow(() -> new ServiceException("deviceId字段缺失")).asText();
Long dataTime = Optional.of(dataNode.get("dataTime")).orElseThrow( () -> new ServiceException("dataTime字段缺失")).asLong(); Long dataTime = Optional.of(dataNode.get("dataTime")).orElseThrow( () -> new ServiceException("dataTime字段缺失")).asLong();
JsonNode values = dataNode.get("values"); JsonNode values = dataNode.get("values");

View File

@ -1,5 +1,6 @@
package com.das.modules.node.command; package com.das.modules.node.command;
import com.das.common.exceptions.ServiceException;
import com.das.modules.cache.domain.DeviceInfoCache; import com.das.modules.cache.domain.DeviceInfoCache;
import com.das.modules.cache.service.CacheService; import com.das.modules.cache.service.CacheService;
import com.das.modules.data.service.TDEngineService; import com.das.modules.data.service.TDEngineService;
@ -19,49 +20,44 @@ import java.util.*;
public class HisHighSpeedCommand implements BaseCommand { public class HisHighSpeedCommand implements BaseCommand {
@Autowired @Autowired
NodeMessageService nodeMessageService; CacheService cacheService;
@Autowired @Autowired
TDEngineService tdEngineService; TDEngineService tdEngineService;
@Autowired
CacheService cacheService;
@Override @Override
public void doCommand(TerminalMessage data) { public void doCommand(TerminalMessage data) {
log.debug("收到历史高频数据"); log.debug("收到[历史高频数据]");
try { try {
//analogData值只存入redis processHisHighSpeedData(data);
handleHighSpeed(data);
} catch (Exception e) { } catch (Exception e) {
log.error("解析数据异常", e); log.error("解析数据异常", e);
} }
} }
private void handleHighSpeed(TerminalMessage data) { public void processHisHighSpeedData(TerminalMessage data) {
JsonNode jsonNode = data.getData(); JsonNode jsonNode = Optional.of(data.getData()).orElseThrow(() -> new ServiceException("data字段缺失"));
String deviceId = jsonNode.get("deviceId").asText(); String deviceId = Optional.of(jsonNode.get("deviceId")).orElseThrow( () -> new ServiceException("deviceId字段缺失")).asText();
Long dataTime = Optional.of(jsonNode.get("dataTime")).orElseThrow( ()-> new ServiceException("dataTime字段缺失")).asLong();
JsonNode values = jsonNode.get("values"); JsonNode values = jsonNode.get("values");
Map<String, Object> keyValueMap = new HashMap<>(); Map<String, Object> keyValueMap = new HashMap<>();
// 根据设备ID获取对应的物模型code // 根据设备ID获取对应的物模型属性
DeviceInfoCache dev = cacheService.getEquipmentCache().getDeviceInfoCacheById(jsonNode.get("deviceId").asLong()); DeviceInfoCache dev = cacheService.getEquipmentCache().getDeviceInfoCacheById(Long.valueOf(deviceId));
//High数据 //High数据
Iterator<String> keysHigh = values.fieldNames(); Iterator<String> keysHigh = values.fieldNames();
while (keysHigh.hasNext()) { while (keysHigh.hasNext()) {
String fieldName = keysHigh.next(); String fieldName = keysHigh.next();
keyValueMap.put(fieldName, values.get(fieldName)); if (cacheService.getIotModelCache().isHighSpeed(dev.getIotModelId(), fieldName)) {
keyValueMap.put(fieldName, values.get(fieldName));
}
} }
Long dataTime = jsonNode.get("dataTime").asLong();
List<RTData> highList = new ArrayList<>();
RTData rtHighData = RTData.builder() RTData rtHighData = RTData.builder()
.dataTime(dataTime) .dataTime(dataTime)
.deviceId(Long.valueOf(deviceId)) .deviceId(Long.valueOf(deviceId))
.values(keyValueMap) .values(keyValueMap)
.build(); .build();
highList.add(rtHighData); tdEngineService.updateYCHighValues(List.of(rtHighData), dev.getIotModelCode());
tdEngineService.updateYCHighValues(highList, dev.getIotModelCode());
} }
} }

View File

@ -3,10 +3,10 @@ package com.das.modules.node.command;
import com.das.modules.cache.domain.DeviceInfoCache; import com.das.modules.cache.domain.DeviceInfoCache;
import com.das.modules.cache.service.CacheService; import com.das.modules.cache.service.CacheService;
import com.das.modules.data.service.TDEngineService; import com.das.modules.data.service.TDEngineService;
import com.das.modules.equipment.mapper.SysIotModelMapper;
import com.das.modules.node.constant.NodeConstant; import com.das.modules.node.constant.NodeConstant;
import com.das.modules.node.domain.bo.RTData; import com.das.modules.node.domain.bo.RTData;
import com.das.modules.node.domain.bo.TerminalMessage; import com.das.modules.node.domain.bo.TerminalMessage;
import com.das.modules.node.service.NodeMessageService;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -19,47 +19,54 @@ import java.util.*;
public class HisLowSpeedCommand implements BaseCommand { public class HisLowSpeedCommand implements BaseCommand {
@Autowired @Autowired
TDEngineService tdEngineService; CacheService cacheService;
@Autowired @Autowired
CacheService cacheService; TDEngineService tdEngineService;
@Override @Override
public void doCommand(TerminalMessage data) { public void doCommand(TerminalMessage data) {
log.debug("收到[历史低频数据]"); log.debug("收到[历史低频数据]");
try { try {
//analogData值只存入redis //analogData值只存入redis
handleLowSpeed(data); processHisLowSpeedData(data);
} catch (Exception e) { } catch (Exception e) {
log.error("解析数据异常", e); log.error("解析数据异常", e);
} }
} }
private void handleLowSpeed(TerminalMessage data) { private void processHisLowSpeedData(TerminalMessage data) {
JsonNode jsonNode = data.getData(); JsonNode dataNode = data.getData();
String deviceId = jsonNode.get("deviceId").asText(); if (dataNode == null ) {
JsonNode values = jsonNode.get("values"); log.error("收到无效历史低频数据报文");
return;
}
String deviceId = Optional.of(dataNode.get("deviceId")).orElseThrow(() -> new RuntimeException("设备ID为空")).asText();
Long dataTime = Optional.of(dataNode.get("dataTime")).orElseThrow(() -> new RuntimeException("数据时间为空")).asLong();
JsonNode values = dataNode.get("values");
Map<String, Object> keyValueMap = new HashMap<>(); Map<String, Object> keyValueMap = new HashMap<>();
// 根据设备ID获取对应的物模型code // 根据设备ID获取对应的物模型属性
DeviceInfoCache dev = cacheService.getEquipmentCache().getDeviceInfoCacheById(jsonNode.get("deviceId").asLong());
DeviceInfoCache device = cacheService.getEquipmentCache().getDeviceInfoCacheById(Long.valueOf(deviceId));
String iotModelCode = device.getIotModelCode();
//Low数据 //Low数据
Iterator<String> keysLow = values.fieldNames(); Iterator<String> keysLow = values.fieldNames();
while (keysLow.hasNext()) { while (keysLow.hasNext()) {
String fieldName = keysLow.next(); String fieldName = keysLow.next();
keyValueMap.put(fieldName, values.get(fieldName)); if (cacheService.getIotModelCache().isLowSpeed(device.getIotModelId(), fieldName)){
keyValueMap.put(fieldName, values.get(fieldName));
}
} }
Long dataTime = jsonNode.get("dataTime").asLong(); RTData rtLowData = RTData.builder()
List<RTData> highList = new ArrayList<>();
RTData rtHighData = RTData.builder()
.dataTime(dataTime) .dataTime(dataTime)
.deviceId(Long.valueOf(deviceId)) .deviceId(Long.valueOf(deviceId))
.values(keyValueMap) .values(keyValueMap)
.build(); .build();
highList.add(rtHighData);
tdEngineService.updateYCLowValues(highList, dev.getIotModelCode());
}
tdEngineService.updateYCLowValues(List.of(rtLowData), iotModelCode);
}
} }

View File

@ -42,7 +42,7 @@ public class StateDataCommand implements BaseCommand {
} }
private void processStateData(TerminalMessage data) { private void processStateData(TerminalMessage data) {
JsonNode dataNode = data.getData(); JsonNode dataNode = Optional.of(data.getData()).orElseThrow(() -> new ServiceException("data字段缺失"));
String deviceId = Optional.of(dataNode.get("deviceId")).orElseThrow(() -> new ServiceException("deviceId字段缺失")).asText(); String deviceId = Optional.of(dataNode.get("deviceId")).orElseThrow(() -> new ServiceException("deviceId字段缺失")).asText();
Long dataTime = Optional.of(dataNode.get("dataTime")).orElseThrow( () -> new ServiceException("dataTime字段缺失")).asLong(); Long dataTime = Optional.of(dataNode.get("dataTime")).orElseThrow( () -> new ServiceException("dataTime字段缺失")).asLong();
JsonNode values = dataNode.get("values"); JsonNode values = dataNode.get("values");

View File

@ -19,6 +19,8 @@ import org.springframework.web.multipart.MultipartFile;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
/** /**
* 物模型Controller * 物模型Controller
@ -84,7 +86,7 @@ public class SysNodeController {
/** 配置下发 */ /** 配置下发 */
@PostMapping("/configUpdate") @PostMapping("/configUpdate")
public R<?> configUpdate(@RequestBody SysNodeDto sysNodeDto) { public R<?> configUpdate(@RequestBody SysNodeDto sysNodeDto) throws ExecutionException, InterruptedException, TimeoutException {
nodeMessageService.sendTerminalConfig(sysNodeDto.getId()); nodeMessageService.sendTerminalConfig(sysNodeDto.getId());
return R.success(); return R.success();
} }

View File

@ -12,7 +12,7 @@ import java.util.concurrent.TimeoutException;
public interface NodeMessageService { public interface NodeMessageService {
JsonNode sendTerminalConfig(Long nodeId); JsonNode sendTerminalConfig(Long nodeId) throws ExecutionException, InterruptedException, TimeoutException;
/** /**
* 向指定采集节点发送指令(无返回值) * 向指定采集节点发送指令(无返回值)

View File

@ -75,22 +75,6 @@ public class NodeMessageServiceImpl extends TextWebSocketHandler implements Node
@Resource @Resource
SysImpTabMappingMapper sysImptabmappingMapper; SysImpTabMappingMapper sysImptabmappingMapper;
@Autowired
AdminRedisTemplate adminRedisTemplate;
@Autowired
SysIotModelMapper sysIotModelMapper;
@Autowired
TDEngineService tdEngineService;
@Autowired
private CacheService cacheService;
@Autowired
private DataServiceImpl dataService;
@PostConstruct @PostConstruct
public void init() { public void init() {
//初始化高性能队列 //初始化高性能队列
@ -135,7 +119,7 @@ public class NodeMessageServiceImpl extends TextWebSocketHandler implements Node
} }
@Override @Override
public JsonNode sendTerminalConfig(Long nodeId) { public JsonNode sendTerminalConfig(Long nodeId) throws ExecutionException, InterruptedException, TimeoutException {
ConfigUpdateVo configUpdateVo = new ConfigUpdateVo(); ConfigUpdateVo configUpdateVo = new ConfigUpdateVo();
List<LinkVo> links = new ArrayList<>(); List<LinkVo> links = new ArrayList<>();
try { try {
@ -232,8 +216,7 @@ public class NodeMessageServiceImpl extends TextWebSocketHandler implements Node
.time(time) .time(time)
.data(jsonNode) .data(jsonNode)
.build(); .build();
sendActionMessage(nodeId, configUpdate); sendTerminalMessageWithResult(nodeId,configUpdate);
System.out.println(jsonNode);
return jsonNode; return jsonNode;
} }