调整command相关代码
This commit is contained in:
parent
f69e27ced7
commit
acd974f2cb
@ -58,7 +58,7 @@ public class AnalogDataCommand implements BaseCommand {
|
||||
*/
|
||||
|
||||
private void processAnalogData(TerminalMessage data){
|
||||
JsonNode dataNode = data.getData();
|
||||
JsonNode dataNode = Optional.of(data.getData()).orElseThrow(() -> new ServiceException("data字段缺失"));
|
||||
String deviceId = Optional.of(dataNode.get("deviceId")).orElseThrow(() -> new ServiceException("deviceId字段缺失")).asText();
|
||||
Long dataTime = Optional.of(dataNode.get("dataTime")).orElseThrow( () -> new ServiceException("dataTime字段缺失")).asLong();
|
||||
JsonNode values = dataNode.get("values");
|
||||
|
@ -55,7 +55,7 @@ public class DeviceEventCommand implements BaseCommand{
|
||||
JsonNode dataEvent = data.getData();
|
||||
ObjectMapper objectMapper = new ObjectMapper();
|
||||
List<DeviceEventInfo> valueList = new ArrayList<>();
|
||||
|
||||
|
||||
// 使用 TypeReference 来指定转换目标类型
|
||||
List<DeviceEventVo> list = objectMapper.convertValue(dataEvent, new TypeReference<List<DeviceEventVo>>() {
|
||||
});
|
||||
|
@ -1,5 +1,6 @@
|
||||
package com.das.modules.node.command;
|
||||
|
||||
import com.das.common.exceptions.ServiceException;
|
||||
import com.das.modules.cache.domain.DeviceInfoCache;
|
||||
import com.das.modules.cache.service.CacheService;
|
||||
import com.das.modules.data.service.TDEngineService;
|
||||
@ -19,49 +20,44 @@ import java.util.*;
|
||||
public class HisHighSpeedCommand implements BaseCommand {
|
||||
|
||||
@Autowired
|
||||
NodeMessageService nodeMessageService;
|
||||
CacheService cacheService;
|
||||
|
||||
@Autowired
|
||||
TDEngineService tdEngineService;
|
||||
|
||||
@Autowired
|
||||
CacheService cacheService;
|
||||
|
||||
@Override
|
||||
public void doCommand(TerminalMessage data) {
|
||||
log.debug("收到历史高频数据");
|
||||
log.debug("收到[历史高频数据]");
|
||||
try {
|
||||
//analogData值只存入redis
|
||||
handleHighSpeed(data);
|
||||
processHisHighSpeedData(data);
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("解析数据异常", e);
|
||||
}
|
||||
}
|
||||
|
||||
private void handleHighSpeed(TerminalMessage data) {
|
||||
JsonNode jsonNode = data.getData();
|
||||
String deviceId = jsonNode.get("deviceId").asText();
|
||||
public void processHisHighSpeedData(TerminalMessage data) {
|
||||
JsonNode jsonNode = Optional.of(data.getData()).orElseThrow(() -> new ServiceException("data字段缺失"));
|
||||
String deviceId = Optional.of(jsonNode.get("deviceId")).orElseThrow( () -> new ServiceException("deviceId字段缺失")).asText();
|
||||
Long dataTime = Optional.of(jsonNode.get("dataTime")).orElseThrow( ()-> new ServiceException("dataTime字段缺失")).asLong();
|
||||
JsonNode values = jsonNode.get("values");
|
||||
Map<String, Object> keyValueMap = new HashMap<>();
|
||||
// 根据设备ID获取对应的物模型code
|
||||
DeviceInfoCache dev = cacheService.getEquipmentCache().getDeviceInfoCacheById(jsonNode.get("deviceId").asLong());
|
||||
|
||||
// 根据设备ID获取对应的物模型属性
|
||||
DeviceInfoCache dev = cacheService.getEquipmentCache().getDeviceInfoCacheById(Long.valueOf(deviceId));
|
||||
//High数据
|
||||
Iterator<String> keysHigh = values.fieldNames();
|
||||
while (keysHigh.hasNext()) {
|
||||
String fieldName = keysHigh.next();
|
||||
keyValueMap.put(fieldName, values.get(fieldName));
|
||||
|
||||
if (cacheService.getIotModelCache().isHighSpeed(dev.getIotModelId(), fieldName)) {
|
||||
keyValueMap.put(fieldName, values.get(fieldName));
|
||||
}
|
||||
}
|
||||
Long dataTime = jsonNode.get("dataTime").asLong();
|
||||
List<RTData> highList = new ArrayList<>();
|
||||
|
||||
RTData rtHighData = RTData.builder()
|
||||
.dataTime(dataTime)
|
||||
.deviceId(Long.valueOf(deviceId))
|
||||
.values(keyValueMap)
|
||||
.build();
|
||||
highList.add(rtHighData);
|
||||
tdEngineService.updateYCHighValues(highList, dev.getIotModelCode());
|
||||
tdEngineService.updateYCHighValues(List.of(rtHighData), dev.getIotModelCode());
|
||||
}
|
||||
}
|
||||
|
@ -3,10 +3,10 @@ package com.das.modules.node.command;
|
||||
import com.das.modules.cache.domain.DeviceInfoCache;
|
||||
import com.das.modules.cache.service.CacheService;
|
||||
import com.das.modules.data.service.TDEngineService;
|
||||
import com.das.modules.equipment.mapper.SysIotModelMapper;
|
||||
import com.das.modules.node.constant.NodeConstant;
|
||||
import com.das.modules.node.domain.bo.RTData;
|
||||
import com.das.modules.node.domain.bo.TerminalMessage;
|
||||
import com.das.modules.node.service.NodeMessageService;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
@ -19,47 +19,54 @@ import java.util.*;
|
||||
public class HisLowSpeedCommand implements BaseCommand {
|
||||
|
||||
@Autowired
|
||||
TDEngineService tdEngineService;
|
||||
|
||||
CacheService cacheService;
|
||||
|
||||
@Autowired
|
||||
CacheService cacheService;
|
||||
TDEngineService tdEngineService;
|
||||
|
||||
@Override
|
||||
public void doCommand(TerminalMessage data) {
|
||||
log.debug("收到[历史低频数据]");
|
||||
try {
|
||||
//analogData值只存入redis
|
||||
handleLowSpeed(data);
|
||||
processHisLowSpeedData(data);
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("解析数据异常", e);
|
||||
}
|
||||
}
|
||||
|
||||
private void handleLowSpeed(TerminalMessage data) {
|
||||
JsonNode jsonNode = data.getData();
|
||||
String deviceId = jsonNode.get("deviceId").asText();
|
||||
JsonNode values = jsonNode.get("values");
|
||||
private void processHisLowSpeedData(TerminalMessage data) {
|
||||
JsonNode dataNode = data.getData();
|
||||
if (dataNode == null ) {
|
||||
log.error("收到无效历史低频数据报文");
|
||||
return;
|
||||
}
|
||||
String deviceId = Optional.of(dataNode.get("deviceId")).orElseThrow(() -> new RuntimeException("设备ID为空")).asText();
|
||||
Long dataTime = Optional.of(dataNode.get("dataTime")).orElseThrow(() -> new RuntimeException("数据时间为空")).asLong();
|
||||
|
||||
JsonNode values = dataNode.get("values");
|
||||
Map<String, Object> keyValueMap = new HashMap<>();
|
||||
// 根据设备ID获取对应的物模型code
|
||||
DeviceInfoCache dev = cacheService.getEquipmentCache().getDeviceInfoCacheById(jsonNode.get("deviceId").asLong());
|
||||
// 根据设备ID获取对应的物模型属性
|
||||
|
||||
DeviceInfoCache device = cacheService.getEquipmentCache().getDeviceInfoCacheById(Long.valueOf(deviceId));
|
||||
|
||||
String iotModelCode = device.getIotModelCode();
|
||||
|
||||
//Low数据
|
||||
Iterator<String> keysLow = values.fieldNames();
|
||||
while (keysLow.hasNext()) {
|
||||
String fieldName = keysLow.next();
|
||||
keyValueMap.put(fieldName, values.get(fieldName));
|
||||
|
||||
if (cacheService.getIotModelCache().isLowSpeed(device.getIotModelId(), fieldName)){
|
||||
keyValueMap.put(fieldName, values.get(fieldName));
|
||||
}
|
||||
}
|
||||
Long dataTime = jsonNode.get("dataTime").asLong();
|
||||
List<RTData> highList = new ArrayList<>();
|
||||
RTData rtHighData = RTData.builder()
|
||||
RTData rtLowData = RTData.builder()
|
||||
.dataTime(dataTime)
|
||||
.deviceId(Long.valueOf(deviceId))
|
||||
.values(keyValueMap)
|
||||
.build();
|
||||
highList.add(rtHighData);
|
||||
tdEngineService.updateYCLowValues(highList, dev.getIotModelCode());
|
||||
}
|
||||
|
||||
tdEngineService.updateYCLowValues(List.of(rtLowData), iotModelCode);
|
||||
}
|
||||
}
|
||||
|
@ -42,7 +42,7 @@ public class StateDataCommand implements BaseCommand {
|
||||
}
|
||||
|
||||
private void processStateData(TerminalMessage data) {
|
||||
JsonNode dataNode = data.getData();
|
||||
JsonNode dataNode = Optional.of(data.getData()).orElseThrow(() -> new ServiceException("data字段缺失"));
|
||||
String deviceId = Optional.of(dataNode.get("deviceId")).orElseThrow(() -> new ServiceException("deviceId字段缺失")).asText();
|
||||
Long dataTime = Optional.of(dataNode.get("dataTime")).orElseThrow( () -> new ServiceException("dataTime字段缺失")).asLong();
|
||||
JsonNode values = dataNode.get("values");
|
||||
|
Loading…
Reference in New Issue
Block a user