tdengine数据按设备分为高频和低频表,入库,redis数据同步
This commit is contained in:
parent
aaa9ba6283
commit
c87c0de17b
@ -16,6 +16,7 @@ public class AnalogDataCommand implements BaseCommand {
|
|||||||
@Override
|
@Override
|
||||||
public void doCommand(TerminalMessage data) {
|
public void doCommand(TerminalMessage data) {
|
||||||
try {
|
try {
|
||||||
|
//analogData值只存入redis
|
||||||
dataService.handleData(data);
|
dataService.handleData(data);
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
@ -17,6 +17,7 @@ public class StateDataCommand implements BaseCommand {
|
|||||||
@Override
|
@Override
|
||||||
public void doCommand(TerminalMessage data) {
|
public void doCommand(TerminalMessage data) {
|
||||||
try {
|
try {
|
||||||
|
//只存入redis
|
||||||
dataService.handleData(data);
|
dataService.handleData(data);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("解析数据异常", e);
|
log.error("解析数据异常", e);
|
||||||
|
@ -17,4 +17,10 @@ public interface NodeConstant {
|
|||||||
String ANALOG_DATA = "analogData";
|
String ANALOG_DATA = "analogData";
|
||||||
|
|
||||||
String STATE_DATA = "stateData";
|
String STATE_DATA = "stateData";
|
||||||
|
|
||||||
|
String HIS_ANALOG_DATA = "historyStateData";
|
||||||
|
|
||||||
|
String HIS_STATE_DATA = "historyStateData";
|
||||||
|
|
||||||
|
String DEVICE_CONTROL_RESP = "deviceControlResp";
|
||||||
}
|
}
|
||||||
|
@ -25,4 +25,6 @@ public class IotModelVo {
|
|||||||
private String equipmentService;
|
private String equipmentService;
|
||||||
|
|
||||||
private Object params;
|
private Object params;
|
||||||
|
|
||||||
|
private Integer highSpeed;
|
||||||
}
|
}
|
||||||
|
@ -9,5 +9,7 @@ public class NewIotModelVo {
|
|||||||
|
|
||||||
private String type;
|
private String type;
|
||||||
|
|
||||||
|
private Integer highSpeed;
|
||||||
|
|
||||||
private Object params;
|
private Object params;
|
||||||
}
|
}
|
||||||
|
@ -124,7 +124,7 @@ public class TDEngineService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Async
|
@Async
|
||||||
public void updateYCValues(List<RTData> values, String iotModelCode) {
|
public void updateYCHighValues(List<RTData> values, String iotModelCode) {
|
||||||
StringBuilder sb = new StringBuilder(1024*1024);
|
StringBuilder sb = new StringBuilder(1024*1024);
|
||||||
try (Connection conn = hikariDataSource.getConnection();
|
try (Connection conn = hikariDataSource.getConnection();
|
||||||
Statement pstmt = conn.createStatement()) {
|
Statement pstmt = conn.createStatement()) {
|
||||||
@ -132,12 +132,54 @@ public class TDEngineService {
|
|||||||
sb.setLength(0);
|
sb.setLength(0);
|
||||||
sb.append("insert into ");
|
sb.append("insert into ");
|
||||||
for (RTData dv : list) {
|
for (RTData dv : list) {
|
||||||
sb.append("d");
|
sb.append("h");
|
||||||
sb.append(dv.getDeviceId());
|
sb.append(dv.getDeviceId());
|
||||||
sb.append(" using " );
|
sb.append(" using h_" );
|
||||||
sb.append(iotModelCode);
|
sb.append(iotModelCode);
|
||||||
sb.append(" tags (");
|
sb.append(" tags (");
|
||||||
sb.append(dv.getDeviceId());
|
sb.append(dv.getDeviceId());
|
||||||
|
sb.append(") (");
|
||||||
|
dv.getValues().forEach((key, value) ->
|
||||||
|
sb.append(",").append(key)
|
||||||
|
);
|
||||||
|
sb.append(") values (");
|
||||||
|
sb.append(dv.getDataTime());
|
||||||
|
|
||||||
|
dv.getValues().forEach((key, value) ->
|
||||||
|
sb.append(",").append(value)
|
||||||
|
);
|
||||||
|
sb.append(")");
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
pstmt.executeUpdate(sb.toString());
|
||||||
|
} catch (SQLException ex) {
|
||||||
|
log.error("save yc error", ex);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} catch (SQLException ex) {
|
||||||
|
log.error(ex.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Async
|
||||||
|
public void updateYCLowValues(List<RTData> values, String iotModelCode) {
|
||||||
|
StringBuilder sb = new StringBuilder(1024*1024);
|
||||||
|
try (Connection conn = hikariDataSource.getConnection();
|
||||||
|
Statement pstmt = conn.createStatement()) {
|
||||||
|
ListUtil.page(values, batchSize, (list)->{
|
||||||
|
sb.setLength(0);
|
||||||
|
sb.append("insert into ");
|
||||||
|
for (RTData dv : list) {
|
||||||
|
sb.append("l");
|
||||||
|
sb.append(dv.getDeviceId());
|
||||||
|
sb.append(" using l_" );
|
||||||
|
sb.append(iotModelCode);
|
||||||
|
sb.append(" tags (");
|
||||||
|
sb.append(dv.getDeviceId());
|
||||||
|
sb.append(") (");
|
||||||
|
dv.getValues().forEach((key, value) ->
|
||||||
|
sb.append(",").append(key)
|
||||||
|
);
|
||||||
sb.append(") values (");
|
sb.append(") values (");
|
||||||
sb.append(dv.getDataTime());
|
sb.append(dv.getDataTime());
|
||||||
|
|
||||||
|
@ -29,10 +29,7 @@ import org.springframework.stereotype.Service;
|
|||||||
import org.springframework.util.CollectionUtils;
|
import org.springframework.util.CollectionUtils;
|
||||||
|
|
||||||
import java.text.MessageFormat;
|
import java.text.MessageFormat;
|
||||||
import java.util.ArrayList;
|
import java.util.*;
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
@ -72,8 +69,6 @@ public class DataServiceImpl implements DataService {
|
|||||||
|
|
||||||
public ConcurrentHashMap<String, Map<String, Object>> lowIotFieldMap = new ConcurrentHashMap<>(10000);
|
public ConcurrentHashMap<String, Map<String, Object>> lowIotFieldMap = new ConcurrentHashMap<>(10000);
|
||||||
|
|
||||||
public static final String DEVICE_DATA = "deviceData:{0}";
|
|
||||||
|
|
||||||
@PostConstruct
|
@PostConstruct
|
||||||
public void init() {
|
public void init() {
|
||||||
//初始化高性能队列
|
//初始化高性能队列
|
||||||
@ -151,9 +146,10 @@ public class DataServiceImpl implements DataService {
|
|||||||
List<IotModelVo> iotModelFieldList = sysImptabmappingMapper.getIotModelFieldByEquipmentId(equipmentId);
|
List<IotModelVo> iotModelFieldList = sysImptabmappingMapper.getIotModelFieldByEquipmentId(equipmentId);
|
||||||
if (!CollectionUtils.isEmpty(iotModelFieldList)) {
|
if (!CollectionUtils.isEmpty(iotModelFieldList)) {
|
||||||
for (IotModelVo info : iotModelFieldList) {
|
for (IotModelVo info : iotModelFieldList) {
|
||||||
if(info.getServiceType() == null) {
|
if (info.getServiceType() == null) {
|
||||||
NewIotModelVo newIotModelVo = new NewIotModelVo();
|
NewIotModelVo newIotModelVo = new NewIotModelVo();
|
||||||
newIotModelVo.setName(info.getEquipmentAttribute());
|
newIotModelVo.setName(info.getEquipmentAttribute());
|
||||||
|
newIotModelVo.setHighSpeed(info.getHighSpeed());
|
||||||
if (info.getParams() == null) {
|
if (info.getParams() == null) {
|
||||||
newIotModelVo.setParams(equipJsonNode);
|
newIotModelVo.setParams(equipJsonNode);
|
||||||
} else {
|
} else {
|
||||||
@ -235,7 +231,7 @@ public class DataServiceImpl implements DataService {
|
|||||||
}
|
}
|
||||||
lowIotFieldMap.put(item.getIotModelCode(), lowMap);
|
lowIotFieldMap.put(item.getIotModelCode(), lowMap);
|
||||||
}
|
}
|
||||||
tdEngineService.initIotModel(allIotModel, highIotFieldMap,lowIotFieldMap);
|
tdEngineService.initIotModel(allIotModel, highIotFieldMap, lowIotFieldMap);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -243,33 +239,52 @@ public class DataServiceImpl implements DataService {
|
|||||||
// 先从redis里面获取设备的初始化数据
|
// 先从redis里面获取设备的初始化数据
|
||||||
JsonNode jsonNode = data.getData();
|
JsonNode jsonNode = data.getData();
|
||||||
String deviceId = jsonNode.get("deviceId").asText();
|
String deviceId = jsonNode.get("deviceId").asText();
|
||||||
String key = MessageFormat.format(DEVICE_DATA, deviceId);
|
|
||||||
HashMap<String, Object> initValue = adminRedisTemplate.get(key);
|
|
||||||
JsonNode values = jsonNode.get("values");
|
JsonNode values = jsonNode.get("values");
|
||||||
List<Map.Entry<String, Object>> entryList = new ArrayList<>(initValue.entrySet());
|
JsonNode high = values.get("high");
|
||||||
HashMap<String, Object> newHashMap = new HashMap<>();
|
JsonNode low = values.get("low");
|
||||||
for (Map.Entry<String, Object> entry : entryList) {
|
Map<String, Object> keyValueMap = new HashMap<>();
|
||||||
if(values.get(entry.getKey()) == null){
|
|
||||||
newHashMap.put(entry.getKey(), entry.getValue());
|
|
||||||
} else{
|
|
||||||
newHashMap.put(entry.getKey(), values.get(entry.getKey()).asDouble());
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
adminRedisTemplate.set(key, newHashMap);
|
|
||||||
Long dataTime = data.getTime();
|
|
||||||
// 存入td库
|
|
||||||
// 根据设备ID获取对应的物模型属性
|
// 根据设备ID获取对应的物模型属性
|
||||||
String iotModelCode = sysIotModelMapper.getIotModel(jsonNode.get("deviceId").asLong());
|
String iotModelCode = sysIotModelMapper.getIotModel(jsonNode.get("deviceId").asLong());
|
||||||
HashMap<String, Object> tdValues = adminRedisTemplate.get(key);
|
Map<String, Object> highFieldMap = highIotFieldMap.get(iotModelCode);
|
||||||
List<RTData> list = new ArrayList<>();
|
Map<String, Object> lowFiledMap = lowIotFieldMap.get(iotModelCode);
|
||||||
RTData rtData = RTData.builder()
|
|
||||||
|
//High数据
|
||||||
|
Iterator<String> keysHigh = high.fieldNames();
|
||||||
|
while (keysHigh.hasNext()) {
|
||||||
|
String fieldName = keysHigh.next();
|
||||||
|
highFieldMap.put(fieldName, high.get(fieldName));
|
||||||
|
String key = String.format("RT:[%s]:[%s]", deviceId, fieldName);
|
||||||
|
keyValueMap.put(key, high.get(fieldName).asDouble());
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
//LOW数据
|
||||||
|
Iterator<String> keysLow = low.fieldNames();
|
||||||
|
while (keysLow.hasNext()) {
|
||||||
|
String fieldName = keysLow.next();
|
||||||
|
lowFiledMap.put(fieldName, low.get(fieldName));
|
||||||
|
String key = String.format("RT:[%s]:[%s]", deviceId, fieldName);
|
||||||
|
keyValueMap.put(key, high.get(fieldName).asDouble());
|
||||||
|
}
|
||||||
|
adminRedisTemplate.mSet(keyValueMap);
|
||||||
|
Long dataTime = data.getTime();
|
||||||
|
|
||||||
|
// 存入td库
|
||||||
|
List<RTData> highList = new ArrayList<>();
|
||||||
|
List<RTData> lowList = new ArrayList<>();
|
||||||
|
RTData rtHighData = RTData.builder()
|
||||||
.dataTime(dataTime)
|
.dataTime(dataTime)
|
||||||
.deviceId(Long.valueOf(deviceId))
|
.deviceId(Long.valueOf(deviceId))
|
||||||
.values(tdValues)
|
.values(highFieldMap)
|
||||||
.build();
|
.build();
|
||||||
|
RTData rtLowData = RTData.builder()
|
||||||
list.add(rtData);
|
.dataTime(dataTime)
|
||||||
tdEngineService.updateYCValues(list, iotModelCode);
|
.deviceId(Long.valueOf(deviceId))
|
||||||
|
.values(highFieldMap)
|
||||||
|
.build();
|
||||||
|
highList.add(rtHighData);
|
||||||
|
lowList.add(rtLowData);
|
||||||
|
tdEngineService.updateYCHighValues(highList, iotModelCode);
|
||||||
|
tdEngineService.updateYCLowValues(lowList, iotModelCode);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -57,7 +57,7 @@
|
|||||||
</select>
|
</select>
|
||||||
|
|
||||||
<select id="getIotModelFieldByEquipmentId" resultType="com.das.modules.node.domain.vo.IotModelVo">
|
<select id="getIotModelFieldByEquipmentId" resultType="com.das.modules.node.domain.vo.IotModelVo">
|
||||||
select simf.attribute_type as attributeType,sims.service_type as serviceType, se.iot_addr as iotAddr,si.equipment_attribute as equipmentAttribute,si.equipment_service as equipmentService, si.params from sys_imptabmapping si
|
select simf.attribute_type as attributeType,sims.service_type as serviceType, se.iot_addr as iotAddr,si.equipment_attribute as equipmentAttribute,si.equipment_service as equipmentService, si.params, simf.highspeed as highSpeed from sys_imptabmapping si
|
||||||
left join sys_equipment se on si.equipment_id = se.id
|
left join sys_equipment se on si.equipment_id = se.id
|
||||||
left join sys_iot_model_field simf on si.equipment_attribute = simf.attribute_code
|
left join sys_iot_model_field simf on si.equipment_attribute = simf.attribute_code
|
||||||
left join sys_iot_model_service sims on si.equipment_service = sims.service_code
|
left join sys_iot_model_service sims on si.equipment_service = sims.service_code
|
||||||
|
Loading…
Reference in New Issue
Block a user