From f9484cfce624fabfcd0d71002f122bafbe6c8953 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B0=B7=E6=88=90=E4=BC=9F?= Date: Tue, 31 Dec 2024 16:32:52 +0800 Subject: [PATCH] =?UTF-8?q?=E8=BF=81=E7=A7=BBstateData=E5=8A=9F=E8=83=BD?= =?UTF-8?q?=E8=BF=9B=E5=85=A5StateDataCommand?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../node/command/AnalogDataCommand.java | 46 ------------ .../node/command/StateDataCommand.java | 71 ++++++++++++++++++- 2 files changed, 69 insertions(+), 48 deletions(-) diff --git a/das/src/main/java/com/das/modules/node/command/AnalogDataCommand.java b/das/src/main/java/com/das/modules/node/command/AnalogDataCommand.java index 7b427e20..8a59b3e1 100644 --- a/das/src/main/java/com/das/modules/node/command/AnalogDataCommand.java +++ b/das/src/main/java/com/das/modules/node/command/AnalogDataCommand.java @@ -106,51 +106,5 @@ public class AnalogDataCommand implements BaseCommand { tdEngineService.updateYCLowValues(List.of(lowSpeedData), dev.getModel()); } redisTemplate.opsForValue().multiSet(redisValues); -// Map keyValueMap = new HashMap<>(); -// String modelCode = dataService.deviceModelMap.get(deviceId); -// Set highKey = dataService.highIotFieldMap.get(modelCode).keySet(); -// Set lowKey = dataService.lowIotFieldMap.get(modelCode).keySet(); -// Map highSpeedValueMap = new HashMap<>(); -// Map lowSpeedValueMap = new HashMap<>(); - -// //数据入redis -// if (values != null){ -// Iterator keysHigh = values.fieldNames(); -// while (keysHigh.hasNext()) { -// String fieldName = keysHigh.next(); -// String key = String.format("RT:%s:%s", deviceId, fieldName.toLowerCase()); -// keyValueMap.put(key, values.get(fieldName)); -// } -// } -// -// if (archiveValues != null){ -// Iterator archiveKeys = archiveValues.fieldNames(); -// while (archiveKeys.hasNext()) { -// String fieldName = archiveKeys.next(); -// String key = String.format("RT:%s:%s", deviceId, fieldName.toLowerCase()); -// keyValueMap.put(key, archiveValues.get(fieldName)); -// if (highKey.contains(fieldName)) { -// highSpeedValueMap.put(fieldName, archiveValues.get(fieldName)); -// } -// if (lowKey.contains(fieldName)) { -// lowSpeedValueMap.put(fieldName, archiveValues.get(fieldName)); -// } -// } -// } -// //更新td -// if (!highSpeedValueMap.isEmpty()) { -// List highSpeedData = new ArrayList<>(); -// RTData rtHighData = RTData.builder().dataTime(dataTime).deviceId(Long.valueOf(deviceId)).values(highSpeedValueMap).build(); -// highSpeedData.add(rtHighData); -// tdEngineService.updateYCHighValues(highSpeedData, modelCode); -// } -// -// if (!lowSpeedValueMap.isEmpty()) { -// List lowSpeedData = new ArrayList<>(); -// RTData rtLowData = RTData.builder().dataTime(dataTime).deviceId(Long.valueOf(deviceId)).values(lowSpeedValueMap).build(); -// lowSpeedData.add(rtLowData); -// tdEngineService.updateYCLowValues(lowSpeedData, modelCode); -// } -// adminRedisTemplate.mSet(keyValueMap); } } diff --git a/das/src/main/java/com/das/modules/node/command/StateDataCommand.java b/das/src/main/java/com/das/modules/node/command/StateDataCommand.java index ec01ed44..db719dd4 100644 --- a/das/src/main/java/com/das/modules/node/command/StateDataCommand.java +++ b/das/src/main/java/com/das/modules/node/command/StateDataCommand.java @@ -1,27 +1,94 @@ package com.das.modules.node.command; +import com.das.common.exceptions.ServiceException; +import com.das.modules.cache.domain.DeviceInfoCache; +import com.das.modules.cache.service.CacheService; +import com.das.modules.cache.service.IotModelCache; +import com.das.modules.data.service.TDEngineService; import com.das.modules.node.constant.NodeConstant; +import com.das.modules.node.domain.bo.RTData; import com.das.modules.node.domain.bo.TerminalMessage; import com.das.modules.node.service.NodeMessageService; +import com.fasterxml.jackson.databind.JsonNode; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service; +import java.util.*; + @Service(value = NodeConstant.STATE_DATA) @Slf4j public class StateDataCommand implements BaseCommand { @Autowired - NodeMessageService nodeMessageService; + CacheService cacheService; + + @Autowired + RedisTemplate redisTemplate; + + @Autowired + TDEngineService tdEngineService; @Override public void doCommand(TerminalMessage data) { log.debug("收到实时数据[状态量]"); try { //只存入redis - nodeMessageService.handleData(data); + processStateData(data); } catch (Exception e) { log.error("解析数据异常", e); } } + + private void processStateData(TerminalMessage data) { + JsonNode dataNode = data.getData(); + String deviceId = Optional.of(dataNode.get("deviceId")).orElseThrow(() -> new ServiceException("deviceId字段缺失")).asText(); + Long dataTime = Optional.of(dataNode.get("dataTime")).orElseThrow( () -> new ServiceException("dataTime字段缺失")).asLong(); + JsonNode values = dataNode.get("values"); + JsonNode archiveValues = dataNode.get("archiveValues"); + //排除空数据 + if (values == null && archiveValues == null){ + return; + } + //如果values有值,则存入redis + Map redisValues = new HashMap<>(); + if (values != null && values.isObject()){ + for (Iterator it = values.fieldNames(); it.hasNext(); ) { + String valueName = it.next(); + String key = String.format("RT:%s:%s", deviceId, valueName.toLowerCase()); + redisValues.put(key, values.get(valueName)); + } + } + DeviceInfoCache dev = cacheService.getEquipmentCache().getDeviceInfoCacheById(Long.parseLong(deviceId)); + IotModelCache iotCache = cacheService.getIotModelCache(); + //如果archiveValues有值,则存入td,同时也更新redis + if (archiveValues != null && archiveValues.isObject()){ + RTData hiSpeedData = new RTData(); + hiSpeedData.setDeviceId(Long.parseLong(deviceId)); + hiSpeedData.setDataTime(dataTime); + Map hiSpeedValues = new HashMap<>(); + RTData lowSpeedData = new RTData(); + lowSpeedData.setDeviceId(Long.parseLong(deviceId)); + lowSpeedData.setDataTime(dataTime); + Map lowSpeedValues = new HashMap<>(); + for (Iterator it = archiveValues.fieldNames(); it.hasNext(); ) { + //加入redis更新列表 + String valueName = it.next(); + String key = String.format("RT:%s:%s", deviceId, valueName.toLowerCase()); + redisValues.put(key, archiveValues.get(valueName)); + //加入td更新列表 + if (iotCache.isHighSpeed(dev.getIotModelId(), valueName)) { + hiSpeedValues.put(valueName, archiveValues.get(valueName)); + } else if (iotCache.isLowSpeed(dev.getIotModelId(), valueName)) { + lowSpeedValues.put(valueName, archiveValues.get(valueName)); + } + } + hiSpeedData.setValues(hiSpeedValues); + lowSpeedData.setValues(lowSpeedValues); + tdEngineService.updateYCHighValues(List.of(hiSpeedData), dev.getModel()); + tdEngineService.updateYCLowValues(List.of(lowSpeedData), dev.getModel()); + } + redisTemplate.opsForValue().multiSet(redisValues); + } }