diff --git a/das/src/main/java/com/das/modules/node/service/TDEngineService.java b/das/src/main/java/com/das/modules/node/service/TDEngineService.java index 257bf2be..2af353b4 100644 --- a/das/src/main/java/com/das/modules/node/service/TDEngineService.java +++ b/das/src/main/java/com/das/modules/node/service/TDEngineService.java @@ -19,6 +19,7 @@ import java.sql.SQLException; import java.sql.Statement; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @Slf4j @@ -58,33 +59,63 @@ public class TDEngineService { } // 遍历所有的物模型存入内存中 - public void initIotModel(List allIotModel, ConcurrentHashMap> iotFieldMap) { + public void initIotModel(List allIotModel, ConcurrentHashMap> highIotFieldMap,ConcurrentHashMap> lowIotFieldMap) { // 创建物模型超级表 - StringBuilder sb = new StringBuilder(1024*1024); + try (Connection conn = hikariDataSource.getConnection(); Statement pstmt = conn.createStatement()) { - ListUtil.page(allIotModel, batchSize, (list)->{ - sb.setLength(0); - for (IotModelFieldVo info : list) { - sb.append("CREATE STABLE IF NOT EXISTS "); - sb.append(info.getIotModelCode()); - sb.append(" (`updatetime` TIMESTAMP"); - Map map = iotFieldMap.get(info.getIotModelCode()); - // 使用增强的 for 循环遍历键 - for (String key : map.keySet()) { - sb.append(", "); - sb.append(key); - sb.append(" float"); - } - sb.append(") TAGS (`deviceid` BIGINT);"); - } - try { - pstmt.executeUpdate(sb.toString()); - } catch (SQLException ex) { - log.error("save yx error", ex); - } + ListUtil.page(allIotModel, batchSize, list ->{ + for (IotModelFieldVo info : list) { + StringBuilder sb = new StringBuilder(1024*1024); + sb.setLength(0); + Map map = highIotFieldMap.get(info.getIotModelCode()); + Set keySet = map.keySet(); + if (keySet.size() != 0){ + sb.append("CREATE STABLE IF NOT EXISTS "); + sb.append("h_").append(info.getIotModelCode()); + sb.append(" (`updatetime` TIMESTAMP"); + + // 使用增强的 for 循环遍历键 + for (String key : map.keySet()) { + sb.append(", "); + sb.append(key); + sb.append(" float"); + } + sb.append(") TAGS (`deviceid` BIGINT);"); + try { + System.out.println(sb.toString()); + pstmt.executeUpdate(sb.toString()); + } catch (SQLException ex) { + log.error("save yx error", ex); + } + } + + } + for (IotModelFieldVo info : list) { + StringBuilder sb = new StringBuilder(1024*1024); + sb.setLength(0); + Map map = lowIotFieldMap.get(info.getIotModelCode()); + if (map.keySet().size() != 0){ + sb.append("CREATE STABLE IF NOT EXISTS "); + sb.append("l_").append(info.getIotModelCode()); + sb.append(" (`updatetime` TIMESTAMP"); + // 使用增强的 for 循环遍历键 + for (String key : map.keySet()) { + sb.append(", "); + sb.append(key); + sb.append(" float"); + } + sb.append(") TAGS (`deviceid` BIGINT);"); + try { + System.out.println(sb.toString()); + pstmt.executeUpdate(sb.toString()); + } catch (SQLException ex) { + log.error("save yx error", ex); + } + } + } }); } catch (SQLException ex) { log.error(ex.getMessage()); diff --git a/das/src/main/java/com/das/modules/node/service/impl/DataServiceImpl.java b/das/src/main/java/com/das/modules/node/service/impl/DataServiceImpl.java index 5bb535fe..73b4fb9d 100644 --- a/das/src/main/java/com/das/modules/node/service/impl/DataServiceImpl.java +++ b/das/src/main/java/com/das/modules/node/service/impl/DataServiceImpl.java @@ -3,6 +3,7 @@ package com.das.modules.node.service.impl; import com.das.common.constant.BaseIotModelType; import com.das.common.utils.AdminRedisTemplate; import com.das.modules.equipment.domain.vo.IotModelFieldVo; +import com.das.modules.equipment.entity.SysIotModelField; import com.das.modules.equipment.mapper.SysIotModelMapper; import com.das.modules.node.disruptor.MessageEventFactory; import com.das.modules.node.disruptor.TerminalMessageEventHandler; @@ -35,6 +36,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.concurrent.Executors; +import java.util.stream.Collectors; @Slf4j @Service @@ -66,7 +68,9 @@ public class DataServiceImpl implements DataService { private ConcurrentHashMap iotModelMap = new ConcurrentHashMap<>(10000); - public ConcurrentHashMap> iotFieldMap = new ConcurrentHashMap<>(10000); + public ConcurrentHashMap> highIotFieldMap = new ConcurrentHashMap<>(10000); + + public ConcurrentHashMap> lowIotFieldMap = new ConcurrentHashMap<>(10000); public static final String DEVICE_DATA = "deviceData:{0}"; @@ -217,14 +221,21 @@ public class DataServiceImpl implements DataService { for (IotModelFieldVo item : allIotModel) { String key = String.valueOf(item.getId()); iotModelMap.put(key, item.getIotModelCode()); - List modelFieldList = sysIotModelMapper.getAllIotModelField(item.getId()); + List allIotModelField = sysIotModelMapper.getAllIotModelField(item.getId()); + List LowModelFieldList = allIotModelField.stream().filter(field -> field.getHighSpeed() == 0).map(SysIotModelField::getAttributeCode).collect(Collectors.toList()); + List HighModelFieldList = allIotModelField.stream().filter(field -> field.getHighSpeed() == 1).map(SysIotModelField::getAttributeCode).collect(Collectors.toList()); Map map = new HashMap<>(); - for (String field : modelFieldList) { + for (String field : HighModelFieldList) { map.put(field, null); } - iotFieldMap.put(item.getIotModelCode(), map); + highIotFieldMap.put(item.getIotModelCode(), map); + Map lowMap = new HashMap<>(); + for (String field : LowModelFieldList) { + lowMap.put(field, null); + } + lowIotFieldMap.put(item.getIotModelCode(), lowMap); } - tdEngineService.initIotModel(allIotModel, iotFieldMap); + tdEngineService.initIotModel(allIotModel, highIotFieldMap,lowIotFieldMap); } @Override