From 1033f63006a2120c068e3fe51a988cddaf52dece Mon Sep 17 00:00:00 2001 From: huguanghan Date: Thu, 31 Oct 2024 17:01:12 +0800 Subject: [PATCH] =?UTF-8?q?tDService=E7=9B=B8=E5=85=B3=E8=B0=83=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- das/src/main/java/com/das/DasApplication.java | 4 +- .../data/controller/DataController.java | 1 + .../das/modules/data/service/DataService.java | 137 +----------- .../service/TDEngineService.java | 7 +- .../data/service/impl/DataServiceImpl.java | 208 ++++++++++++++++++ .../service/impl/SysIotModelServiceImpl.java | 5 +- .../node/command/AnalogDataCommand.java | 6 +- .../node/command/HisHighSpeedCommand.java | 6 +- .../node/command/HisLowSpeedCommand.java | 6 +- .../node/command/StateDataCommand.java | 6 +- .../node/controller/SysNodeController.java | 6 +- .../node/handler/NodeMessageHandler.java | 18 +- ...taService.java => NodeMessageService.java} | 5 +- ...eImpl.java => NodeMessageServiceImpl.java} | 60 +---- .../page/service/WindTurbinesPageService.java | 7 +- .../page/service/impl/HomeServiceImpl.java | 8 +- 16 files changed, 260 insertions(+), 230 deletions(-) rename das/src/main/java/com/das/modules/{node => data}/service/TDEngineService.java (99%) create mode 100644 das/src/main/java/com/das/modules/data/service/impl/DataServiceImpl.java rename das/src/main/java/com/das/modules/node/service/{DataService.java => NodeMessageService.java} (79%) rename das/src/main/java/com/das/modules/node/service/impl/{DataServiceImpl.java => NodeMessageServiceImpl.java} (76%) diff --git a/das/src/main/java/com/das/DasApplication.java b/das/src/main/java/com/das/DasApplication.java index 38fe33b4..025e64d0 100644 --- a/das/src/main/java/com/das/DasApplication.java +++ b/das/src/main/java/com/das/DasApplication.java @@ -1,7 +1,7 @@ package com.das; -import com.das.modules.node.service.DataService; -import com.das.modules.node.service.TDEngineService; +import com.das.modules.data.service.DataService; +import com.das.modules.data.service.TDEngineService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; diff --git a/das/src/main/java/com/das/modules/data/controller/DataController.java b/das/src/main/java/com/das/modules/data/controller/DataController.java index edcaecd3..8b84cb94 100644 --- a/das/src/main/java/com/das/modules/data/controller/DataController.java +++ b/das/src/main/java/com/das/modules/data/controller/DataController.java @@ -5,6 +5,7 @@ import com.das.common.utils.JsonUtils; import com.das.modules.data.domain.SnapshotValueQueryParam; import com.das.modules.data.domain.TSValueQueryParam; import com.das.modules.data.service.DataService; +import com.das.modules.data.service.impl.DataServiceImpl; import jakarta.validation.Valid; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; diff --git a/das/src/main/java/com/das/modules/data/service/DataService.java b/das/src/main/java/com/das/modules/data/service/DataService.java index 1baefcb5..87e239a3 100644 --- a/das/src/main/java/com/das/modules/data/service/DataService.java +++ b/das/src/main/java/com/das/modules/data/service/DataService.java @@ -1,142 +1,21 @@ package com.das.modules.data.service; -import cn.hutool.core.collection.CollectionUtil; -import cn.hutool.core.collection.ListUtil; -import com.das.common.exceptions.ServiceException; -import com.das.common.utils.AdminRedisTemplate; import com.das.modules.data.domain.SnapshotValueQueryParam; import com.das.modules.data.domain.TSValueQueryParam; -import com.das.modules.equipment.entity.SysIotModelField; -import com.das.modules.equipment.mapper.SysIotModelFieldMapper; -import com.das.modules.node.service.TDEngineService; -import com.das.modules.node.service.impl.DataServiceImpl; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; -import org.springframework.util.CollectionUtils; -import org.springframework.util.StopWatch; +import com.das.modules.node.domain.bo.CalculateRTData; -import java.util.*; -import java.util.concurrent.TimeUnit; +import java.util.List; +import java.util.Map; -@Slf4j -@Service -public class DataService { +public interface DataService { - public static final int COMMIT_COUNT = 1000; + Map> querySnapshotValues(List paramList); - @Autowired - AdminRedisTemplate adminRedisTemplate; + Map>> queryTimeSeriesValues(TSValueQueryParam param); - @Autowired - private SysIotModelFieldMapper sysIotModelFieldMapper; + void createTdStable(); - @Autowired - private TDEngineService tdEngineService; - - @Autowired - private DataServiceImpl dataService; - - /** - * 读取实时数据快照 - * @param paramList 设备id及设备属性列表 - * @return 实时数据快照 - */ - public Map> querySnapshotValues(List paramList) { - long start = System.currentTimeMillis(); - Map> result = new HashMap<>(paramList.size()); - Map> finalResult = result; - ListUtil.page(paramList, COMMIT_COUNT, list -> { - List keyList = new ArrayList<>(); - for (int i = 0; i < list.size(); i++) { - SnapshotValueQueryParam snapshotValueQueryParam = list.get(i); - List attributes = snapshotValueQueryParam.getAttributes(); - if (CollectionUtils.isEmpty(attributes)) { - //为空查全部 - List sysIotModelFields = sysIotModelFieldMapper.queryAllFiledNames(Long.valueOf(snapshotValueQueryParam.getDeviceId())); - for (String item : sysIotModelFields) { - String key = String.format("RT:%s:%s", snapshotValueQueryParam.getDeviceId(), item.toLowerCase()); - keyList.add(key); - } - } else { - for (String item : attributes) { - String key = String.format("RT:%s:%s", snapshotValueQueryParam.getDeviceId(), item.toLowerCase()); - keyList.add(key); - } - } - } - List dataList = adminRedisTemplate.mGet(keyList); - for (int i = 0; i < keyList.size(); i++) { - String key = keyList.get(i); - int firstColonIndex = key.indexOf(':'); - int secondColonIndex = key.indexOf(':', firstColonIndex + 1); - String deviceId = key.substring(firstColonIndex + 1, secondColonIndex); - String fieldName = key.substring(secondColonIndex + 1); - if (finalResult.get(deviceId) == null) { - Map valueMap = new HashMap<>(); - valueMap.put(fieldName, dataList.get(i)); - finalResult.put(deviceId, valueMap); - } else { - finalResult.get(deviceId).put(fieldName, dataList.get(i)); - } - } - }); - long end = System.currentTimeMillis(); - log.debug("读取快照{}个,耗时: {}秒", paramList.size(), (end - start) / 1000.0); - return finalResult; - } - - /** - * 历史区间数据查询 - * @param param 查询条件 - * @return TD数据库数据 - */ - public Map>> queryTimeSeriesValues(TSValueQueryParam param) { - if (CollectionUtil.isEmpty(param.getDevices()) || (param.getStartTime() == null && param.getEndTime() == null)) { - throw new ServiceException("必要参数缺失"); - } - Date startTime = new Date(Long.parseLong(param.getStartTime())); - Date endTime = new Date(Long.parseLong(param.getEndTime())); - Map>> result = new HashMap<>(param.getDevices().size()); - List deviceFieldList = param.getDevices(); - for (SnapshotValueQueryParam item : deviceFieldList) { - //field分为高频和低频查询 - Map>> values = queryHistoryCurveValues(Long.valueOf(item.getDeviceId()), startTime, endTime, param.getInterval(), param.getFill(), item.getAttributes()); - result.putAll(values); - } - return result; - } - - private Map>> queryHistoryCurveValues(Long irn, Date startTime, Date endTime, String interval, String fill, List attributes) { - - String iotModelCode = sysIotModelFieldMapper.queryModelCodeByDeviceId(irn); - Map highSpeedFieldMap = dataService.highIotFieldMap.get(iotModelCode); - Map lowSpeedFieldMap = dataService.lowIotFieldMap.get(iotModelCode); - List highSpeedField = new ArrayList<>(); - List lowSpeedField = new ArrayList<>(); - for (String field : attributes) { - if (highSpeedFieldMap.containsKey(field)) { - highSpeedField.add(field); - } - if (lowSpeedFieldMap.containsKey(field)) { - lowSpeedField.add(field); - } - } - Map>> result = new HashMap<>(); - if (!CollectionUtils.isEmpty(highSpeedField)) { - Map>> highHistoryCurve = tdEngineService.fetchHighHistoryCurve(irn, startTime, endTime, interval, highSpeedField); - result.putAll(highHistoryCurve); - } - if (!CollectionUtils.isEmpty(lowSpeedField)) { - Map>> lowHistoryCurve = tdEngineService.fetchLowHistoryCurve(irn, startTime, endTime, interval, lowSpeedField); - if (result.get(irn.toString()) == null) { - result.putAll(lowHistoryCurve); - } else { - result.get(irn.toString()).putAll(lowHistoryCurve.get(irn.toString())); - } - } - return result; - } + void updateCalFieldData(List values); } diff --git a/das/src/main/java/com/das/modules/node/service/TDEngineService.java b/das/src/main/java/com/das/modules/data/service/TDEngineService.java similarity index 99% rename from das/src/main/java/com/das/modules/node/service/TDEngineService.java rename to das/src/main/java/com/das/modules/data/service/TDEngineService.java index 6408a613..d85e8010 100644 --- a/das/src/main/java/com/das/modules/node/service/TDEngineService.java +++ b/das/src/main/java/com/das/modules/data/service/TDEngineService.java @@ -1,17 +1,16 @@ -package com.das.modules.node.service; +package com.das.modules.data.service; import cn.hutool.core.collection.ListUtil; import cn.hutool.core.util.StrUtil; +import com.das.modules.data.service.impl.DataServiceImpl; import com.das.modules.equipment.domain.vo.IotModelFieldVo; -import com.das.modules.equipment.mapper.SysIotModelMapper; import com.das.modules.node.domain.bo.CalculateRTData; import com.das.modules.node.domain.bo.RTData; -import com.das.modules.node.service.impl.DataServiceImpl; +import com.das.modules.node.service.impl.NodeMessageServiceImpl; import com.zaxxer.hikari.HikariConfig; import com.zaxxer.hikari.HikariDataSource; import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.EnableAsync; diff --git a/das/src/main/java/com/das/modules/data/service/impl/DataServiceImpl.java b/das/src/main/java/com/das/modules/data/service/impl/DataServiceImpl.java new file mode 100644 index 00000000..432fa733 --- /dev/null +++ b/das/src/main/java/com/das/modules/data/service/impl/DataServiceImpl.java @@ -0,0 +1,208 @@ +package com.das.modules.data.service.impl; + +import cn.hutool.core.collection.CollectionUtil; +import cn.hutool.core.collection.ListUtil; +import com.das.common.exceptions.ServiceException; +import com.das.common.utils.AdminRedisTemplate; +import com.das.modules.data.domain.SnapshotValueQueryParam; +import com.das.modules.data.domain.TSValueQueryParam; +import com.das.modules.data.service.DataService; +import com.das.modules.data.service.TDEngineService; +import com.das.modules.equipment.domain.vo.IotModelFieldVo; +import com.das.modules.equipment.entity.SysIotModelField; +import com.das.modules.equipment.mapper.SysIotModelFieldMapper; +import com.das.modules.equipment.mapper.SysIotModelMapper; +import com.das.modules.node.domain.bo.CalculateRTData; +import com.das.modules.node.service.impl.NodeMessageServiceImpl; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import org.springframework.util.CollectionUtils; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +@Slf4j +@Service +public class DataServiceImpl implements DataService { + + public static final int COMMIT_COUNT = 1000; + + @Autowired + AdminRedisTemplate adminRedisTemplate; + + @Autowired + private SysIotModelFieldMapper sysIotModelFieldMapper; + + @Autowired + private TDEngineService tdEngineService; + + @Autowired + private NodeMessageServiceImpl dataService; + + @Autowired + SysIotModelMapper sysIotModelMapper; + + //key:deviceId value:modelCode + public ConcurrentHashMap deviceModelMap = new ConcurrentHashMap<>(10000); + + //key:modelId value:modelCode + public ConcurrentHashMap iotModelMap = new ConcurrentHashMap<>(10000); + + //key:modelCode value:Filed:Code,dataType + public ConcurrentHashMap> highIotFieldMap = new ConcurrentHashMap<>(10000); + + //key:modelCode value:Filed:Code,dataType + public ConcurrentHashMap> lowIotFieldMap = new ConcurrentHashMap<>(10000); + + //key:modelCode value:Filed:Code,dataType + public ConcurrentHashMap> calculateIotFieldMap = new ConcurrentHashMap<>(10000); + + /** + * 读取实时数据快照 + * @param paramList 设备id及设备属性列表 + * @return 实时数据快照 + */ + @Override + public Map> querySnapshotValues(List paramList) { + long start = System.currentTimeMillis(); + Map> result = new HashMap<>(paramList.size()); + Map> finalResult = result; + ListUtil.page(paramList, COMMIT_COUNT, list -> { + List keyList = new ArrayList<>(); + for (int i = 0; i < list.size(); i++) { + SnapshotValueQueryParam snapshotValueQueryParam = list.get(i); + List attributes = snapshotValueQueryParam.getAttributes(); + if (CollectionUtils.isEmpty(attributes)) { + //为空查全部 + List sysIotModelFields = sysIotModelFieldMapper.queryAllFiledNames(Long.valueOf(snapshotValueQueryParam.getDeviceId())); + for (String item : sysIotModelFields) { + String key = String.format("RT:%s:%s", snapshotValueQueryParam.getDeviceId(), item.toLowerCase()); + keyList.add(key); + } + } else { + for (String item : attributes) { + String key = String.format("RT:%s:%s", snapshotValueQueryParam.getDeviceId(), item.toLowerCase()); + keyList.add(key); + } + } + } + List dataList = adminRedisTemplate.mGet(keyList); + for (int i = 0; i < keyList.size(); i++) { + String key = keyList.get(i); + int firstColonIndex = key.indexOf(':'); + int secondColonIndex = key.indexOf(':', firstColonIndex + 1); + String deviceId = key.substring(firstColonIndex + 1, secondColonIndex); + String fieldName = key.substring(secondColonIndex + 1); + if (finalResult.get(deviceId) == null) { + Map valueMap = new HashMap<>(); + valueMap.put(fieldName, dataList.get(i)); + finalResult.put(deviceId, valueMap); + } else { + finalResult.get(deviceId).put(fieldName, dataList.get(i)); + } + } + }); + long end = System.currentTimeMillis(); + log.debug("读取快照{}个,耗时: {}秒", paramList.size(), (end - start) / 1000.0); + return finalResult; + } + + /** + * 历史区间数据查询 + * @param param 查询条件 + * @return TD数据库数据 + */ + @Override + public Map>> queryTimeSeriesValues(TSValueQueryParam param) { + + Long start = System.currentTimeMillis(); + if (CollectionUtil.isEmpty(param.getDevices()) || (param.getStartTime() == null && param.getEndTime() == null)) { + throw new ServiceException("必要参数缺失"); + } + Date startTime = new Date(Long.parseLong(param.getStartTime())); + Date endTime = new Date(Long.parseLong(param.getEndTime())); + Map>> result = new HashMap<>(param.getDevices().size()); + List deviceFieldList = param.getDevices(); + for (SnapshotValueQueryParam item : deviceFieldList) { + //field分为高频和低频查询 + Map>> values = queryHistoryCurveValues(Long.valueOf(item.getDeviceId()), startTime, endTime, param.getInterval(), param.getFill(), item.getAttributes()); + result.putAll(values); + } + Long end = System.currentTimeMillis(); + log.debug("读取快照{}个,耗时: {}秒", param.getDevices().size(), (end-start)/ 1000.0); + return result; + } + + private Map>> queryHistoryCurveValues(Long irn, Date startTime, Date endTime, String interval, String fill, List attributes) { + + String iotModelCode = sysIotModelFieldMapper.queryModelCodeByDeviceId(irn); + Map highSpeedFieldMap = highIotFieldMap.get(iotModelCode); + Map lowSpeedFieldMap = lowIotFieldMap.get(iotModelCode); + List highSpeedField = new ArrayList<>(); + List lowSpeedField = new ArrayList<>(); + for (String field : attributes) { + if (highSpeedFieldMap.containsKey(field)) { + highSpeedField.add(field); + } + if (lowSpeedFieldMap.containsKey(field)) { + lowSpeedField.add(field); + } + } + Map>> result = new HashMap<>(); + if (!CollectionUtils.isEmpty(highSpeedField)) { + Map>> highHistoryCurve = tdEngineService.fetchHighHistoryCurve(irn, startTime, endTime, interval, highSpeedField); + result.putAll(highHistoryCurve); + } + if (!CollectionUtils.isEmpty(lowSpeedField)) { + Map>> lowHistoryCurve = tdEngineService.fetchLowHistoryCurve(irn, startTime, endTime, interval, lowSpeedField); + if (result.get(irn.toString()) == null) { + result.putAll(lowHistoryCurve); + } else { + result.get(irn.toString()).putAll(lowHistoryCurve.get(irn.toString())); + } + } + return result; + } + + @Override + public void updateCalFieldData(List calValues) { + //更新数据至redis,TD + ListUtil.page(calValues, COMMIT_COUNT, list -> { + Map keyValueMap = new HashMap<>(); + for (CalculateRTData value : list) { + String key = String.format("RT:%s:%s", value.getDeviceId(), value.getIotModelField().toLowerCase()); + keyValueMap.put(key, value.getDataValue()); + } + adminRedisTemplate.mSet(keyValueMap); + tdEngineService.updateCalFieldValues(list); + }); + } + + @Override + public void createTdStable() { + List allIotModel = sysIotModelMapper.getAllIotModel(); + for (IotModelFieldVo item : allIotModel) { + String key = String.valueOf(item.getId()); + iotModelMap.put(key, item.getIotModelCode()); + List allIotModelField = sysIotModelMapper.getAllIotModelField(item.getId()); + Map LowModelFieldList = allIotModelField.stream().filter(field -> field.getHighSpeed() == 0 && field.getAttributeType() != 199).collect(Collectors.toMap(SysIotModelField::getAttributeCode, SysIotModelField::getDataType, (value1, value2) -> value1)); + Map HighModelFieldList = allIotModelField.stream().filter(field -> field.getHighSpeed() == 1 && field.getAttributeType() != 199).collect(Collectors.toMap(SysIotModelField::getAttributeCode, SysIotModelField::getDataType, (value1, value2) -> value1)); + Map calculateFieldList = allIotModelField.stream().filter(field -> field.getAttributeType() == 199).collect(Collectors.toMap(SysIotModelField::getAttributeCode, SysIotModelField::getDataType, (value1, value2) -> value1)); + Map map = new HashMap<>(); + for (String field : HighModelFieldList.keySet()) { + map.put(field, HighModelFieldList.get(field)); + } + highIotFieldMap.put(item.getIotModelCode(), map); + Map lowMap = new HashMap<>(); + for (String field : LowModelFieldList.keySet()) { + lowMap.put(field, LowModelFieldList.get(field)); + } + lowIotFieldMap.put(item.getIotModelCode(), lowMap); + calculateIotFieldMap.put(item.getIotModelCode(), calculateFieldList); + } + tdEngineService.initIotModel(allIotModel, highIotFieldMap, lowIotFieldMap, calculateIotFieldMap); + } + +} diff --git a/das/src/main/java/com/das/modules/equipment/service/impl/SysIotModelServiceImpl.java b/das/src/main/java/com/das/modules/equipment/service/impl/SysIotModelServiceImpl.java index b6c30c70..67750e0c 100644 --- a/das/src/main/java/com/das/modules/equipment/service/impl/SysIotModelServiceImpl.java +++ b/das/src/main/java/com/das/modules/equipment/service/impl/SysIotModelServiceImpl.java @@ -10,6 +10,7 @@ import com.das.common.config.SessionUtil; import com.das.common.exceptions.ServiceException; import com.das.common.utils.*; import com.das.modules.auth.domain.vo.SysUserVo; +import com.das.modules.data.service.impl.DataServiceImpl; import com.das.modules.equipment.domain.dto.SysIotModelDto; import com.das.modules.equipment.domain.dto.SysIotModelFieldDto; import com.das.modules.equipment.domain.dto.SysIotModelServiceDto; @@ -26,8 +27,7 @@ import com.das.modules.equipment.mapper.SysIotModelFieldMapper; import com.das.modules.equipment.mapper.SysIotModelMapper; import com.das.modules.equipment.mapper.SysIotModelServiceMapper; import com.das.modules.equipment.service.SysIotModelService; -import com.das.modules.node.service.TDEngineService; -import com.das.modules.node.service.impl.DataServiceImpl; +import com.das.modules.data.service.TDEngineService; import jakarta.servlet.http.HttpServletRequest; import jakarta.servlet.http.HttpServletResponse; import lombok.extern.slf4j.Slf4j; @@ -45,7 +45,6 @@ import java.util.stream.Collectors; @Service @Slf4j public class SysIotModelServiceImpl implements SysIotModelService { - public static final int COMMIT_COUNT = 1000; @Autowired private SysIotModelMapper sysIotModelMapper; diff --git a/das/src/main/java/com/das/modules/node/command/AnalogDataCommand.java b/das/src/main/java/com/das/modules/node/command/AnalogDataCommand.java index 519d4e56..fcb1717c 100644 --- a/das/src/main/java/com/das/modules/node/command/AnalogDataCommand.java +++ b/das/src/main/java/com/das/modules/node/command/AnalogDataCommand.java @@ -2,7 +2,7 @@ package com.das.modules.node.command; import com.das.modules.node.constant.NodeConstant; import com.das.modules.node.domain.bo.TerminalMessage; -import com.das.modules.node.service.DataService; +import com.das.modules.node.service.NodeMessageService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -12,12 +12,12 @@ import org.springframework.stereotype.Service; public class AnalogDataCommand implements BaseCommand { @Autowired - DataService dataService; + NodeMessageService nodeMessageService; @Override public void doCommand(TerminalMessage data) { try { //analogData值只存入redis - dataService.handleData(data); + nodeMessageService.handleData(data); } catch (Exception e) { log.error("解析数据异常", e); diff --git a/das/src/main/java/com/das/modules/node/command/HisHighSpeedCommand.java b/das/src/main/java/com/das/modules/node/command/HisHighSpeedCommand.java index c59a8954..78662a09 100644 --- a/das/src/main/java/com/das/modules/node/command/HisHighSpeedCommand.java +++ b/das/src/main/java/com/das/modules/node/command/HisHighSpeedCommand.java @@ -2,7 +2,7 @@ package com.das.modules.node.command; import com.das.modules.node.constant.NodeConstant; import com.das.modules.node.domain.bo.TerminalMessage; -import com.das.modules.node.service.DataService; +import com.das.modules.node.service.NodeMessageService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -12,12 +12,12 @@ import org.springframework.stereotype.Service; public class HisHighSpeedCommand implements BaseCommand { @Autowired - DataService dataService; + NodeMessageService nodeMessageService; @Override public void doCommand(TerminalMessage data) { try { //analogData值只存入redis - dataService.handleHighSpeed(data); + nodeMessageService.handleHighSpeed(data); } catch (Exception e) { log.error("解析数据异常", e); diff --git a/das/src/main/java/com/das/modules/node/command/HisLowSpeedCommand.java b/das/src/main/java/com/das/modules/node/command/HisLowSpeedCommand.java index 0d1a6710..82d5497f 100644 --- a/das/src/main/java/com/das/modules/node/command/HisLowSpeedCommand.java +++ b/das/src/main/java/com/das/modules/node/command/HisLowSpeedCommand.java @@ -2,7 +2,7 @@ package com.das.modules.node.command; import com.das.modules.node.constant.NodeConstant; import com.das.modules.node.domain.bo.TerminalMessage; -import com.das.modules.node.service.DataService; +import com.das.modules.node.service.NodeMessageService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -12,12 +12,12 @@ import org.springframework.stereotype.Service; public class HisLowSpeedCommand implements BaseCommand { @Autowired - DataService dataService; + NodeMessageService nodeMessageService; @Override public void doCommand(TerminalMessage data) { try { //analogData值只存入redis - dataService.handleLowSpeed(data); + nodeMessageService.handleLowSpeed(data); } catch (Exception e) { log.error("解析数据异常", e); diff --git a/das/src/main/java/com/das/modules/node/command/StateDataCommand.java b/das/src/main/java/com/das/modules/node/command/StateDataCommand.java index f758b68a..eb61758a 100644 --- a/das/src/main/java/com/das/modules/node/command/StateDataCommand.java +++ b/das/src/main/java/com/das/modules/node/command/StateDataCommand.java @@ -2,7 +2,7 @@ package com.das.modules.node.command; import com.das.modules.node.constant.NodeConstant; import com.das.modules.node.domain.bo.TerminalMessage; -import com.das.modules.node.service.DataService; +import com.das.modules.node.service.NodeMessageService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -12,13 +12,13 @@ import org.springframework.stereotype.Service; public class StateDataCommand implements BaseCommand { @Autowired - DataService dataService; + NodeMessageService nodeMessageService; @Override public void doCommand(TerminalMessage data) { try { //只存入redis - dataService.handleData(data); + nodeMessageService.handleData(data); } catch (Exception e) { log.error("解析数据异常", e); } diff --git a/das/src/main/java/com/das/modules/node/controller/SysNodeController.java b/das/src/main/java/com/das/modules/node/controller/SysNodeController.java index d8787ed9..b7ba3fd8 100644 --- a/das/src/main/java/com/das/modules/node/controller/SysNodeController.java +++ b/das/src/main/java/com/das/modules/node/controller/SysNodeController.java @@ -8,7 +8,7 @@ import com.das.common.result.R; import com.das.common.utils.PageDataInfo; import com.das.modules.node.domain.dto.*; import com.das.modules.node.domain.vo.*; -import com.das.modules.node.service.DataService; +import com.das.modules.node.service.NodeMessageService; import com.das.modules.node.service.SysNodeService; import jakarta.servlet.http.HttpServletRequest; import jakarta.servlet.http.HttpServletResponse; @@ -31,7 +31,7 @@ public class SysNodeController { private SysNodeService sysNodeService; @Autowired - private DataService dataService; + private NodeMessageService nodeMessageService; @@ -85,7 +85,7 @@ public class SysNodeController { /** 配置下发 */ @PostMapping("/configUpdate") public R configUpdate(@RequestBody SysNodeDto sysNodeDto) { - dataService.sendTerminalConfig(sysNodeDto.getId()); + nodeMessageService.sendTerminalConfig(sysNodeDto.getId()); return R.success(); } diff --git a/das/src/main/java/com/das/modules/node/handler/NodeMessageHandler.java b/das/src/main/java/com/das/modules/node/handler/NodeMessageHandler.java index a20070e8..1c7c2262 100644 --- a/das/src/main/java/com/das/modules/node/handler/NodeMessageHandler.java +++ b/das/src/main/java/com/das/modules/node/handler/NodeMessageHandler.java @@ -4,7 +4,7 @@ import com.das.common.utils.JsonUtils; import com.das.common.utils.SpringUtil; import com.das.modules.node.constant.NodeConstant; import com.das.modules.node.domain.bo.TerminalMessage; -import com.das.modules.node.service.DataService; +import com.das.modules.node.service.NodeMessageService; import com.fasterxml.jackson.databind.JsonNode; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.Scheduled; @@ -22,7 +22,7 @@ public class NodeMessageHandler extends TextWebSocketHandler { public static final Long HEARTBEAT_TIMEOUT = 1000 * 60 * 1L; private ConcurrentHashMap onlineSessions = new ConcurrentHashMap<>(16); - private DataService dataService; + private NodeMessageService nodeMessageService; @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { String remoteIp = (String) session.getAttributes().getOrDefault(NodeConstant.REMOTE_IP, ""); @@ -44,10 +44,10 @@ public class NodeMessageHandler extends TextWebSocketHandler { // 如果version是0,则需要调用一次configUpdate配置更新 if (version == 0){ - if (dataService == null){ - dataService = SpringUtil.getBean(DataService.class); + if (nodeMessageService == null){ + nodeMessageService = SpringUtil.getBean(NodeMessageService.class); } - dataService.sendTerminalConfig(Long.valueOf(nodeId)); + nodeMessageService.sendTerminalConfig(Long.valueOf(nodeId)); } } @@ -59,11 +59,11 @@ public class NodeMessageHandler extends TextWebSocketHandler { String cmd = msg.getCmd(); JsonNode data = msg.getData(); log.info("收到 Node:{} 命令: {}", nodeId, cmd); - if (dataService == null){ - dataService = SpringUtil.getBean(DataService.class); + if (nodeMessageService == null){ + nodeMessageService = SpringUtil.getBean(NodeMessageService.class); } - if (dataService != null){ - dataService.pushMessage(msg); + if (nodeMessageService != null){ + nodeMessageService.pushMessage(msg); } } diff --git a/das/src/main/java/com/das/modules/node/service/DataService.java b/das/src/main/java/com/das/modules/node/service/NodeMessageService.java similarity index 79% rename from das/src/main/java/com/das/modules/node/service/DataService.java rename to das/src/main/java/com/das/modules/node/service/NodeMessageService.java index 33b34c4c..29d55b01 100644 --- a/das/src/main/java/com/das/modules/node/service/DataService.java +++ b/das/src/main/java/com/das/modules/node/service/NodeMessageService.java @@ -6,18 +6,15 @@ import com.fasterxml.jackson.databind.JsonNode; import java.util.List; -public interface DataService { +public interface NodeMessageService { void pushMessage(TerminalMessage msg); JsonNode sendTerminalConfig(Long nodeId); - void createTdStable(); void handleData(TerminalMessage data); void handleHighSpeed(TerminalMessage data); void handleLowSpeed(TerminalMessage data); - - void updateCalFieldData(List values); } diff --git a/das/src/main/java/com/das/modules/node/service/impl/DataServiceImpl.java b/das/src/main/java/com/das/modules/node/service/impl/NodeMessageServiceImpl.java similarity index 76% rename from das/src/main/java/com/das/modules/node/service/impl/DataServiceImpl.java rename to das/src/main/java/com/das/modules/node/service/impl/NodeMessageServiceImpl.java index c32d5b28..7cc101be 100644 --- a/das/src/main/java/com/das/modules/node/service/impl/DataServiceImpl.java +++ b/das/src/main/java/com/das/modules/node/service/impl/NodeMessageServiceImpl.java @@ -16,8 +16,8 @@ import com.das.modules.node.domain.vo.*; import com.das.modules.node.handler.NodeMessageHandler; import com.das.modules.node.mapper.SysCommunicationLinkMapper; import com.das.modules.node.mapper.SysImpTabMappingMapper; -import com.das.modules.node.service.DataService; -import com.das.modules.node.service.TDEngineService; +import com.das.modules.node.service.NodeMessageService; +import com.das.modules.data.service.TDEngineService; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -39,15 +39,13 @@ import java.util.stream.Collectors; @Slf4j @Service -public class DataServiceImpl implements DataService { +public class NodeMessageServiceImpl implements NodeMessageService { private static final ObjectMapper JSON_MAPPER = new ObjectMapper(); private Disruptor disruptor = null; private RingBuffer ringBuffer = null; - public static final int COMMIT_NUMBER = 1000; - @Resource TerminalMessageEventHandler terminalMessageEventHandler; @@ -69,20 +67,7 @@ public class DataServiceImpl implements DataService { @Autowired TDEngineService tdEngineService; - //key:deviceId value:modelCode - public ConcurrentHashMap deviceModelMap = new ConcurrentHashMap<>(10000); - //key:modelId value:modelCode - public ConcurrentHashMap iotModelMap = new ConcurrentHashMap<>(10000); - - //key:modelCode value:Filed:Code,dataType - public ConcurrentHashMap> highIotFieldMap = new ConcurrentHashMap<>(10000); - - //key:modelCode value:Filed:Code,dataType - public ConcurrentHashMap> lowIotFieldMap = new ConcurrentHashMap<>(10000); - - //key:modelCode value:Filed:Code,dataType - public ConcurrentHashMap> calculateIotFieldMap = new ConcurrentHashMap<>(10000); @PostConstruct public void init() { @@ -224,31 +209,6 @@ public class DataServiceImpl implements DataService { return jsonNode; } - @Override - public void createTdStable() { - List allIotModel = sysIotModelMapper.getAllIotModel(); - for (IotModelFieldVo item : allIotModel) { - String key = String.valueOf(item.getId()); - iotModelMap.put(key, item.getIotModelCode()); - List allIotModelField = sysIotModelMapper.getAllIotModelField(item.getId()); - Map LowModelFieldList = allIotModelField.stream().filter(field -> field.getHighSpeed() == 0 && field.getAttributeType() != 199).collect(Collectors.toMap(SysIotModelField::getAttributeCode, SysIotModelField::getDataType, (value1, value2) -> value1)); - Map HighModelFieldList = allIotModelField.stream().filter(field -> field.getHighSpeed() == 1 && field.getAttributeType() != 199).collect(Collectors.toMap(SysIotModelField::getAttributeCode, SysIotModelField::getDataType, (value1, value2) -> value1)); - Map calculateFieldList = allIotModelField.stream().filter(field -> field.getAttributeType() == 199).collect(Collectors.toMap(SysIotModelField::getAttributeCode, SysIotModelField::getDataType, (value1, value2) -> value1)); - Map map = new HashMap<>(); - for (String field : HighModelFieldList.keySet()) { - map.put(field, HighModelFieldList.get(field)); - } - highIotFieldMap.put(item.getIotModelCode(), map); - Map lowMap = new HashMap<>(); - for (String field : LowModelFieldList.keySet()) { - lowMap.put(field, LowModelFieldList.get(field)); - } - lowIotFieldMap.put(item.getIotModelCode(), lowMap); - calculateIotFieldMap.put(item.getIotModelCode(), calculateFieldList); - } - tdEngineService.initIotModel(allIotModel, highIotFieldMap, lowIotFieldMap, calculateIotFieldMap); - } - @Override public void handleData(TerminalMessage data) { JsonNode jsonNode = data.getData(); @@ -320,18 +280,6 @@ public class DataServiceImpl implements DataService { tdEngineService.updateYCLowValues(highList, iotModelCode); } - @Override - public void updateCalFieldData(List calValues) { - //更新数据至redis,TD - ListUtil.page(calValues, COMMIT_NUMBER, list -> { - Map keyValueMap = new HashMap<>(); - for (CalculateRTData value : list) { - String key = String.format("RT:%s:%s", value.getDeviceId(), value.getIotModelField().toLowerCase()); - keyValueMap.put(key, value.getDataValue()); - } - adminRedisTemplate.mSet(keyValueMap); - tdEngineService.updateCalFieldValues(list); - }); - } + } diff --git a/das/src/main/java/com/das/modules/page/service/WindTurbinesPageService.java b/das/src/main/java/com/das/modules/page/service/WindTurbinesPageService.java index 95687060..a9423601 100644 --- a/das/src/main/java/com/das/modules/page/service/WindTurbinesPageService.java +++ b/das/src/main/java/com/das/modules/page/service/WindTurbinesPageService.java @@ -3,12 +3,11 @@ package com.das.modules.page.service; import com.das.common.constant.EquipmentTypeIds; import com.das.common.exceptions.ServiceException; import com.das.modules.data.domain.SnapshotValueQueryParam; -import com.das.modules.data.service.DataService; +import com.das.modules.data.service.impl.DataServiceImpl; import com.das.modules.equipment.domain.dto.SysEquipmentDto; import com.das.modules.equipment.domain.vo.SysEquipmentVo; import com.das.modules.equipment.mapper.SysEquipmentMapper; import com.das.modules.node.domain.dto.DeviceCommandDto; -import com.das.modules.node.service.SysNodeService; import com.das.modules.operation.service.OperationService; import com.das.modules.page.domian.WindTurbinesPageVo; import lombok.extern.slf4j.Slf4j; @@ -31,7 +30,7 @@ public class WindTurbinesPageService { SysEquipmentMapper sysEquipmentMapper; @Autowired - private DataService dataService; + private DataServiceImpl dataServiceImpl; @Autowired OperationService optService; @@ -107,7 +106,7 @@ public class WindTurbinesPageService { } //获取设备测点数据 - Map> map = dataService.querySnapshotValues(paramList); + Map> map = dataServiceImpl.querySnapshotValues(paramList); for (WindTurbinesPageVo item : windTurbinesPageVos) { item.setAttributeMap(map.get(item.getIrn().toString())); } diff --git a/das/src/main/java/com/das/modules/page/service/impl/HomeServiceImpl.java b/das/src/main/java/com/das/modules/page/service/impl/HomeServiceImpl.java index d1916498..7b5da110 100644 --- a/das/src/main/java/com/das/modules/page/service/impl/HomeServiceImpl.java +++ b/das/src/main/java/com/das/modules/page/service/impl/HomeServiceImpl.java @@ -3,7 +3,7 @@ package com.das.modules.page.service.impl; import com.das.common.constant.EquipmentTypeIds; import com.das.modules.data.domain.SnapshotValueQueryParam; -import com.das.modules.data.service.DataService; +import com.das.modules.data.service.impl.DataServiceImpl; import com.das.modules.equipment.domain.dto.SysEquipmentDto; import com.das.modules.equipment.domain.vo.SysEquipmentVo; import com.das.modules.equipment.mapper.SysEquipmentMapper; @@ -25,7 +25,7 @@ public class HomeServiceImpl implements HomeService { SysEquipmentMapper sysEquipmentMapper; @Autowired - private DataService dataService; + private DataServiceImpl dataServiceImpl; //缺省风电场对象 private long defaultWindFarmId = 0; @@ -80,7 +80,7 @@ public class HomeServiceImpl implements HomeService { homeWindRealTimeVoList.add(homeWindRealTimeVoResult); } //获取设备测点数据 - Map> map = dataService.querySnapshotValues(paramList); + Map> map = dataServiceImpl.querySnapshotValues(paramList); for (HomeWindTurbineMatrixDataVoVo item : homeWindRealTimeVoList) { item.setAttributeMap(map.get(item.getIrn().toString())); } @@ -137,7 +137,7 @@ public class HomeServiceImpl implements HomeService { snapshotValueQueryParam.setDeviceId(windFarmId.toString()); paramList.add(snapshotValueQueryParam); //获取设备测点数据 - Map> map = dataService.querySnapshotValues(paramList); + Map> map = dataServiceImpl.querySnapshotValues(paramList); HomeWindFarmRealDataVo homeWindFarmRealDataVo = new HomeWindFarmRealDataVo(); homeWindFarmRealDataVo.setWindFarmId(windFarmId); if (map !=null){