高频,低频数据tdengine入库
This commit is contained in:
parent
7a7fa73a12
commit
c4d3e3b1e9
@ -58,21 +58,131 @@ public class TDEngineService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 创建超级表
|
||||||
|
*/
|
||||||
|
public void createStable(String iotModelCode, String stableType, Map<String, String> fieldNameTypeMap) {
|
||||||
|
try (Connection conn = hikariDataSource.getConnection();
|
||||||
|
Statement pstmt = conn.createStatement()) {
|
||||||
|
StringBuilder sb = new StringBuilder(1024 * 1024);
|
||||||
|
sb.setLength(0);
|
||||||
|
Set<String> keySet = fieldNameTypeMap.keySet();
|
||||||
|
if (keySet.size() != 0) {
|
||||||
|
sb.append("CREATE STABLE IF NOT EXISTS ");
|
||||||
|
sb.append(stableType).append(iotModelCode);
|
||||||
|
sb.append(" (`updatetime` TIMESTAMP");
|
||||||
|
|
||||||
|
// 使用增强的 for 循环遍历键
|
||||||
|
for (String key : keySet) {
|
||||||
|
sb.append(", ");
|
||||||
|
sb.append(key);
|
||||||
|
sb.append(" " + fieldNameTypeMap.get(key));
|
||||||
|
}
|
||||||
|
sb.append(") TAGS (`deviceid` BIGINT);");
|
||||||
|
try {
|
||||||
|
System.out.println(sb.toString());
|
||||||
|
pstmt.executeUpdate(sb.toString());
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("创建超级表失败,失败原因{}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (Exception ignored) {
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 新增超级表列
|
||||||
|
*/
|
||||||
|
public void addStableColumn(String iotModelCode, String stableType, Map<String, String> fieldNameTypeMap) {
|
||||||
|
try (Connection conn = hikariDataSource.getConnection();
|
||||||
|
Statement pstmt = conn.createStatement()) {
|
||||||
|
StringBuilder sb = new StringBuilder(1024 * 1024);
|
||||||
|
sb.setLength(0);
|
||||||
|
Set<String> keySet = fieldNameTypeMap.keySet();
|
||||||
|
if (keySet.size() != 0) {
|
||||||
|
for (String key : keySet) {
|
||||||
|
sb.append("ALTER STABLE ");
|
||||||
|
sb.append(stableType).append(iotModelCode);
|
||||||
|
sb.append(" ADD COLUMN ");
|
||||||
|
sb.append(key);
|
||||||
|
sb.append(" " + fieldNameTypeMap.get(key));
|
||||||
|
sb.append(";");
|
||||||
|
try {
|
||||||
|
pstmt.executeUpdate(sb.toString());
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("新增超级表列失败:{},失败原因{}", sb.toString(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
} catch (Exception ignored) {
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 删除超级表列
|
||||||
|
*/
|
||||||
|
public void deleteColumn(String stableName, String fieldCode) {
|
||||||
|
try (Connection conn = hikariDataSource.getConnection();
|
||||||
|
Statement pstmt = conn.createStatement()) {
|
||||||
|
StringBuilder sb = new StringBuilder(1024 * 1024);
|
||||||
|
sb.setLength(0);
|
||||||
|
sb.append("ALTER STABLE ");
|
||||||
|
sb.append(stableName);
|
||||||
|
sb.append(" DROP COLUMN");;
|
||||||
|
sb.append(fieldCode);
|
||||||
|
sb.append(";");
|
||||||
|
try {
|
||||||
|
pstmt.executeUpdate(sb.toString());
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("删除超级表列失败:{},失败原因{}", sb.toString(), e);
|
||||||
|
}
|
||||||
|
|
||||||
|
}catch (Exception ignored){
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 删除超级表
|
||||||
|
*/
|
||||||
|
public void deleteStable(String stableName){
|
||||||
|
try (Connection conn = hikariDataSource.getConnection();
|
||||||
|
Statement pstmt = conn.createStatement()) {
|
||||||
|
StringBuilder sb = new StringBuilder(1024 * 1024);
|
||||||
|
sb.setLength(0);
|
||||||
|
sb.append("DROP STABLE ");
|
||||||
|
sb.append(stableName);
|
||||||
|
sb.append(";");
|
||||||
|
try {
|
||||||
|
pstmt.executeUpdate(sb.toString());
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("删除超级表失败:{},失败原因{}", sb.toString(), e);
|
||||||
|
}
|
||||||
|
|
||||||
|
}catch (Exception ignored){
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
// 遍历所有的物模型存入内存中
|
// 遍历所有的物模型存入内存中
|
||||||
public void initIotModel(List<IotModelFieldVo> allIotModel, ConcurrentHashMap<String,Map<String, Object>> highIotFieldMap,ConcurrentHashMap<String,Map<String, Object>> lowIotFieldMap) {
|
public void initIotModel(List<IotModelFieldVo> allIotModel, ConcurrentHashMap<String, Map<String, Object>> highIotFieldMap, ConcurrentHashMap<String, Map<String, Object>> lowIotFieldMap) {
|
||||||
// 创建物模型超级表
|
// 创建物模型超级表
|
||||||
|
|
||||||
try (Connection conn = hikariDataSource.getConnection();
|
try (Connection conn = hikariDataSource.getConnection();
|
||||||
Statement pstmt = conn.createStatement()) {
|
Statement pstmt = conn.createStatement()) {
|
||||||
|
|
||||||
ListUtil.page(allIotModel, batchSize, list ->{
|
ListUtil.page(allIotModel, batchSize, list -> {
|
||||||
|
|
||||||
for (IotModelFieldVo info : list) {
|
for (IotModelFieldVo info : list) {
|
||||||
StringBuilder sb = new StringBuilder(1024*1024);
|
StringBuilder sb = new StringBuilder(1024 * 1024);
|
||||||
sb.setLength(0);
|
sb.setLength(0);
|
||||||
Map<String, Object> map = highIotFieldMap.get(info.getIotModelCode());
|
Map<String, Object> map = highIotFieldMap.get(info.getIotModelCode());
|
||||||
Set<String> keySet = map.keySet();
|
Set<String> keySet = map.keySet();
|
||||||
if (keySet.size() != 0){
|
if (keySet.size() != 0) {
|
||||||
sb.append("CREATE STABLE IF NOT EXISTS ");
|
sb.append("CREATE STABLE IF NOT EXISTS ");
|
||||||
sb.append("h_").append(info.getIotModelCode());
|
sb.append("h_").append(info.getIotModelCode());
|
||||||
sb.append(" (`updatetime` TIMESTAMP");
|
sb.append(" (`updatetime` TIMESTAMP");
|
||||||
@ -81,7 +191,7 @@ public class TDEngineService {
|
|||||||
for (String key : map.keySet()) {
|
for (String key : map.keySet()) {
|
||||||
sb.append(", ");
|
sb.append(", ");
|
||||||
sb.append(key);
|
sb.append(key);
|
||||||
sb.append(" float");
|
sb.append(" "+map.get(key));
|
||||||
}
|
}
|
||||||
sb.append(") TAGS (`deviceid` BIGINT);");
|
sb.append(") TAGS (`deviceid` BIGINT);");
|
||||||
try {
|
try {
|
||||||
@ -94,10 +204,10 @@ public class TDEngineService {
|
|||||||
|
|
||||||
}
|
}
|
||||||
for (IotModelFieldVo info : list) {
|
for (IotModelFieldVo info : list) {
|
||||||
StringBuilder sb = new StringBuilder(1024*1024);
|
StringBuilder sb = new StringBuilder(1024 * 1024);
|
||||||
sb.setLength(0);
|
sb.setLength(0);
|
||||||
Map<String, Object> map = lowIotFieldMap.get(info.getIotModelCode());
|
Map<String, Object> map = lowIotFieldMap.get(info.getIotModelCode());
|
||||||
if (map.keySet().size() != 0){
|
if (map.keySet().size() != 0) {
|
||||||
sb.append("CREATE STABLE IF NOT EXISTS ");
|
sb.append("CREATE STABLE IF NOT EXISTS ");
|
||||||
sb.append("l_").append(info.getIotModelCode());
|
sb.append("l_").append(info.getIotModelCode());
|
||||||
sb.append(" (`updatetime` TIMESTAMP");
|
sb.append(" (`updatetime` TIMESTAMP");
|
||||||
@ -105,7 +215,7 @@ public class TDEngineService {
|
|||||||
for (String key : map.keySet()) {
|
for (String key : map.keySet()) {
|
||||||
sb.append(", ");
|
sb.append(", ");
|
||||||
sb.append(key);
|
sb.append(key);
|
||||||
sb.append(" float");
|
sb.append(" "+map.get(key));
|
||||||
}
|
}
|
||||||
sb.append(") TAGS (`deviceid` BIGINT);");
|
sb.append(") TAGS (`deviceid` BIGINT);");
|
||||||
try {
|
try {
|
||||||
@ -125,16 +235,16 @@ public class TDEngineService {
|
|||||||
|
|
||||||
@Async
|
@Async
|
||||||
public void updateYCHighValues(List<RTData> values, String iotModelCode) {
|
public void updateYCHighValues(List<RTData> values, String iotModelCode) {
|
||||||
StringBuilder sb = new StringBuilder(1024*1024);
|
StringBuilder sb = new StringBuilder(1024 * 1024);
|
||||||
try (Connection conn = hikariDataSource.getConnection();
|
try (Connection conn = hikariDataSource.getConnection();
|
||||||
Statement pstmt = conn.createStatement()) {
|
Statement pstmt = conn.createStatement()) {
|
||||||
ListUtil.page(values, batchSize, (list)->{
|
ListUtil.page(values, batchSize, (list) -> {
|
||||||
sb.setLength(0);
|
sb.setLength(0);
|
||||||
sb.append("insert into ");
|
sb.append("insert into ");
|
||||||
for (RTData dv : list) {
|
for (RTData dv : list) {
|
||||||
sb.append("h");
|
sb.append("h");
|
||||||
sb.append(dv.getDeviceId());
|
sb.append(dv.getDeviceId());
|
||||||
sb.append(" using h_" );
|
sb.append(" using h_");
|
||||||
sb.append(iotModelCode);
|
sb.append(iotModelCode);
|
||||||
sb.append(" tags (");
|
sb.append(" tags (");
|
||||||
sb.append(dv.getDeviceId());
|
sb.append(dv.getDeviceId());
|
||||||
@ -163,16 +273,16 @@ public class TDEngineService {
|
|||||||
|
|
||||||
@Async
|
@Async
|
||||||
public void updateYCLowValues(List<RTData> values, String iotModelCode) {
|
public void updateYCLowValues(List<RTData> values, String iotModelCode) {
|
||||||
StringBuilder sb = new StringBuilder(1024*1024);
|
StringBuilder sb = new StringBuilder(1024 * 1024);
|
||||||
try (Connection conn = hikariDataSource.getConnection();
|
try (Connection conn = hikariDataSource.getConnection();
|
||||||
Statement pstmt = conn.createStatement()) {
|
Statement pstmt = conn.createStatement()) {
|
||||||
ListUtil.page(values, batchSize, (list)->{
|
ListUtil.page(values, batchSize, (list) -> {
|
||||||
sb.setLength(0);
|
sb.setLength(0);
|
||||||
sb.append("insert into ");
|
sb.append("insert into ");
|
||||||
for (RTData dv : list) {
|
for (RTData dv : list) {
|
||||||
sb.append("l");
|
sb.append("l");
|
||||||
sb.append(dv.getDeviceId());
|
sb.append(dv.getDeviceId());
|
||||||
sb.append(" using l_" );
|
sb.append(" using l_");
|
||||||
sb.append(iotModelCode);
|
sb.append(iotModelCode);
|
||||||
sb.append(" tags (");
|
sb.append(" tags (");
|
||||||
sb.append(dv.getDeviceId());
|
sb.append(dv.getDeviceId());
|
||||||
@ -201,16 +311,16 @@ public class TDEngineService {
|
|||||||
|
|
||||||
@Async
|
@Async
|
||||||
public void updateStataValues(List<RTData> values, String iotModelCode) {
|
public void updateStataValues(List<RTData> values, String iotModelCode) {
|
||||||
StringBuilder sb = new StringBuilder(1024*1024);
|
StringBuilder sb = new StringBuilder(1024 * 1024);
|
||||||
try (Connection conn = hikariDataSource.getConnection();
|
try (Connection conn = hikariDataSource.getConnection();
|
||||||
Statement pstmt = conn.createStatement()) {
|
Statement pstmt = conn.createStatement()) {
|
||||||
ListUtil.page(values, batchSize, (list)->{
|
ListUtil.page(values, batchSize, (list) -> {
|
||||||
sb.setLength(0);
|
sb.setLength(0);
|
||||||
sb.append("insert into ");
|
sb.append("insert into ");
|
||||||
for (RTData dv : list) {
|
for (RTData dv : list) {
|
||||||
sb.append("d");
|
sb.append("d");
|
||||||
sb.append(dv.getDeviceId());
|
sb.append(dv.getDeviceId());
|
||||||
sb.append(" using " );
|
sb.append(" using ");
|
||||||
sb.append(iotModelCode);
|
sb.append(iotModelCode);
|
||||||
sb.append(" tags (");
|
sb.append(" tags (");
|
||||||
sb.append(dv.getDeviceId());
|
sb.append(dv.getDeviceId());
|
||||||
|
@ -218,16 +218,16 @@ public class DataServiceImpl implements DataService {
|
|||||||
String key = String.valueOf(item.getId());
|
String key = String.valueOf(item.getId());
|
||||||
iotModelMap.put(key, item.getIotModelCode());
|
iotModelMap.put(key, item.getIotModelCode());
|
||||||
List<SysIotModelField> allIotModelField = sysIotModelMapper.getAllIotModelField(item.getId());
|
List<SysIotModelField> allIotModelField = sysIotModelMapper.getAllIotModelField(item.getId());
|
||||||
List<String> LowModelFieldList = allIotModelField.stream().filter(field -> field.getHighSpeed() == 0).map(SysIotModelField::getAttributeCode).collect(Collectors.toList());
|
Map<String,String> LowModelFieldList = allIotModelField.stream().filter(field -> field.getHighSpeed() == 0).collect(Collectors.toMap(SysIotModelField::getAttributeCode, SysIotModelField::getDataType, (value1, value2) -> value1));
|
||||||
List<String> HighModelFieldList = allIotModelField.stream().filter(field -> field.getHighSpeed() == 1).map(SysIotModelField::getAttributeCode).collect(Collectors.toList());
|
Map<String,String> HighModelFieldList = allIotModelField.stream().filter(field -> field.getHighSpeed() == 1).collect(Collectors.toMap(SysIotModelField::getAttributeCode, SysIotModelField::getDataType, (value1, value2) -> value1));
|
||||||
Map<String, Object> map = new HashMap<>();
|
Map<String, Object> map = new HashMap<>();
|
||||||
for (String field : HighModelFieldList) {
|
for (String field : HighModelFieldList.keySet()) {
|
||||||
map.put(field, null);
|
map.put(field, HighModelFieldList.get(field));
|
||||||
}
|
}
|
||||||
highIotFieldMap.put(item.getIotModelCode(), map);
|
highIotFieldMap.put(item.getIotModelCode(), map);
|
||||||
Map<String, Object> lowMap = new HashMap<>();
|
Map<String, Object> lowMap = new HashMap<>();
|
||||||
for (String field : LowModelFieldList) {
|
for (String field : LowModelFieldList.keySet()) {
|
||||||
lowMap.put(field, null);
|
lowMap.put(field, LowModelFieldList.get(field));
|
||||||
}
|
}
|
||||||
lowIotFieldMap.put(item.getIotModelCode(), lowMap);
|
lowIotFieldMap.put(item.getIotModelCode(), lowMap);
|
||||||
}
|
}
|
||||||
|
@ -77,7 +77,7 @@
|
|||||||
where se.id = #{id}
|
where se.id = #{id}
|
||||||
</select>
|
</select>
|
||||||
<select id="getAllIotModelField" resultType="com.das.modules.equipment.entity.SysIotModelField">
|
<select id="getAllIotModelField" resultType="com.das.modules.equipment.entity.SysIotModelField">
|
||||||
select simf.attribute_code as attributeCode,simf.highspeed as highSpeed from sys_iot_model_field simf where simf.iot_model_id = #{id} order by simf.attribute_code
|
select simf.attribute_code as attributeCode,simf.highspeed as highSpeed,simf.datatype as dataType from sys_iot_model_field simf where simf.iot_model_id = #{id} order by simf.attribute_code
|
||||||
</select>
|
</select>
|
||||||
|
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user