计算量数据更新至td,redis

This commit is contained in:
huguanghan 2024-10-31 09:34:09 +08:00
parent e3d5a60cd3
commit 786de57c92
5 changed files with 125 additions and 34 deletions

View File

@ -0,0 +1,23 @@
package com.das.modules.node.domain.bo;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.databind.ser.std.ToStringSerializer;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class CalculateRTData {
@JsonSerialize(using = ToStringSerializer.class)
private Long deviceId;
private Long dataTime;
private Object dataValue;
private String iotModelField;
}

View File

@ -19,5 +19,6 @@ public class RTData {
@JsonSerialize(using = ToStringSerializer.class) @JsonSerialize(using = ToStringSerializer.class)
private Long deviceId; private Long deviceId;
private Long dataTime; private Long dataTime;
//字段和值的key value
private Map<String, Object> values; private Map<String, Object> values;
} }

View File

@ -1,8 +1,11 @@
package com.das.modules.node.service; package com.das.modules.node.service;
import com.das.modules.node.domain.bo.CalculateRTData;
import com.das.modules.node.domain.bo.TerminalMessage; import com.das.modules.node.domain.bo.TerminalMessage;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import java.util.List;
public interface DataService { public interface DataService {
void pushMessage(TerminalMessage msg); void pushMessage(TerminalMessage msg);
@ -15,4 +18,6 @@ public interface DataService {
void handleHighSpeed(TerminalMessage data); void handleHighSpeed(TerminalMessage data);
void handleLowSpeed(TerminalMessage data); void handleLowSpeed(TerminalMessage data);
void updateCalFieldData(List<CalculateRTData> values);
} }

View File

@ -4,7 +4,9 @@ import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import com.das.modules.equipment.domain.vo.IotModelFieldVo; import com.das.modules.equipment.domain.vo.IotModelFieldVo;
import com.das.modules.equipment.mapper.SysIotModelMapper; import com.das.modules.equipment.mapper.SysIotModelMapper;
import com.das.modules.node.domain.bo.CalculateRTData;
import com.das.modules.node.domain.bo.RTData; import com.das.modules.node.domain.bo.RTData;
import com.das.modules.node.service.impl.DataServiceImpl;
import com.zaxxer.hikari.HikariConfig; import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource; import com.zaxxer.hikari.HikariDataSource;
import jakarta.annotation.PreDestroy; import jakarta.annotation.PreDestroy;
@ -30,6 +32,8 @@ public class TDEngineService {
private HikariDataSource hikariDataSource; private HikariDataSource hikariDataSource;
private DataServiceImpl dataService;
@Value("${tdengine.url}") @Value("${tdengine.url}")
private String url; private String url;
@ -336,6 +340,39 @@ public class TDEngineService {
} }
} }
@Async
public void updateCalFieldValues(List<CalculateRTData> values) {
StringBuilder sb = new StringBuilder(1024 * 1024);
try (Connection conn = hikariDataSource.getConnection();
Statement pstmt = conn.createStatement()) {
ListUtil.page(values, batchSize, (list) -> {
sb.setLength(0);
sb.append("insert into ");
for (CalculateRTData dv : list) {
sb.append("c");
sb.append(dv.getDeviceId());
sb.append(" using c_");
sb.append(dataService.deviceModelMap.get(dv.getDeviceId().toString()));
sb.append("_").append(dv.getIotModelField());
sb.append(" tags (");
sb.append(dv.getDeviceId());
sb.append(") values (");
sb.append(dv.getDataTime());
sb.append(",");
sb.append(dv.getDataValue());
sb.append(")");
}
try {
pstmt.executeUpdate(sb.toString());
} catch (SQLException ex) {
log.error("save yc error", ex);
}
});
} catch (SQLException ex) {
log.error(ex.getMessage());
}
}
@Async @Async
public void updateStataValues(List<RTData> values, String iotModelCode) { public void updateStataValues(List<RTData> values, String iotModelCode) {
StringBuilder sb = new StringBuilder(1024 * 1024); StringBuilder sb = new StringBuilder(1024 * 1024);

View File

@ -1,5 +1,6 @@
package com.das.modules.node.service.impl; package com.das.modules.node.service.impl;
import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.util.IdUtil; import cn.hutool.core.util.IdUtil;
import com.das.common.constant.MeasType; import com.das.common.constant.MeasType;
import com.das.common.utils.AdminRedisTemplate; import com.das.common.utils.AdminRedisTemplate;
@ -8,6 +9,7 @@ import com.das.modules.equipment.entity.SysIotModelField;
import com.das.modules.equipment.mapper.SysIotModelMapper; import com.das.modules.equipment.mapper.SysIotModelMapper;
import com.das.modules.node.disruptor.MessageEventFactory; import com.das.modules.node.disruptor.MessageEventFactory;
import com.das.modules.node.disruptor.TerminalMessageEventHandler; import com.das.modules.node.disruptor.TerminalMessageEventHandler;
import com.das.modules.node.domain.bo.CalculateRTData;
import com.das.modules.node.domain.bo.RTData; import com.das.modules.node.domain.bo.RTData;
import com.das.modules.node.domain.bo.TerminalMessage; import com.das.modules.node.domain.bo.TerminalMessage;
import com.das.modules.node.domain.vo.*; import com.das.modules.node.domain.vo.*;
@ -44,6 +46,8 @@ public class DataServiceImpl implements DataService {
private RingBuffer<TerminalMessage> ringBuffer = null; private RingBuffer<TerminalMessage> ringBuffer = null;
public static final int COMMIT_NUMBER = 1000;
@Resource @Resource
TerminalMessageEventHandler terminalMessageEventHandler; TerminalMessageEventHandler terminalMessageEventHandler;
@ -65,12 +69,19 @@ public class DataServiceImpl implements DataService {
@Autowired @Autowired
TDEngineService tdEngineService; TDEngineService tdEngineService;
//key:deviceId value:modelCode
public ConcurrentHashMap<String, String> deviceModelMap = new ConcurrentHashMap<>(10000);
//key:modelId value:modelCode
public ConcurrentHashMap<String, String> iotModelMap = new ConcurrentHashMap<>(10000); public ConcurrentHashMap<String, String> iotModelMap = new ConcurrentHashMap<>(10000);
//key:modelCode value:Filed:Code,dataType
public ConcurrentHashMap<String, Map<String, Object>> highIotFieldMap = new ConcurrentHashMap<>(10000); public ConcurrentHashMap<String, Map<String, Object>> highIotFieldMap = new ConcurrentHashMap<>(10000);
//key:modelCode value:Filed:Code,dataType
public ConcurrentHashMap<String, Map<String, Object>> lowIotFieldMap = new ConcurrentHashMap<>(10000); public ConcurrentHashMap<String, Map<String, Object>> lowIotFieldMap = new ConcurrentHashMap<>(10000);
//key:modelCode value:Filed:Code,dataType
public ConcurrentHashMap<String, Map<String, String>> calculateIotFieldMap = new ConcurrentHashMap<>(10000); public ConcurrentHashMap<String, Map<String, String>> calculateIotFieldMap = new ConcurrentHashMap<>(10000);
@PostConstruct @PostConstruct
@ -143,42 +154,42 @@ public class DataServiceImpl implements DataService {
ObjectNode equipJsonNode = JSON_MAPPER.convertValue(map, ObjectNode.class); ObjectNode equipJsonNode = JSON_MAPPER.convertValue(map, ObjectNode.class);
List<NewIotModelVo> attrs = new ArrayList<>(); List<NewIotModelVo> attrs = new ArrayList<>();
List<NewIotModelVo> services= new ArrayList<>(); List<NewIotModelVo> services = new ArrayList<>();
List<SysTabMappingVo> tabMappingVoList = sysImptabmappingMapper.getMappingInfoListByLinkIdAndDeviceId(sysCommunicationLinkVo.getId(), dev.getId()); List<SysTabMappingVo> tabMappingVoList = sysImptabmappingMapper.getMappingInfoListByLinkIdAndDeviceId(sysCommunicationLinkVo.getId(), dev.getId());
if (!CollectionUtils.isEmpty(tabMappingVoList)) { if (!CollectionUtils.isEmpty(tabMappingVoList)) {
for (SysTabMappingVo info : tabMappingVoList) { for (SysTabMappingVo info : tabMappingVoList) {
NewIotModelVo newIotModelVo = new NewIotModelVo(); NewIotModelVo newIotModelVo = new NewIotModelVo();
newIotModelVo.setName(info.getMeasPointCode()); newIotModelVo.setName(info.getMeasPointCode());
if (info.getParams() == null) { if (info.getParams() == null) {
newIotModelVo.setParams(equipJsonNode); newIotModelVo.setParams(equipJsonNode);
} else { } else {
newIotModelVo.setParams(JSON_MAPPER.readTree(info.getParams())); newIotModelVo.setParams(JSON_MAPPER.readTree(info.getParams()));
} }
Integer pointType = info.getMeasPointType(); Integer pointType = info.getMeasPointType();
if (pointType == MeasType.TYPE_PSR_ANALOG) { if (pointType == MeasType.TYPE_PSR_ANALOG) {
newIotModelVo.setHighSpeed(info.getHighSpeed()); newIotModelVo.setHighSpeed(info.getHighSpeed());
newIotModelVo.setType("yc"); newIotModelVo.setType("yc");
attrs.add(newIotModelVo); attrs.add(newIotModelVo);
} else if (pointType == MeasType.TYPE_PSR_ACCUMULATOR) { } else if (pointType == MeasType.TYPE_PSR_ACCUMULATOR) {
newIotModelVo.setHighSpeed(info.getHighSpeed()); newIotModelVo.setHighSpeed(info.getHighSpeed());
newIotModelVo.setType("ym"); newIotModelVo.setType("ym");
attrs.add(newIotModelVo); attrs.add(newIotModelVo);
} else if (pointType == MeasType.TYPE_PSR_DISCRETE) { } else if (pointType == MeasType.TYPE_PSR_DISCRETE) {
newIotModelVo.setHighSpeed(info.getHighSpeed()); newIotModelVo.setHighSpeed(info.getHighSpeed());
newIotModelVo.setType("yx"); newIotModelVo.setType("yx");
attrs.add(newIotModelVo); attrs.add(newIotModelVo);
}else if (pointType == MeasType.TYPE_PSR_SET_POINT) { } else if (pointType == MeasType.TYPE_PSR_SET_POINT) {
newIotModelVo.setType("yt"); newIotModelVo.setType("yt");
services.add(newIotModelVo); services.add(newIotModelVo);
} else if (pointType == MeasType.TYPE_PSR_CONTROL) { } else if (pointType == MeasType.TYPE_PSR_CONTROL) {
newIotModelVo.setType("yk"); newIotModelVo.setType("yk");
services.add(newIotModelVo); services.add(newIotModelVo);
} }
} }
} }
@ -198,7 +209,7 @@ public class DataServiceImpl implements DataService {
configUpdateVo.setVersion(1); configUpdateVo.setVersion(1);
configUpdateVo.setLinks(links); configUpdateVo.setLinks(links);
configUpdateVo.setEquipments(equipments); configUpdateVo.setEquipments(equipments);
log.info("下发配置为{}",configUpdateVo); log.info("下发配置为{}", configUpdateVo);
JsonNode jsonNode = JSON_MAPPER.valueToTree(configUpdateVo); JsonNode jsonNode = JSON_MAPPER.valueToTree(configUpdateVo);
long time = System.currentTimeMillis(); long time = System.currentTimeMillis();
@ -220,8 +231,8 @@ public class DataServiceImpl implements DataService {
String key = String.valueOf(item.getId()); String key = String.valueOf(item.getId());
iotModelMap.put(key, item.getIotModelCode()); iotModelMap.put(key, item.getIotModelCode());
List<SysIotModelField> allIotModelField = sysIotModelMapper.getAllIotModelField(item.getId()); List<SysIotModelField> allIotModelField = sysIotModelMapper.getAllIotModelField(item.getId());
Map<String,String> LowModelFieldList = allIotModelField.stream().filter(field -> field.getHighSpeed() == 0 && field.getAttributeType() != 199).collect(Collectors.toMap(SysIotModelField::getAttributeCode, SysIotModelField::getDataType, (value1, value2) -> value1)); Map<String, String> LowModelFieldList = allIotModelField.stream().filter(field -> field.getHighSpeed() == 0 && field.getAttributeType() != 199).collect(Collectors.toMap(SysIotModelField::getAttributeCode, SysIotModelField::getDataType, (value1, value2) -> value1));
Map<String,String> HighModelFieldList = allIotModelField.stream().filter(field -> field.getHighSpeed() == 1 && field.getAttributeType() != 199).collect(Collectors.toMap(SysIotModelField::getAttributeCode, SysIotModelField::getDataType, (value1, value2) -> value1)); Map<String, String> HighModelFieldList = allIotModelField.stream().filter(field -> field.getHighSpeed() == 1 && field.getAttributeType() != 199).collect(Collectors.toMap(SysIotModelField::getAttributeCode, SysIotModelField::getDataType, (value1, value2) -> value1));
Map<String, String> calculateFieldList = allIotModelField.stream().filter(field -> field.getAttributeType() == 199).collect(Collectors.toMap(SysIotModelField::getAttributeCode, SysIotModelField::getDataType, (value1, value2) -> value1)); Map<String, String> calculateFieldList = allIotModelField.stream().filter(field -> field.getAttributeType() == 199).collect(Collectors.toMap(SysIotModelField::getAttributeCode, SysIotModelField::getDataType, (value1, value2) -> value1));
Map<String, Object> map = new HashMap<>(); Map<String, Object> map = new HashMap<>();
for (String field : HighModelFieldList.keySet()) { for (String field : HighModelFieldList.keySet()) {
@ -233,9 +244,9 @@ public class DataServiceImpl implements DataService {
lowMap.put(field, LowModelFieldList.get(field)); lowMap.put(field, LowModelFieldList.get(field));
} }
lowIotFieldMap.put(item.getIotModelCode(), lowMap); lowIotFieldMap.put(item.getIotModelCode(), lowMap);
calculateIotFieldMap.put(item.getIotModelCode(),calculateFieldList); calculateIotFieldMap.put(item.getIotModelCode(), calculateFieldList);
} }
tdEngineService.initIotModel(allIotModel, highIotFieldMap, lowIotFieldMap,calculateIotFieldMap); tdEngineService.initIotModel(allIotModel, highIotFieldMap, lowIotFieldMap, calculateIotFieldMap);
} }
@Override @Override
@ -250,7 +261,7 @@ public class DataServiceImpl implements DataService {
while (keysHigh.hasNext()) { while (keysHigh.hasNext()) {
String fieldName = keysHigh.next(); String fieldName = keysHigh.next();
String key = String.format("RT:%s:%s", deviceId, fieldName.toLowerCase()); String key = String.format("RT:%s:%s", deviceId, fieldName.toLowerCase());
keyValueMap.put(key,values.get(fieldName)); keyValueMap.put(key, values.get(fieldName));
} }
adminRedisTemplate.mSet(keyValueMap); adminRedisTemplate.mSet(keyValueMap);
} }
@ -292,7 +303,7 @@ public class DataServiceImpl implements DataService {
String iotModelCode = sysIotModelMapper.getIotModel(jsonNode.get("deviceId").asLong()); String iotModelCode = sysIotModelMapper.getIotModel(jsonNode.get("deviceId").asLong());
//Low数据 //Low数据
Iterator<String> keysLow= values.fieldNames(); Iterator<String> keysLow = values.fieldNames();
while (keysLow.hasNext()) { while (keysLow.hasNext()) {
String fieldName = keysLow.next(); String fieldName = keysLow.next();
keyValueMap.put(fieldName, values.get(fieldName)); keyValueMap.put(fieldName, values.get(fieldName));
@ -309,4 +320,18 @@ public class DataServiceImpl implements DataService {
tdEngineService.updateYCLowValues(highList, iotModelCode); tdEngineService.updateYCLowValues(highList, iotModelCode);
} }
@Override
public void updateCalFieldData(List<CalculateRTData> calValues) {
//更新数据至redis,TD
ListUtil.page(calValues, COMMIT_NUMBER, list -> {
Map<String, Object> keyValueMap = new HashMap<>();
for (CalculateRTData value : list) {
String key = String.format("RT:%s:%s", value.getDeviceId(), value.getIotModelField().toLowerCase());
keyValueMap.put(key, value.getDataValue());
}
adminRedisTemplate.mSet(keyValueMap);
tdEngineService.updateCalFieldValues(list);
});
}
} }