事件上报新增deviceEvent
This commit is contained in:
parent
b1b986a516
commit
b86f72f629
@ -0,0 +1,26 @@
|
||||
package com.das.modules.data.domain;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
@Data
|
||||
public class DeviceEventInfo {
|
||||
private Long updateTime;
|
||||
|
||||
private Long eventId;
|
||||
|
||||
private Integer eventLevel;
|
||||
|
||||
private String eventText;
|
||||
|
||||
private Integer confirmed;
|
||||
|
||||
private String confirmAccount;
|
||||
|
||||
private Long confirmTime;
|
||||
|
||||
private String deviceId;
|
||||
|
||||
private String deviceCode;
|
||||
|
||||
private String deviceName;
|
||||
}
|
@ -2,10 +2,12 @@ package com.das.modules.data.service;
|
||||
|
||||
import cn.hutool.core.collection.ListUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.das.modules.data.domain.DeviceEventInfo;
|
||||
import com.das.modules.data.service.impl.DataServiceImpl;
|
||||
import com.das.modules.equipment.domain.vo.IotModelFieldVo;
|
||||
import com.das.modules.node.domain.bo.CalculateRTData;
|
||||
import com.das.modules.node.domain.bo.RTData;
|
||||
import com.das.modules.node.domain.vo.DeviceEventVo;
|
||||
import com.das.modules.node.service.impl.NodeMessageServiceImpl;
|
||||
import com.zaxxer.hikari.HikariConfig;
|
||||
import com.zaxxer.hikari.HikariDataSource;
|
||||
@ -372,6 +374,48 @@ public class TDEngineService {
|
||||
}
|
||||
}
|
||||
|
||||
@Async
|
||||
public void updateDeviceEventValues(List<DeviceEventInfo> values) {
|
||||
StringBuilder sb = new StringBuilder(1024 * 1024);
|
||||
try (Connection conn = hikariDataSource.getConnection();
|
||||
Statement pstmt = conn.createStatement()) {
|
||||
ListUtil.page(values, batchSize, (list) -> {
|
||||
sb.setLength(0);
|
||||
sb.append("insert into ");
|
||||
for (DeviceEventInfo dv : list) {
|
||||
sb.append("E_");
|
||||
sb.append(dv.getDeviceId());
|
||||
sb.append(" using event_info tags (");
|
||||
sb.append(dv.getDeviceCode());
|
||||
sb.append(",");
|
||||
sb.append(dv.getDeviceName());
|
||||
sb.append(") values (");
|
||||
sb.append(dv.getUpdateTime());
|
||||
sb.append(",");
|
||||
sb.append(dv.getEventId());
|
||||
sb.append(",");
|
||||
sb.append(dv.getEventLevel());
|
||||
sb.append(",");
|
||||
sb.append(dv.getEventText());
|
||||
sb.append(",");
|
||||
sb.append(dv.getConfirmed());
|
||||
sb.append(",");
|
||||
sb.append(dv.getConfirmAccount());
|
||||
sb.append(",");
|
||||
sb.append(dv.getConfirmTime());
|
||||
sb.append(")");
|
||||
}
|
||||
try {
|
||||
pstmt.executeUpdate(sb.toString());
|
||||
} catch (SQLException ex) {
|
||||
log.error("save yc error", ex);
|
||||
}
|
||||
});
|
||||
} catch (SQLException ex) {
|
||||
log.error(ex.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Async
|
||||
public void updateStataValues(List<RTData> values, String iotModelCode) {
|
||||
StringBuilder sb = new StringBuilder(1024 * 1024);
|
||||
|
@ -51,6 +51,9 @@ public class DataServiceImpl implements DataService {
|
||||
//key:modelId value:modelCode
|
||||
public ConcurrentHashMap<String, String> iotModelMap = new ConcurrentHashMap<>(10000);
|
||||
|
||||
//key:modelId value:fieldCode fieldName
|
||||
public ConcurrentHashMap<String, Map<String, String>> fieldCodeNameMap = new ConcurrentHashMap<>(10000);
|
||||
|
||||
//key:modelCode value:FiledCode,dataType
|
||||
public ConcurrentHashMap<String, Map<String, Object>> highIotFieldMap = new ConcurrentHashMap<>(10000);
|
||||
|
||||
@ -196,6 +199,7 @@ public class DataServiceImpl implements DataService {
|
||||
Map<String, String> LowModelFieldList = allIotModelField.stream().filter(field -> field.getHighSpeed() == 0 && field.getAttributeType() != 199).collect(Collectors.toMap(SysIotModelField::getAttributeCode, SysIotModelField::getDataType, (value1, value2) -> value1));
|
||||
Map<String, String> HighModelFieldList = allIotModelField.stream().filter(field -> field.getHighSpeed() == 1 && field.getAttributeType() != 199).collect(Collectors.toMap(SysIotModelField::getAttributeCode, SysIotModelField::getDataType, (value1, value2) -> value1));
|
||||
Map<String, String> calculateFieldList = allIotModelField.stream().filter(field -> field.getAttributeType() == 199).collect(Collectors.toMap(SysIotModelField::getAttributeCode, SysIotModelField::getDataType, (value1, value2) -> value1));
|
||||
Map<String, String> fieldCodeNameList = allIotModelField.stream().collect(Collectors.toMap(SysIotModelField::getAttributeCode, SysIotModelField::getAttributeName, (value1, value2) -> value1));
|
||||
Map<String, Object> map = new HashMap<>();
|
||||
for (String field : HighModelFieldList.keySet()) {
|
||||
map.put(field, HighModelFieldList.get(field));
|
||||
@ -206,6 +210,7 @@ public class DataServiceImpl implements DataService {
|
||||
lowMap.put(field, LowModelFieldList.get(field));
|
||||
}
|
||||
lowIotFieldMap.put(item.getIotModelCode(), lowMap);
|
||||
fieldCodeNameMap.put(item.getIotModelCode(),fieldCodeNameList);
|
||||
calculateIotFieldMap.put(item.getIotModelCode(), calculateFieldList);
|
||||
}
|
||||
tdEngineService.initIotModel(allIotModel, highIotFieldMap, lowIotFieldMap, calculateIotFieldMap);
|
||||
|
@ -0,0 +1,25 @@
|
||||
package com.das.modules.node.command;
|
||||
|
||||
import com.das.modules.node.constant.NodeConstant;
|
||||
import com.das.modules.node.domain.bo.TerminalMessage;
|
||||
import com.das.modules.node.service.NodeMessageService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
@Service(value = NodeConstant.DEVICE_EVENT)
|
||||
@Slf4j
|
||||
public class DeviceEventCommand implements BaseCommand{
|
||||
@Autowired
|
||||
private NodeMessageService nodeMessageService;
|
||||
|
||||
@Override
|
||||
public void doCommand(TerminalMessage data) {
|
||||
try {
|
||||
nodeMessageService.handleDeviceEvent(data);
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("解析数据异常", e);
|
||||
}
|
||||
}
|
||||
}
|
@ -23,4 +23,6 @@ public interface NodeConstant {
|
||||
String HIS_LOW_SPEED_DATA = "historyLowSpeedData";
|
||||
|
||||
String DEVICE_CONTROL_RESP = "deviceControlResp";
|
||||
|
||||
String DEVICE_EVENT = "deviceEvent";
|
||||
}
|
||||
|
@ -0,0 +1,23 @@
|
||||
package com.das.modules.node.domain.vo;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
@Data
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
public class DeviceEventVo {
|
||||
|
||||
private String deviceId;
|
||||
|
||||
private String attrCode;
|
||||
|
||||
private Object attrValue;
|
||||
|
||||
private Integer eventType;
|
||||
|
||||
private Long eventTime;
|
||||
|
||||
private Object limitValue;
|
||||
}
|
@ -17,4 +17,6 @@ public interface NodeMessageService {
|
||||
void handleHighSpeed(TerminalMessage data);
|
||||
|
||||
void handleLowSpeed(TerminalMessage data);
|
||||
|
||||
void handleDeviceEvent(TerminalMessage data);
|
||||
}
|
||||
|
@ -1,5 +1,15 @@
|
||||
package com.das.modules.node.service.impl;
|
||||
|
||||
import com.baomidou.mybatisplus.core.toolkit.IdWorker;
|
||||
import com.das.common.utils.StringUtils;
|
||||
import com.das.modules.cache.domain.DeviceInfoCache;
|
||||
import com.das.modules.cache.service.CacheService;
|
||||
import com.das.modules.data.domain.DeviceEventInfo;
|
||||
import com.das.modules.data.service.impl.DataServiceImpl;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import cn.hutool.core.collection.ListUtil;
|
||||
import cn.hutool.core.util.IdUtil;
|
||||
import com.das.common.constant.MeasType;
|
||||
@ -67,6 +77,12 @@ public class NodeMessageServiceImpl implements NodeMessageService {
|
||||
@Autowired
|
||||
TDEngineService tdEngineService;
|
||||
|
||||
@Autowired
|
||||
private CacheService cacheService;
|
||||
|
||||
@Autowired
|
||||
private DataServiceImpl dataService;
|
||||
|
||||
|
||||
|
||||
@PostConstruct
|
||||
@ -280,6 +296,63 @@ public class NodeMessageServiceImpl implements NodeMessageService {
|
||||
tdEngineService.updateYCLowValues(highList, iotModelCode);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleDeviceEvent(TerminalMessage data) {
|
||||
|
||||
JsonNode dataEvent = data.getData();
|
||||
ObjectMapper objectMapper = new ObjectMapper();
|
||||
List<DeviceEventInfo> valueList = new ArrayList<>();
|
||||
|
||||
// 使用 TypeReference 来指定转换目标类型
|
||||
List<DeviceEventVo> list = objectMapper.convertValue(dataEvent, new TypeReference<List<DeviceEventVo>>() {
|
||||
});
|
||||
log.info("消息data转化deviceVo,{}",list);
|
||||
for (DeviceEventVo item : list){
|
||||
DeviceInfoCache deviceInfoCache = cacheService.getDeviceInfoCache(item.getDeviceId());
|
||||
DeviceEventInfo deviceEventInfo = new DeviceEventInfo();
|
||||
deviceEventInfo.setUpdateTime(item.getEventTime());
|
||||
deviceEventInfo.setEventId(IdWorker.getId());
|
||||
deviceEventInfo.setDeviceId(item.getDeviceId());
|
||||
deviceEventInfo.setDeviceName(deviceInfoCache.getDeviceName());
|
||||
deviceEventInfo.setDeviceCode(deviceInfoCache.getDeviceCode());
|
||||
String eventType = getEventType(item.getEventType());
|
||||
String model = dataService.iotModelMap.get(item.getDeviceId());
|
||||
if (StringUtils.isEmpty(model)){
|
||||
log.debug("未查询到物模型code,设备id:{}",item.getDeviceId());
|
||||
}
|
||||
String fieldName = dataService.fieldCodeNameMap.get(model).get(item.getAttrCode());
|
||||
if (StringUtils.isEmpty(fieldName)){
|
||||
log.debug("未查询到物模型属性code,设备id:{}",item.getDeviceId());
|
||||
}
|
||||
deviceEventInfo.setEventLevel(0);
|
||||
deviceEventInfo.setConfirmed(0);
|
||||
if (!StringUtils.isEmpty(fieldName) && fieldName.equals("遥信变位")){
|
||||
deviceEventInfo.setEventText(fieldName + eventType + "负归");
|
||||
}
|
||||
else{
|
||||
deviceEventInfo.setEventText(fieldName + eventType + "属性值为:"+item.getAttrValue()+",越限值为:"+item.getLimitValue());
|
||||
}
|
||||
valueList.add(deviceEventInfo);
|
||||
}
|
||||
try {
|
||||
tdEngineService.updateDeviceEventValues(valueList);
|
||||
}catch (Exception e){
|
||||
log.error("事件信息存入Td失败,失败原因{}",e);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
private String getEventType(int eventType) {
|
||||
switch (eventType) {
|
||||
case 0:
|
||||
return "遥信变位";
|
||||
case 1:
|
||||
return "越上限";
|
||||
case 2:
|
||||
return "越下限";
|
||||
default:
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -78,7 +78,7 @@
|
||||
where se.id = #{id}
|
||||
</select>
|
||||
<select id="getAllIotModelField" resultType="com.das.modules.equipment.entity.SysIotModelField">
|
||||
select simf.attribute_code as attributeCode,simf.highspeed as highSpeed,simf.datatype as dataType,simf.attribute_type as attributeType from sys_iot_model_field simf where simf.iot_model_id = #{id} order by simf.attribute_code
|
||||
select simf.attribute_name as attributeName, simf.attribute_code as attributeCode,simf.highspeed as highSpeed,simf.datatype as dataType,simf.attribute_type as attributeType from sys_iot_model_field simf where simf.iot_model_id = #{id} order by simf.attribute_code
|
||||
</select>
|
||||
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user