迁移analogData功能进入AnalogDataCommand
This commit is contained in:
parent
81d46ccda5
commit
9e82f9e355
@ -277,6 +277,9 @@ public class TDEngineService {
|
||||
|
||||
@Async
|
||||
public void updateYCHighValues(List<RTData> values, String iotModelCode) {
|
||||
if (values.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
StringBuilder sb = new StringBuilder(1024 * 1024);
|
||||
try (Connection conn = hikariDataSource.getConnection();
|
||||
Statement pstmt = conn.createStatement()) {
|
||||
@ -315,6 +318,9 @@ public class TDEngineService {
|
||||
|
||||
@Async
|
||||
public void updateYCLowValues(List<RTData> values, String iotModelCode) {
|
||||
if (values.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
StringBuilder sb = new StringBuilder(1024 * 1024);
|
||||
try (Connection conn = hikariDataSource.getConnection();
|
||||
Statement pstmt = conn.createStatement()) {
|
||||
@ -353,6 +359,9 @@ public class TDEngineService {
|
||||
|
||||
@Async
|
||||
public void updateCalFieldValues(List<CalculateRTData> values) {
|
||||
if (values.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
StringBuilder sb = new StringBuilder(1024 * 1024);
|
||||
try (Connection conn = hikariDataSource.getConnection();
|
||||
Statement pstmt = conn.createStatement()) {
|
||||
@ -387,6 +396,9 @@ public class TDEngineService {
|
||||
|
||||
@Async
|
||||
public void updateDeviceEventValues(List<DeviceEventInfo> values) {
|
||||
if (values.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
StringBuilder sb = new StringBuilder(1024 * 1024);
|
||||
try (Connection conn = hikariDataSource.getConnection();
|
||||
Statement pstmt = conn.createStatement()) {
|
||||
|
@ -1,26 +1,156 @@
|
||||
package com.das.modules.node.command;
|
||||
|
||||
import com.das.common.exceptions.ServiceException;
|
||||
import com.das.modules.cache.domain.DeviceInfoCache;
|
||||
import com.das.modules.cache.service.CacheService;
|
||||
import com.das.modules.cache.service.IotModelCache;
|
||||
import com.das.modules.data.service.TDEngineService;
|
||||
import com.das.modules.node.constant.NodeConstant;
|
||||
import com.das.modules.node.domain.bo.RTData;
|
||||
import com.das.modules.node.domain.bo.TerminalMessage;
|
||||
import com.das.modules.node.service.NodeMessageService;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.*;
|
||||
|
||||
@Service(value = NodeConstant.ANALOG_DATA)
|
||||
@Slf4j
|
||||
public class AnalogDataCommand implements BaseCommand {
|
||||
|
||||
@Autowired
|
||||
NodeMessageService nodeMessageService;
|
||||
CacheService cacheService;
|
||||
|
||||
@Autowired
|
||||
TDEngineService tdEngineService;
|
||||
|
||||
@Autowired
|
||||
RedisTemplate<String, Object> redisTemplate;
|
||||
|
||||
/**
|
||||
* 执行命令方法
|
||||
* 当接收到终端消息时,此方法被调用以处理数据
|
||||
* 主要负责处理模拟量数据的接收和处理
|
||||
*
|
||||
* @param data 包含模拟量数据的TerminalMessage对象
|
||||
*/
|
||||
@Override
|
||||
public void doCommand(TerminalMessage data) {
|
||||
if (log.isDebugEnabled()){
|
||||
log.debug("收到实时数据[模拟量]");
|
||||
log.debug("数据内容: {}", data.toJsonString());
|
||||
}
|
||||
try {
|
||||
//analogData值只存入redis
|
||||
nodeMessageService.handleData(data);
|
||||
processAnalogData(data);
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("解析数据异常", e);
|
||||
log.error("处理实时数据时产生异常", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理实时数据
|
||||
* @param data
|
||||
*/
|
||||
|
||||
private void processAnalogData(TerminalMessage data){
|
||||
JsonNode dataNode = data.getData();
|
||||
String deviceId = Optional.of(dataNode.get("deviceId")).orElseThrow(() -> new ServiceException("deviceId字段缺失")).asText();
|
||||
Long dataTime = Optional.of(dataNode.get("dataTime")).orElseThrow( () -> new ServiceException("dataTime字段缺失")).asLong();
|
||||
JsonNode values = dataNode.get("values");
|
||||
JsonNode archiveValues = dataNode.get("archiveValues");
|
||||
//排除空数据
|
||||
if (values == null && archiveValues == null){
|
||||
return;
|
||||
}
|
||||
//如果values有值,则存入redis
|
||||
Map<String,Object> redisValues = new HashMap<>();
|
||||
if (values != null && values.isObject()){
|
||||
for (Iterator<String> it = values.fieldNames(); it.hasNext(); ) {
|
||||
String valueName = it.next();
|
||||
String key = String.format("RT:%s:%s", deviceId, valueName.toLowerCase());
|
||||
redisValues.put(key, values.get(valueName));
|
||||
}
|
||||
}
|
||||
DeviceInfoCache dev = cacheService.getEquipmentCache().getDeviceInfoCacheById(Long.parseLong(deviceId));
|
||||
IotModelCache iotCache = cacheService.getIotModelCache();
|
||||
//如果archiveValues有值,则存入td,同时也更新redis
|
||||
if (archiveValues != null && archiveValues.isObject()){
|
||||
RTData hiSpeedData = new RTData();
|
||||
hiSpeedData.setDeviceId(Long.parseLong(deviceId));
|
||||
hiSpeedData.setDataTime(dataTime);
|
||||
Map<String,Object> hiSpeedValues = new HashMap<>();
|
||||
RTData lowSpeedData = new RTData();
|
||||
lowSpeedData.setDeviceId(Long.parseLong(deviceId));
|
||||
lowSpeedData.setDataTime(dataTime);
|
||||
Map<String,Object> lowSpeedValues = new HashMap<>();
|
||||
for (Iterator<String> it = archiveValues.fieldNames(); it.hasNext(); ) {
|
||||
//加入redis更新列表
|
||||
String valueName = it.next();
|
||||
String key = String.format("RT:%s:%s", deviceId, valueName.toLowerCase());
|
||||
redisValues.put(key, archiveValues.get(valueName));
|
||||
//加入td更新列表
|
||||
if (iotCache.isHighSpeed(dev.getIotModelId(), valueName)) {
|
||||
hiSpeedValues.put(valueName, archiveValues.get(valueName));
|
||||
} else if (iotCache.isLowSpeed(dev.getIotModelId(), valueName)) {
|
||||
lowSpeedValues.put(valueName, archiveValues.get(valueName));
|
||||
}
|
||||
}
|
||||
hiSpeedData.setValues(hiSpeedValues);
|
||||
lowSpeedData.setValues(lowSpeedValues);
|
||||
tdEngineService.updateYCHighValues(List.of(hiSpeedData), dev.getModel());
|
||||
tdEngineService.updateYCLowValues(List.of(lowSpeedData), dev.getModel());
|
||||
}
|
||||
redisTemplate.opsForValue().multiSet(redisValues);
|
||||
// Map<String, Object> keyValueMap = new HashMap<>();
|
||||
// String modelCode = dataService.deviceModelMap.get(deviceId);
|
||||
// Set<String> highKey = dataService.highIotFieldMap.get(modelCode).keySet();
|
||||
// Set<String> lowKey = dataService.lowIotFieldMap.get(modelCode).keySet();
|
||||
// Map<String, Object> highSpeedValueMap = new HashMap<>();
|
||||
// Map<String, Object> lowSpeedValueMap = new HashMap<>();
|
||||
|
||||
// //数据入redis
|
||||
// if (values != null){
|
||||
// Iterator<String> keysHigh = values.fieldNames();
|
||||
// while (keysHigh.hasNext()) {
|
||||
// String fieldName = keysHigh.next();
|
||||
// String key = String.format("RT:%s:%s", deviceId, fieldName.toLowerCase());
|
||||
// keyValueMap.put(key, values.get(fieldName));
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// if (archiveValues != null){
|
||||
// Iterator<String> archiveKeys = archiveValues.fieldNames();
|
||||
// while (archiveKeys.hasNext()) {
|
||||
// String fieldName = archiveKeys.next();
|
||||
// String key = String.format("RT:%s:%s", deviceId, fieldName.toLowerCase());
|
||||
// keyValueMap.put(key, archiveValues.get(fieldName));
|
||||
// if (highKey.contains(fieldName)) {
|
||||
// highSpeedValueMap.put(fieldName, archiveValues.get(fieldName));
|
||||
// }
|
||||
// if (lowKey.contains(fieldName)) {
|
||||
// lowSpeedValueMap.put(fieldName, archiveValues.get(fieldName));
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// //更新td
|
||||
// if (!highSpeedValueMap.isEmpty()) {
|
||||
// List<RTData> highSpeedData = new ArrayList<>();
|
||||
// RTData rtHighData = RTData.builder().dataTime(dataTime).deviceId(Long.valueOf(deviceId)).values(highSpeedValueMap).build();
|
||||
// highSpeedData.add(rtHighData);
|
||||
// tdEngineService.updateYCHighValues(highSpeedData, modelCode);
|
||||
// }
|
||||
//
|
||||
// if (!lowSpeedValueMap.isEmpty()) {
|
||||
// List<RTData> lowSpeedData = new ArrayList<>();
|
||||
// RTData rtLowData = RTData.builder().dataTime(dataTime).deviceId(Long.valueOf(deviceId)).values(lowSpeedValueMap).build();
|
||||
// lowSpeedData.add(rtLowData);
|
||||
// tdEngineService.updateYCLowValues(lowSpeedData, modelCode);
|
||||
// }
|
||||
// adminRedisTemplate.mSet(keyValueMap);
|
||||
}
|
||||
}
|
||||
|
@ -15,6 +15,7 @@ public class DeviceEventCommand implements BaseCommand{
|
||||
|
||||
@Override
|
||||
public void doCommand(TerminalMessage data) {
|
||||
log.debug("收到实时[事件告警]");
|
||||
try {
|
||||
nodeMessageService.handleDeviceEvent(data);
|
||||
|
||||
|
@ -34,7 +34,7 @@ public class HeartbeatCommand implements BaseCommand{
|
||||
*/
|
||||
@Override
|
||||
public void doCommand(TerminalMessage data) {
|
||||
log.info("收到[heartbeat]报文");
|
||||
log.debug("收到[heartbeat]报文");
|
||||
|
||||
// 解析心跳报文中的数据信息
|
||||
JsonNode dataInfo = data.getData();
|
||||
|
@ -15,6 +15,7 @@ public class HisHighSpeedCommand implements BaseCommand {
|
||||
NodeMessageService nodeMessageService;
|
||||
@Override
|
||||
public void doCommand(TerminalMessage data) {
|
||||
log.debug("收到历史高频数据");
|
||||
try {
|
||||
//analogData值只存入redis
|
||||
nodeMessageService.handleHighSpeed(data);
|
||||
|
@ -15,6 +15,7 @@ public class HisLowSpeedCommand implements BaseCommand {
|
||||
NodeMessageService nodeMessageService;
|
||||
@Override
|
||||
public void doCommand(TerminalMessage data) {
|
||||
log.debug("收到[历史低频数据]");
|
||||
try {
|
||||
//analogData值只存入redis
|
||||
nodeMessageService.handleLowSpeed(data);
|
||||
|
@ -16,6 +16,7 @@ public class StateDataCommand implements BaseCommand {
|
||||
|
||||
@Override
|
||||
public void doCommand(TerminalMessage data) {
|
||||
log.debug("收到实时数据[状态量]");
|
||||
try {
|
||||
//只存入redis
|
||||
nodeMessageService.handleData(data);
|
||||
|
@ -14,8 +14,6 @@ public interface NodeMessageService {
|
||||
|
||||
JsonNode sendTerminalConfig(Long nodeId);
|
||||
|
||||
void handleData(TerminalMessage data);
|
||||
|
||||
void handleHighSpeed(TerminalMessage data);
|
||||
|
||||
void handleLowSpeed(TerminalMessage data);
|
||||
|
@ -238,62 +238,6 @@ public class NodeMessageServiceImpl extends TextWebSocketHandler implements Node
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleData(TerminalMessage data) {
|
||||
JsonNode jsonNode = data.getData();
|
||||
log.debug("收到消息:{}",data.getData());
|
||||
String deviceId = jsonNode.get("deviceId").asText();
|
||||
JsonNode values = jsonNode.get("values");
|
||||
JsonNode archiveValues = jsonNode.get("archiveValues");
|
||||
Long dataTime = jsonNode.get("dataTime").asLong();
|
||||
Map<String, Object> keyValueMap = new HashMap<>();
|
||||
String modelCode = dataService.deviceModelMap.get(deviceId);
|
||||
Set<String> highKey = dataService.highIotFieldMap.get(modelCode).keySet();
|
||||
Set<String> lowKey = dataService.lowIotFieldMap.get(modelCode).keySet();
|
||||
Map<String, Object> highSpeedValueMap = new HashMap<>();
|
||||
Map<String, Object> lowSpeedValueMap = new HashMap<>();
|
||||
|
||||
//数据入redis
|
||||
if (values != null){
|
||||
Iterator<String> keysHigh = values.fieldNames();
|
||||
while (keysHigh.hasNext()) {
|
||||
String fieldName = keysHigh.next();
|
||||
String key = String.format("RT:%s:%s", deviceId, fieldName.toLowerCase());
|
||||
keyValueMap.put(key, values.get(fieldName));
|
||||
}
|
||||
}
|
||||
|
||||
if (archiveValues != null){
|
||||
Iterator<String> archiveKeys = archiveValues.fieldNames();
|
||||
while (archiveKeys.hasNext()) {
|
||||
String fieldName = archiveKeys.next();
|
||||
String key = String.format("RT:%s:%s", deviceId, fieldName.toLowerCase());
|
||||
keyValueMap.put(key, archiveValues.get(fieldName));
|
||||
if (highKey.contains(fieldName)) {
|
||||
highSpeedValueMap.put(fieldName, archiveValues.get(fieldName));
|
||||
}
|
||||
if (lowKey.contains(fieldName)) {
|
||||
lowSpeedValueMap.put(fieldName, archiveValues.get(fieldName));
|
||||
}
|
||||
}
|
||||
}
|
||||
//更新td
|
||||
if (!highSpeedValueMap.isEmpty()) {
|
||||
List<RTData> highSpeedData = new ArrayList<>();
|
||||
RTData rtHighData = RTData.builder().dataTime(dataTime).deviceId(Long.valueOf(deviceId)).values(highSpeedValueMap).build();
|
||||
highSpeedData.add(rtHighData);
|
||||
tdEngineService.updateYCHighValues(highSpeedData, modelCode);
|
||||
}
|
||||
|
||||
if (!lowSpeedValueMap.isEmpty()) {
|
||||
List<RTData> lowSpeedData = new ArrayList<>();
|
||||
RTData rtLowData = RTData.builder().dataTime(dataTime).deviceId(Long.valueOf(deviceId)).values(lowSpeedValueMap).build();
|
||||
lowSpeedData.add(rtLowData);
|
||||
tdEngineService.updateYCLowValues(lowSpeedData, modelCode);
|
||||
}
|
||||
adminRedisTemplate.mSet(keyValueMap);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleHighSpeed(TerminalMessage data) {
|
||||
JsonNode jsonNode = data.getData();
|
||||
|
Loading…
Reference in New Issue
Block a user