diff --git a/das/src/main/java/com/das/modules/data/domain/DeviceEventInfo.java b/das/src/main/java/com/das/modules/data/domain/DeviceEventInfo.java new file mode 100644 index 00000000..45ec8839 --- /dev/null +++ b/das/src/main/java/com/das/modules/data/domain/DeviceEventInfo.java @@ -0,0 +1,26 @@ +package com.das.modules.data.domain; + +import lombok.Data; + +@Data +public class DeviceEventInfo { + private Long updateTime; + + private Long eventId; + + private Integer eventLevel; + + private String eventText; + + private Integer confirmed; + + private String confirmAccount; + + private Long confirmTime; + + private String deviceId; + + private String deviceCode; + + private String deviceName; +} diff --git a/das/src/main/java/com/das/modules/data/service/TDEngineService.java b/das/src/main/java/com/das/modules/data/service/TDEngineService.java index b5d4ee07..9b3e7fc1 100644 --- a/das/src/main/java/com/das/modules/data/service/TDEngineService.java +++ b/das/src/main/java/com/das/modules/data/service/TDEngineService.java @@ -2,10 +2,12 @@ package com.das.modules.data.service; import cn.hutool.core.collection.ListUtil; import cn.hutool.core.util.StrUtil; +import com.das.modules.data.domain.DeviceEventInfo; import com.das.modules.data.service.impl.DataServiceImpl; import com.das.modules.equipment.domain.vo.IotModelFieldVo; import com.das.modules.node.domain.bo.CalculateRTData; import com.das.modules.node.domain.bo.RTData; +import com.das.modules.node.domain.vo.DeviceEventVo; import com.das.modules.node.service.impl.NodeMessageServiceImpl; import com.zaxxer.hikari.HikariConfig; import com.zaxxer.hikari.HikariDataSource; @@ -372,6 +374,48 @@ public class TDEngineService { } } + @Async + public void updateDeviceEventValues(List values) { + StringBuilder sb = new StringBuilder(1024 * 1024); + try (Connection conn = hikariDataSource.getConnection(); + Statement pstmt = conn.createStatement()) { + ListUtil.page(values, batchSize, (list) -> { + sb.setLength(0); + sb.append("insert into "); + for (DeviceEventInfo dv : list) { + sb.append("E_"); + sb.append(dv.getDeviceId()); + sb.append(" using event_info tags ("); + sb.append(dv.getDeviceCode()); + sb.append(","); + sb.append(dv.getDeviceName()); + sb.append(") values ("); + sb.append(dv.getUpdateTime()); + sb.append(","); + sb.append(dv.getEventId()); + sb.append(","); + sb.append(dv.getEventLevel()); + sb.append(","); + sb.append(dv.getEventText()); + sb.append(","); + sb.append(dv.getConfirmed()); + sb.append(","); + sb.append(dv.getConfirmAccount()); + sb.append(","); + sb.append(dv.getConfirmTime()); + sb.append(")"); + } + try { + pstmt.executeUpdate(sb.toString()); + } catch (SQLException ex) { + log.error("save yc error", ex); + } + }); + } catch (SQLException ex) { + log.error(ex.getMessage()); + } + } + @Async public void updateStataValues(List values, String iotModelCode) { StringBuilder sb = new StringBuilder(1024 * 1024); diff --git a/das/src/main/java/com/das/modules/data/service/impl/DataServiceImpl.java b/das/src/main/java/com/das/modules/data/service/impl/DataServiceImpl.java index 98a3b084..02928b3b 100644 --- a/das/src/main/java/com/das/modules/data/service/impl/DataServiceImpl.java +++ b/das/src/main/java/com/das/modules/data/service/impl/DataServiceImpl.java @@ -51,6 +51,9 @@ public class DataServiceImpl implements DataService { //key:modelId value:modelCode public ConcurrentHashMap iotModelMap = new ConcurrentHashMap<>(10000); + //key:modelId value:fieldCode fieldName + public ConcurrentHashMap> fieldCodeNameMap = new ConcurrentHashMap<>(10000); + //key:modelCode value:FiledCode,dataType public ConcurrentHashMap> highIotFieldMap = new ConcurrentHashMap<>(10000); @@ -196,6 +199,7 @@ public class DataServiceImpl implements DataService { Map LowModelFieldList = allIotModelField.stream().filter(field -> field.getHighSpeed() == 0 && field.getAttributeType() != 199).collect(Collectors.toMap(SysIotModelField::getAttributeCode, SysIotModelField::getDataType, (value1, value2) -> value1)); Map HighModelFieldList = allIotModelField.stream().filter(field -> field.getHighSpeed() == 1 && field.getAttributeType() != 199).collect(Collectors.toMap(SysIotModelField::getAttributeCode, SysIotModelField::getDataType, (value1, value2) -> value1)); Map calculateFieldList = allIotModelField.stream().filter(field -> field.getAttributeType() == 199).collect(Collectors.toMap(SysIotModelField::getAttributeCode, SysIotModelField::getDataType, (value1, value2) -> value1)); + Map fieldCodeNameList = allIotModelField.stream().collect(Collectors.toMap(SysIotModelField::getAttributeCode, SysIotModelField::getAttributeName, (value1, value2) -> value1)); Map map = new HashMap<>(); for (String field : HighModelFieldList.keySet()) { map.put(field, HighModelFieldList.get(field)); @@ -206,6 +210,7 @@ public class DataServiceImpl implements DataService { lowMap.put(field, LowModelFieldList.get(field)); } lowIotFieldMap.put(item.getIotModelCode(), lowMap); + fieldCodeNameMap.put(item.getIotModelCode(),fieldCodeNameList); calculateIotFieldMap.put(item.getIotModelCode(), calculateFieldList); } tdEngineService.initIotModel(allIotModel, highIotFieldMap, lowIotFieldMap, calculateIotFieldMap); diff --git a/das/src/main/java/com/das/modules/node/command/DeviceEventCommand.java b/das/src/main/java/com/das/modules/node/command/DeviceEventCommand.java new file mode 100644 index 00000000..aeb96bcf --- /dev/null +++ b/das/src/main/java/com/das/modules/node/command/DeviceEventCommand.java @@ -0,0 +1,25 @@ +package com.das.modules.node.command; + +import com.das.modules.node.constant.NodeConstant; +import com.das.modules.node.domain.bo.TerminalMessage; +import com.das.modules.node.service.NodeMessageService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +@Service(value = NodeConstant.DEVICE_EVENT) +@Slf4j +public class DeviceEventCommand implements BaseCommand{ + @Autowired + private NodeMessageService nodeMessageService; + + @Override + public void doCommand(TerminalMessage data) { + try { + nodeMessageService.handleDeviceEvent(data); + + } catch (Exception e) { + log.error("解析数据异常", e); + } + } +} diff --git a/das/src/main/java/com/das/modules/node/constant/NodeConstant.java b/das/src/main/java/com/das/modules/node/constant/NodeConstant.java index 364fd3c3..8fdc5f37 100644 --- a/das/src/main/java/com/das/modules/node/constant/NodeConstant.java +++ b/das/src/main/java/com/das/modules/node/constant/NodeConstant.java @@ -23,4 +23,6 @@ public interface NodeConstant { String HIS_LOW_SPEED_DATA = "historyLowSpeedData"; String DEVICE_CONTROL_RESP = "deviceControlResp"; + + String DEVICE_EVENT = "deviceEvent"; } diff --git a/das/src/main/java/com/das/modules/node/domain/vo/DeviceEventVo.java b/das/src/main/java/com/das/modules/node/domain/vo/DeviceEventVo.java new file mode 100644 index 00000000..b9edc1a3 --- /dev/null +++ b/das/src/main/java/com/das/modules/node/domain/vo/DeviceEventVo.java @@ -0,0 +1,23 @@ +package com.das.modules.node.domain.vo; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@AllArgsConstructor +@NoArgsConstructor +public class DeviceEventVo { + + private String deviceId; + + private String attrCode; + + private Object attrValue; + + private Integer eventType; + + private Long eventTime; + + private Object limitValue; +} diff --git a/das/src/main/java/com/das/modules/node/service/NodeMessageService.java b/das/src/main/java/com/das/modules/node/service/NodeMessageService.java index 29d55b01..ec1abb92 100644 --- a/das/src/main/java/com/das/modules/node/service/NodeMessageService.java +++ b/das/src/main/java/com/das/modules/node/service/NodeMessageService.java @@ -17,4 +17,6 @@ public interface NodeMessageService { void handleHighSpeed(TerminalMessage data); void handleLowSpeed(TerminalMessage data); + + void handleDeviceEvent(TerminalMessage data); } diff --git a/das/src/main/java/com/das/modules/node/service/impl/NodeMessageServiceImpl.java b/das/src/main/java/com/das/modules/node/service/impl/NodeMessageServiceImpl.java index 7cc101be..6be2f9c0 100644 --- a/das/src/main/java/com/das/modules/node/service/impl/NodeMessageServiceImpl.java +++ b/das/src/main/java/com/das/modules/node/service/impl/NodeMessageServiceImpl.java @@ -1,5 +1,15 @@ package com.das.modules.node.service.impl; +import com.baomidou.mybatisplus.core.toolkit.IdWorker; +import com.das.common.utils.StringUtils; +import com.das.modules.cache.domain.DeviceInfoCache; +import com.das.modules.cache.service.CacheService; +import com.das.modules.data.domain.DeviceEventInfo; +import com.das.modules.data.service.impl.DataServiceImpl; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.core.type.TypeReference; import cn.hutool.core.collection.ListUtil; import cn.hutool.core.util.IdUtil; import com.das.common.constant.MeasType; @@ -67,6 +77,12 @@ public class NodeMessageServiceImpl implements NodeMessageService { @Autowired TDEngineService tdEngineService; + @Autowired + private CacheService cacheService; + + @Autowired + private DataServiceImpl dataService; + @PostConstruct @@ -280,6 +296,63 @@ public class NodeMessageServiceImpl implements NodeMessageService { tdEngineService.updateYCLowValues(highList, iotModelCode); } + @Override + public void handleDeviceEvent(TerminalMessage data) { + + JsonNode dataEvent = data.getData(); + ObjectMapper objectMapper = new ObjectMapper(); + List valueList = new ArrayList<>(); + + // 使用 TypeReference 来指定转换目标类型 + List list = objectMapper.convertValue(dataEvent, new TypeReference>() { + }); + log.info("消息data转化deviceVo,{}",list); + for (DeviceEventVo item : list){ + DeviceInfoCache deviceInfoCache = cacheService.getDeviceInfoCache(item.getDeviceId()); + DeviceEventInfo deviceEventInfo = new DeviceEventInfo(); + deviceEventInfo.setUpdateTime(item.getEventTime()); + deviceEventInfo.setEventId(IdWorker.getId()); + deviceEventInfo.setDeviceId(item.getDeviceId()); + deviceEventInfo.setDeviceName(deviceInfoCache.getDeviceName()); + deviceEventInfo.setDeviceCode(deviceInfoCache.getDeviceCode()); + String eventType = getEventType(item.getEventType()); + String model = dataService.iotModelMap.get(item.getDeviceId()); + if (StringUtils.isEmpty(model)){ + log.debug("未查询到物模型code,设备id:{}",item.getDeviceId()); + } + String fieldName = dataService.fieldCodeNameMap.get(model).get(item.getAttrCode()); + if (StringUtils.isEmpty(fieldName)){ + log.debug("未查询到物模型属性code,设备id:{}",item.getDeviceId()); + } + deviceEventInfo.setEventLevel(0); + deviceEventInfo.setConfirmed(0); + if (!StringUtils.isEmpty(fieldName) && fieldName.equals("遥信变位")){ + deviceEventInfo.setEventText(fieldName + eventType + "负归"); + } + else{ + deviceEventInfo.setEventText(fieldName + eventType + "属性值为:"+item.getAttrValue()+",越限值为:"+item.getLimitValue()); + } + valueList.add(deviceEventInfo); + } + try { + tdEngineService.updateDeviceEventValues(valueList); + }catch (Exception e){ + log.error("事件信息存入Td失败,失败原因{}",e); + } + } + + private String getEventType(int eventType) { + switch (eventType) { + case 0: + return "遥信变位"; + case 1: + return "越上限"; + case 2: + return "越下限"; + default: + return null; + } + } } diff --git a/das/src/main/resources/mapper/SysIotModelMapper.xml b/das/src/main/resources/mapper/SysIotModelMapper.xml index e9e43643..e4436ec7 100644 --- a/das/src/main/resources/mapper/SysIotModelMapper.xml +++ b/das/src/main/resources/mapper/SysIotModelMapper.xml @@ -78,7 +78,7 @@ where se.id = #{id}