迁移stateData功能进入StateDataCommand

This commit is contained in:
谷成伟 2024-12-31 16:32:52 +08:00
parent 9e82f9e355
commit f9484cfce6
2 changed files with 69 additions and 48 deletions

View File

@ -106,51 +106,5 @@ public class AnalogDataCommand implements BaseCommand {
tdEngineService.updateYCLowValues(List.of(lowSpeedData), dev.getModel());
}
redisTemplate.opsForValue().multiSet(redisValues);
// Map<String, Object> keyValueMap = new HashMap<>();
// String modelCode = dataService.deviceModelMap.get(deviceId);
// Set<String> highKey = dataService.highIotFieldMap.get(modelCode).keySet();
// Set<String> lowKey = dataService.lowIotFieldMap.get(modelCode).keySet();
// Map<String, Object> highSpeedValueMap = new HashMap<>();
// Map<String, Object> lowSpeedValueMap = new HashMap<>();
// //数据入redis
// if (values != null){
// Iterator<String> keysHigh = values.fieldNames();
// while (keysHigh.hasNext()) {
// String fieldName = keysHigh.next();
// String key = String.format("RT:%s:%s", deviceId, fieldName.toLowerCase());
// keyValueMap.put(key, values.get(fieldName));
// }
// }
//
// if (archiveValues != null){
// Iterator<String> archiveKeys = archiveValues.fieldNames();
// while (archiveKeys.hasNext()) {
// String fieldName = archiveKeys.next();
// String key = String.format("RT:%s:%s", deviceId, fieldName.toLowerCase());
// keyValueMap.put(key, archiveValues.get(fieldName));
// if (highKey.contains(fieldName)) {
// highSpeedValueMap.put(fieldName, archiveValues.get(fieldName));
// }
// if (lowKey.contains(fieldName)) {
// lowSpeedValueMap.put(fieldName, archiveValues.get(fieldName));
// }
// }
// }
// //更新td
// if (!highSpeedValueMap.isEmpty()) {
// List<RTData> highSpeedData = new ArrayList<>();
// RTData rtHighData = RTData.builder().dataTime(dataTime).deviceId(Long.valueOf(deviceId)).values(highSpeedValueMap).build();
// highSpeedData.add(rtHighData);
// tdEngineService.updateYCHighValues(highSpeedData, modelCode);
// }
//
// if (!lowSpeedValueMap.isEmpty()) {
// List<RTData> lowSpeedData = new ArrayList<>();
// RTData rtLowData = RTData.builder().dataTime(dataTime).deviceId(Long.valueOf(deviceId)).values(lowSpeedValueMap).build();
// lowSpeedData.add(rtLowData);
// tdEngineService.updateYCLowValues(lowSpeedData, modelCode);
// }
// adminRedisTemplate.mSet(keyValueMap);
}
}

View File

@ -1,27 +1,94 @@
package com.das.modules.node.command;
import com.das.common.exceptions.ServiceException;
import com.das.modules.cache.domain.DeviceInfoCache;
import com.das.modules.cache.service.CacheService;
import com.das.modules.cache.service.IotModelCache;
import com.das.modules.data.service.TDEngineService;
import com.das.modules.node.constant.NodeConstant;
import com.das.modules.node.domain.bo.RTData;
import com.das.modules.node.domain.bo.TerminalMessage;
import com.das.modules.node.service.NodeMessageService;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import java.util.*;
@Service(value = NodeConstant.STATE_DATA)
@Slf4j
public class StateDataCommand implements BaseCommand {
@Autowired
NodeMessageService nodeMessageService;
CacheService cacheService;
@Autowired
RedisTemplate<String,Object> redisTemplate;
@Autowired
TDEngineService tdEngineService;
@Override
public void doCommand(TerminalMessage data) {
log.debug("收到实时数据[状态量]");
try {
//只存入redis
nodeMessageService.handleData(data);
processStateData(data);
} catch (Exception e) {
log.error("解析数据异常", e);
}
}
private void processStateData(TerminalMessage data) {
JsonNode dataNode = data.getData();
String deviceId = Optional.of(dataNode.get("deviceId")).orElseThrow(() -> new ServiceException("deviceId字段缺失")).asText();
Long dataTime = Optional.of(dataNode.get("dataTime")).orElseThrow( () -> new ServiceException("dataTime字段缺失")).asLong();
JsonNode values = dataNode.get("values");
JsonNode archiveValues = dataNode.get("archiveValues");
//排除空数据
if (values == null && archiveValues == null){
return;
}
//如果values有值则存入redis
Map<String,Object> redisValues = new HashMap<>();
if (values != null && values.isObject()){
for (Iterator<String> it = values.fieldNames(); it.hasNext(); ) {
String valueName = it.next();
String key = String.format("RT:%s:%s", deviceId, valueName.toLowerCase());
redisValues.put(key, values.get(valueName));
}
}
DeviceInfoCache dev = cacheService.getEquipmentCache().getDeviceInfoCacheById(Long.parseLong(deviceId));
IotModelCache iotCache = cacheService.getIotModelCache();
//如果archiveValues有值则存入td同时也更新redis
if (archiveValues != null && archiveValues.isObject()){
RTData hiSpeedData = new RTData();
hiSpeedData.setDeviceId(Long.parseLong(deviceId));
hiSpeedData.setDataTime(dataTime);
Map<String,Object> hiSpeedValues = new HashMap<>();
RTData lowSpeedData = new RTData();
lowSpeedData.setDeviceId(Long.parseLong(deviceId));
lowSpeedData.setDataTime(dataTime);
Map<String,Object> lowSpeedValues = new HashMap<>();
for (Iterator<String> it = archiveValues.fieldNames(); it.hasNext(); ) {
//加入redis更新列表
String valueName = it.next();
String key = String.format("RT:%s:%s", deviceId, valueName.toLowerCase());
redisValues.put(key, archiveValues.get(valueName));
//加入td更新列表
if (iotCache.isHighSpeed(dev.getIotModelId(), valueName)) {
hiSpeedValues.put(valueName, archiveValues.get(valueName));
} else if (iotCache.isLowSpeed(dev.getIotModelId(), valueName)) {
lowSpeedValues.put(valueName, archiveValues.get(valueName));
}
}
hiSpeedData.setValues(hiSpeedValues);
lowSpeedData.setValues(lowSpeedValues);
tdEngineService.updateYCHighValues(List.of(hiSpeedData), dev.getModel());
tdEngineService.updateYCLowValues(List.of(lowSpeedData), dev.getModel());
}
redisTemplate.opsForValue().multiSet(redisValues);
}
}