tdengine数据按设备分为高频和低频表

This commit is contained in:
huguanghan 2024-10-11 17:30:44 +08:00
parent 81ccbd9c04
commit 10c0014253
2 changed files with 69 additions and 27 deletions

View File

@ -19,6 +19,7 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
@ -58,33 +59,63 @@ public class TDEngineService {
}
// 遍历所有的物模型存入内存中
public void initIotModel(List<IotModelFieldVo> allIotModel, ConcurrentHashMap<String,Map<String, Object>> iotFieldMap) {
public void initIotModel(List<IotModelFieldVo> allIotModel, ConcurrentHashMap<String,Map<String, Object>> highIotFieldMap,ConcurrentHashMap<String,Map<String, Object>> lowIotFieldMap) {
// 创建物模型超级表
StringBuilder sb = new StringBuilder(1024*1024);
try (Connection conn = hikariDataSource.getConnection();
Statement pstmt = conn.createStatement()) {
ListUtil.page(allIotModel, batchSize, (list)->{
sb.setLength(0);
for (IotModelFieldVo info : list) {
sb.append("CREATE STABLE IF NOT EXISTS ");
sb.append(info.getIotModelCode());
sb.append(" (`updatetime` TIMESTAMP");
Map<String, Object> map = iotFieldMap.get(info.getIotModelCode());
// 使用增强的 for 循环遍历键
for (String key : map.keySet()) {
sb.append(", ");
sb.append(key);
sb.append(" float");
}
sb.append(") TAGS (`deviceid` BIGINT);");
}
try {
pstmt.executeUpdate(sb.toString());
} catch (SQLException ex) {
log.error("save yx error", ex);
}
ListUtil.page(allIotModel, batchSize, list ->{
for (IotModelFieldVo info : list) {
StringBuilder sb = new StringBuilder(1024*1024);
sb.setLength(0);
Map<String, Object> map = highIotFieldMap.get(info.getIotModelCode());
Set<String> keySet = map.keySet();
if (keySet.size() != 0){
sb.append("CREATE STABLE IF NOT EXISTS ");
sb.append("h_").append(info.getIotModelCode());
sb.append(" (`updatetime` TIMESTAMP");
// 使用增强的 for 循环遍历键
for (String key : map.keySet()) {
sb.append(", ");
sb.append(key);
sb.append(" float");
}
sb.append(") TAGS (`deviceid` BIGINT);");
try {
System.out.println(sb.toString());
pstmt.executeUpdate(sb.toString());
} catch (SQLException ex) {
log.error("save yx error", ex);
}
}
}
for (IotModelFieldVo info : list) {
StringBuilder sb = new StringBuilder(1024*1024);
sb.setLength(0);
Map<String, Object> map = lowIotFieldMap.get(info.getIotModelCode());
if (map.keySet().size() != 0){
sb.append("CREATE STABLE IF NOT EXISTS ");
sb.append("l_").append(info.getIotModelCode());
sb.append(" (`updatetime` TIMESTAMP");
// 使用增强的 for 循环遍历键
for (String key : map.keySet()) {
sb.append(", ");
sb.append(key);
sb.append(" float");
}
sb.append(") TAGS (`deviceid` BIGINT);");
try {
System.out.println(sb.toString());
pstmt.executeUpdate(sb.toString());
} catch (SQLException ex) {
log.error("save yx error", ex);
}
}
}
});
} catch (SQLException ex) {
log.error(ex.getMessage());

View File

@ -3,6 +3,7 @@ package com.das.modules.node.service.impl;
import com.das.common.constant.BaseIotModelType;
import com.das.common.utils.AdminRedisTemplate;
import com.das.modules.equipment.domain.vo.IotModelFieldVo;
import com.das.modules.equipment.entity.SysIotModelField;
import com.das.modules.equipment.mapper.SysIotModelMapper;
import com.das.modules.node.disruptor.MessageEventFactory;
import com.das.modules.node.disruptor.TerminalMessageEventHandler;
@ -35,6 +36,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
@Slf4j
@Service
@ -66,7 +68,9 @@ public class DataServiceImpl implements DataService {
private ConcurrentHashMap<String, String> iotModelMap = new ConcurrentHashMap<>(10000);
public ConcurrentHashMap<String, Map<String, Object>> iotFieldMap = new ConcurrentHashMap<>(10000);
public ConcurrentHashMap<String, Map<String, Object>> highIotFieldMap = new ConcurrentHashMap<>(10000);
public ConcurrentHashMap<String, Map<String, Object>> lowIotFieldMap = new ConcurrentHashMap<>(10000);
public static final String DEVICE_DATA = "deviceData:{0}";
@ -217,14 +221,21 @@ public class DataServiceImpl implements DataService {
for (IotModelFieldVo item : allIotModel) {
String key = String.valueOf(item.getId());
iotModelMap.put(key, item.getIotModelCode());
List<String> modelFieldList = sysIotModelMapper.getAllIotModelField(item.getId());
List<SysIotModelField> allIotModelField = sysIotModelMapper.getAllIotModelField(item.getId());
List<String> LowModelFieldList = allIotModelField.stream().filter(field -> field.getHighSpeed() == 0).map(SysIotModelField::getAttributeCode).collect(Collectors.toList());
List<String> HighModelFieldList = allIotModelField.stream().filter(field -> field.getHighSpeed() == 1).map(SysIotModelField::getAttributeCode).collect(Collectors.toList());
Map<String, Object> map = new HashMap<>();
for (String field : modelFieldList) {
for (String field : HighModelFieldList) {
map.put(field, null);
}
iotFieldMap.put(item.getIotModelCode(), map);
highIotFieldMap.put(item.getIotModelCode(), map);
Map<String, Object> lowMap = new HashMap<>();
for (String field : LowModelFieldList) {
lowMap.put(field, null);
}
lowIotFieldMap.put(item.getIotModelCode(), lowMap);
}
tdEngineService.initIotModel(allIotModel, iotFieldMap);
tdEngineService.initIotModel(allIotModel, highIotFieldMap,lowIotFieldMap);
}
@Override