tDService相关调整

This commit is contained in:
huguanghan 2024-10-31 17:01:12 +08:00
parent 2f1f91b3b4
commit 1033f63006
16 changed files with 260 additions and 230 deletions

View File

@ -1,7 +1,7 @@
package com.das;
import com.das.modules.node.service.DataService;
import com.das.modules.node.service.TDEngineService;
import com.das.modules.data.service.DataService;
import com.das.modules.data.service.TDEngineService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;

View File

@ -5,6 +5,7 @@ import com.das.common.utils.JsonUtils;
import com.das.modules.data.domain.SnapshotValueQueryParam;
import com.das.modules.data.domain.TSValueQueryParam;
import com.das.modules.data.service.DataService;
import com.das.modules.data.service.impl.DataServiceImpl;
import jakarta.validation.Valid;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;

View File

@ -1,142 +1,21 @@
package com.das.modules.data.service;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.collection.ListUtil;
import com.das.common.exceptions.ServiceException;
import com.das.common.utils.AdminRedisTemplate;
import com.das.modules.data.domain.SnapshotValueQueryParam;
import com.das.modules.data.domain.TSValueQueryParam;
import com.das.modules.equipment.entity.SysIotModelField;
import com.das.modules.equipment.mapper.SysIotModelFieldMapper;
import com.das.modules.node.service.TDEngineService;
import com.das.modules.node.service.impl.DataServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StopWatch;
import com.das.modules.node.domain.bo.CalculateRTData;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.List;
import java.util.Map;
@Slf4j
@Service
public class DataService {
public interface DataService {
public static final int COMMIT_COUNT = 1000;
Map<String, Map<String, Object>> querySnapshotValues(List<SnapshotValueQueryParam> paramList);
@Autowired
AdminRedisTemplate adminRedisTemplate;
Map<String, Map<String, Map<String, Object>>> queryTimeSeriesValues(TSValueQueryParam param);
@Autowired
private SysIotModelFieldMapper sysIotModelFieldMapper;
void createTdStable();
@Autowired
private TDEngineService tdEngineService;
@Autowired
private DataServiceImpl dataService;
/**
* 读取实时数据快照
* @param paramList 设备id及设备属性列表
* @return 实时数据快照
*/
public Map<String, Map<String, Object>> querySnapshotValues(List<SnapshotValueQueryParam> paramList) {
long start = System.currentTimeMillis();
Map<String, Map<String, Object>> result = new HashMap<>(paramList.size());
Map<String, Map<String, Object>> finalResult = result;
ListUtil.page(paramList, COMMIT_COUNT, list -> {
List<String> keyList = new ArrayList<>();
for (int i = 0; i < list.size(); i++) {
SnapshotValueQueryParam snapshotValueQueryParam = list.get(i);
List<String> attributes = snapshotValueQueryParam.getAttributes();
if (CollectionUtils.isEmpty(attributes)) {
//为空查全部
List<String> sysIotModelFields = sysIotModelFieldMapper.queryAllFiledNames(Long.valueOf(snapshotValueQueryParam.getDeviceId()));
for (String item : sysIotModelFields) {
String key = String.format("RT:%s:%s", snapshotValueQueryParam.getDeviceId(), item.toLowerCase());
keyList.add(key);
}
} else {
for (String item : attributes) {
String key = String.format("RT:%s:%s", snapshotValueQueryParam.getDeviceId(), item.toLowerCase());
keyList.add(key);
}
}
}
List<Object> dataList = adminRedisTemplate.mGet(keyList);
for (int i = 0; i < keyList.size(); i++) {
String key = keyList.get(i);
int firstColonIndex = key.indexOf(':');
int secondColonIndex = key.indexOf(':', firstColonIndex + 1);
String deviceId = key.substring(firstColonIndex + 1, secondColonIndex);
String fieldName = key.substring(secondColonIndex + 1);
if (finalResult.get(deviceId) == null) {
Map<String, Object> valueMap = new HashMap<>();
valueMap.put(fieldName, dataList.get(i));
finalResult.put(deviceId, valueMap);
} else {
finalResult.get(deviceId).put(fieldName, dataList.get(i));
}
}
});
long end = System.currentTimeMillis();
log.debug("读取快照{}个,耗时: {}秒", paramList.size(), (end - start) / 1000.0);
return finalResult;
}
/**
* 历史区间数据查询
* @param param 查询条件
* @return TD数据库数据
*/
public Map<String, Map<String, Map<String, Object>>> queryTimeSeriesValues(TSValueQueryParam param) {
if (CollectionUtil.isEmpty(param.getDevices()) || (param.getStartTime() == null && param.getEndTime() == null)) {
throw new ServiceException("必要参数缺失");
}
Date startTime = new Date(Long.parseLong(param.getStartTime()));
Date endTime = new Date(Long.parseLong(param.getEndTime()));
Map<String, Map<String, Map<String, Object>>> result = new HashMap<>(param.getDevices().size());
List<SnapshotValueQueryParam> deviceFieldList = param.getDevices();
for (SnapshotValueQueryParam item : deviceFieldList) {
//field分为高频和低频查询
Map<String, Map<String, Map<String, Object>>> values = queryHistoryCurveValues(Long.valueOf(item.getDeviceId()), startTime, endTime, param.getInterval(), param.getFill(), item.getAttributes());
result.putAll(values);
}
return result;
}
private Map<String, Map<String, Map<String, Object>>> queryHistoryCurveValues(Long irn, Date startTime, Date endTime, String interval, String fill, List<String> attributes) {
String iotModelCode = sysIotModelFieldMapper.queryModelCodeByDeviceId(irn);
Map<String, Object> highSpeedFieldMap = dataService.highIotFieldMap.get(iotModelCode);
Map<String, Object> lowSpeedFieldMap = dataService.lowIotFieldMap.get(iotModelCode);
List<String> highSpeedField = new ArrayList<>();
List<String> lowSpeedField = new ArrayList<>();
for (String field : attributes) {
if (highSpeedFieldMap.containsKey(field)) {
highSpeedField.add(field);
}
if (lowSpeedFieldMap.containsKey(field)) {
lowSpeedField.add(field);
}
}
Map<String, Map<String, Map<String, Object>>> result = new HashMap<>();
if (!CollectionUtils.isEmpty(highSpeedField)) {
Map<String, Map<String, Map<String, Object>>> highHistoryCurve = tdEngineService.fetchHighHistoryCurve(irn, startTime, endTime, interval, highSpeedField);
result.putAll(highHistoryCurve);
}
if (!CollectionUtils.isEmpty(lowSpeedField)) {
Map<String, Map<String, Map<String, Object>>> lowHistoryCurve = tdEngineService.fetchLowHistoryCurve(irn, startTime, endTime, interval, lowSpeedField);
if (result.get(irn.toString()) == null) {
result.putAll(lowHistoryCurve);
} else {
result.get(irn.toString()).putAll(lowHistoryCurve.get(irn.toString()));
}
}
return result;
}
void updateCalFieldData(List<CalculateRTData> values);
}

View File

@ -1,17 +1,16 @@
package com.das.modules.node.service;
package com.das.modules.data.service;
import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.util.StrUtil;
import com.das.modules.data.service.impl.DataServiceImpl;
import com.das.modules.equipment.domain.vo.IotModelFieldVo;
import com.das.modules.equipment.mapper.SysIotModelMapper;
import com.das.modules.node.domain.bo.CalculateRTData;
import com.das.modules.node.domain.bo.RTData;
import com.das.modules.node.service.impl.DataServiceImpl;
import com.das.modules.node.service.impl.NodeMessageServiceImpl;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;

View File

@ -0,0 +1,208 @@
package com.das.modules.data.service.impl;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.collection.ListUtil;
import com.das.common.exceptions.ServiceException;
import com.das.common.utils.AdminRedisTemplate;
import com.das.modules.data.domain.SnapshotValueQueryParam;
import com.das.modules.data.domain.TSValueQueryParam;
import com.das.modules.data.service.DataService;
import com.das.modules.data.service.TDEngineService;
import com.das.modules.equipment.domain.vo.IotModelFieldVo;
import com.das.modules.equipment.entity.SysIotModelField;
import com.das.modules.equipment.mapper.SysIotModelFieldMapper;
import com.das.modules.equipment.mapper.SysIotModelMapper;
import com.das.modules.node.domain.bo.CalculateRTData;
import com.das.modules.node.service.impl.NodeMessageServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
@Slf4j
@Service
public class DataServiceImpl implements DataService {
public static final int COMMIT_COUNT = 1000;
@Autowired
AdminRedisTemplate adminRedisTemplate;
@Autowired
private SysIotModelFieldMapper sysIotModelFieldMapper;
@Autowired
private TDEngineService tdEngineService;
@Autowired
private NodeMessageServiceImpl dataService;
@Autowired
SysIotModelMapper sysIotModelMapper;
//key:deviceId value:modelCode
public ConcurrentHashMap<String, String> deviceModelMap = new ConcurrentHashMap<>(10000);
//key:modelId value:modelCode
public ConcurrentHashMap<String, String> iotModelMap = new ConcurrentHashMap<>(10000);
//key:modelCode value:Filed:Code,dataType
public ConcurrentHashMap<String, Map<String, Object>> highIotFieldMap = new ConcurrentHashMap<>(10000);
//key:modelCode value:Filed:Code,dataType
public ConcurrentHashMap<String, Map<String, Object>> lowIotFieldMap = new ConcurrentHashMap<>(10000);
//key:modelCode value:Filed:Code,dataType
public ConcurrentHashMap<String, Map<String, String>> calculateIotFieldMap = new ConcurrentHashMap<>(10000);
/**
* 读取实时数据快照
* @param paramList 设备id及设备属性列表
* @return 实时数据快照
*/
@Override
public Map<String, Map<String, Object>> querySnapshotValues(List<SnapshotValueQueryParam> paramList) {
long start = System.currentTimeMillis();
Map<String, Map<String, Object>> result = new HashMap<>(paramList.size());
Map<String, Map<String, Object>> finalResult = result;
ListUtil.page(paramList, COMMIT_COUNT, list -> {
List<String> keyList = new ArrayList<>();
for (int i = 0; i < list.size(); i++) {
SnapshotValueQueryParam snapshotValueQueryParam = list.get(i);
List<String> attributes = snapshotValueQueryParam.getAttributes();
if (CollectionUtils.isEmpty(attributes)) {
//为空查全部
List<String> sysIotModelFields = sysIotModelFieldMapper.queryAllFiledNames(Long.valueOf(snapshotValueQueryParam.getDeviceId()));
for (String item : sysIotModelFields) {
String key = String.format("RT:%s:%s", snapshotValueQueryParam.getDeviceId(), item.toLowerCase());
keyList.add(key);
}
} else {
for (String item : attributes) {
String key = String.format("RT:%s:%s", snapshotValueQueryParam.getDeviceId(), item.toLowerCase());
keyList.add(key);
}
}
}
List<Object> dataList = adminRedisTemplate.mGet(keyList);
for (int i = 0; i < keyList.size(); i++) {
String key = keyList.get(i);
int firstColonIndex = key.indexOf(':');
int secondColonIndex = key.indexOf(':', firstColonIndex + 1);
String deviceId = key.substring(firstColonIndex + 1, secondColonIndex);
String fieldName = key.substring(secondColonIndex + 1);
if (finalResult.get(deviceId) == null) {
Map<String, Object> valueMap = new HashMap<>();
valueMap.put(fieldName, dataList.get(i));
finalResult.put(deviceId, valueMap);
} else {
finalResult.get(deviceId).put(fieldName, dataList.get(i));
}
}
});
long end = System.currentTimeMillis();
log.debug("读取快照{}个,耗时: {}秒", paramList.size(), (end - start) / 1000.0);
return finalResult;
}
/**
* 历史区间数据查询
* @param param 查询条件
* @return TD数据库数据
*/
@Override
public Map<String, Map<String, Map<String, Object>>> queryTimeSeriesValues(TSValueQueryParam param) {
Long start = System.currentTimeMillis();
if (CollectionUtil.isEmpty(param.getDevices()) || (param.getStartTime() == null && param.getEndTime() == null)) {
throw new ServiceException("必要参数缺失");
}
Date startTime = new Date(Long.parseLong(param.getStartTime()));
Date endTime = new Date(Long.parseLong(param.getEndTime()));
Map<String, Map<String, Map<String, Object>>> result = new HashMap<>(param.getDevices().size());
List<SnapshotValueQueryParam> deviceFieldList = param.getDevices();
for (SnapshotValueQueryParam item : deviceFieldList) {
//field分为高频和低频查询
Map<String, Map<String, Map<String, Object>>> values = queryHistoryCurveValues(Long.valueOf(item.getDeviceId()), startTime, endTime, param.getInterval(), param.getFill(), item.getAttributes());
result.putAll(values);
}
Long end = System.currentTimeMillis();
log.debug("读取快照{}个,耗时: {}秒", param.getDevices().size(), (end-start)/ 1000.0);
return result;
}
private Map<String, Map<String, Map<String, Object>>> queryHistoryCurveValues(Long irn, Date startTime, Date endTime, String interval, String fill, List<String> attributes) {
String iotModelCode = sysIotModelFieldMapper.queryModelCodeByDeviceId(irn);
Map<String, Object> highSpeedFieldMap = highIotFieldMap.get(iotModelCode);
Map<String, Object> lowSpeedFieldMap = lowIotFieldMap.get(iotModelCode);
List<String> highSpeedField = new ArrayList<>();
List<String> lowSpeedField = new ArrayList<>();
for (String field : attributes) {
if (highSpeedFieldMap.containsKey(field)) {
highSpeedField.add(field);
}
if (lowSpeedFieldMap.containsKey(field)) {
lowSpeedField.add(field);
}
}
Map<String, Map<String, Map<String, Object>>> result = new HashMap<>();
if (!CollectionUtils.isEmpty(highSpeedField)) {
Map<String, Map<String, Map<String, Object>>> highHistoryCurve = tdEngineService.fetchHighHistoryCurve(irn, startTime, endTime, interval, highSpeedField);
result.putAll(highHistoryCurve);
}
if (!CollectionUtils.isEmpty(lowSpeedField)) {
Map<String, Map<String, Map<String, Object>>> lowHistoryCurve = tdEngineService.fetchLowHistoryCurve(irn, startTime, endTime, interval, lowSpeedField);
if (result.get(irn.toString()) == null) {
result.putAll(lowHistoryCurve);
} else {
result.get(irn.toString()).putAll(lowHistoryCurve.get(irn.toString()));
}
}
return result;
}
@Override
public void updateCalFieldData(List<CalculateRTData> calValues) {
//更新数据至redis,TD
ListUtil.page(calValues, COMMIT_COUNT, list -> {
Map<String, Object> keyValueMap = new HashMap<>();
for (CalculateRTData value : list) {
String key = String.format("RT:%s:%s", value.getDeviceId(), value.getIotModelField().toLowerCase());
keyValueMap.put(key, value.getDataValue());
}
adminRedisTemplate.mSet(keyValueMap);
tdEngineService.updateCalFieldValues(list);
});
}
@Override
public void createTdStable() {
List<IotModelFieldVo> allIotModel = sysIotModelMapper.getAllIotModel();
for (IotModelFieldVo item : allIotModel) {
String key = String.valueOf(item.getId());
iotModelMap.put(key, item.getIotModelCode());
List<SysIotModelField> allIotModelField = sysIotModelMapper.getAllIotModelField(item.getId());
Map<String, String> LowModelFieldList = allIotModelField.stream().filter(field -> field.getHighSpeed() == 0 && field.getAttributeType() != 199).collect(Collectors.toMap(SysIotModelField::getAttributeCode, SysIotModelField::getDataType, (value1, value2) -> value1));
Map<String, String> HighModelFieldList = allIotModelField.stream().filter(field -> field.getHighSpeed() == 1 && field.getAttributeType() != 199).collect(Collectors.toMap(SysIotModelField::getAttributeCode, SysIotModelField::getDataType, (value1, value2) -> value1));
Map<String, String> calculateFieldList = allIotModelField.stream().filter(field -> field.getAttributeType() == 199).collect(Collectors.toMap(SysIotModelField::getAttributeCode, SysIotModelField::getDataType, (value1, value2) -> value1));
Map<String, Object> map = new HashMap<>();
for (String field : HighModelFieldList.keySet()) {
map.put(field, HighModelFieldList.get(field));
}
highIotFieldMap.put(item.getIotModelCode(), map);
Map<String, Object> lowMap = new HashMap<>();
for (String field : LowModelFieldList.keySet()) {
lowMap.put(field, LowModelFieldList.get(field));
}
lowIotFieldMap.put(item.getIotModelCode(), lowMap);
calculateIotFieldMap.put(item.getIotModelCode(), calculateFieldList);
}
tdEngineService.initIotModel(allIotModel, highIotFieldMap, lowIotFieldMap, calculateIotFieldMap);
}
}

View File

@ -10,6 +10,7 @@ import com.das.common.config.SessionUtil;
import com.das.common.exceptions.ServiceException;
import com.das.common.utils.*;
import com.das.modules.auth.domain.vo.SysUserVo;
import com.das.modules.data.service.impl.DataServiceImpl;
import com.das.modules.equipment.domain.dto.SysIotModelDto;
import com.das.modules.equipment.domain.dto.SysIotModelFieldDto;
import com.das.modules.equipment.domain.dto.SysIotModelServiceDto;
@ -26,8 +27,7 @@ import com.das.modules.equipment.mapper.SysIotModelFieldMapper;
import com.das.modules.equipment.mapper.SysIotModelMapper;
import com.das.modules.equipment.mapper.SysIotModelServiceMapper;
import com.das.modules.equipment.service.SysIotModelService;
import com.das.modules.node.service.TDEngineService;
import com.das.modules.node.service.impl.DataServiceImpl;
import com.das.modules.data.service.TDEngineService;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import lombok.extern.slf4j.Slf4j;
@ -45,7 +45,6 @@ import java.util.stream.Collectors;
@Service
@Slf4j
public class SysIotModelServiceImpl implements SysIotModelService {
public static final int COMMIT_COUNT = 1000;
@Autowired
private SysIotModelMapper sysIotModelMapper;

View File

@ -2,7 +2,7 @@ package com.das.modules.node.command;
import com.das.modules.node.constant.NodeConstant;
import com.das.modules.node.domain.bo.TerminalMessage;
import com.das.modules.node.service.DataService;
import com.das.modules.node.service.NodeMessageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@ -12,12 +12,12 @@ import org.springframework.stereotype.Service;
public class AnalogDataCommand implements BaseCommand {
@Autowired
DataService dataService;
NodeMessageService nodeMessageService;
@Override
public void doCommand(TerminalMessage data) {
try {
//analogData值只存入redis
dataService.handleData(data);
nodeMessageService.handleData(data);
} catch (Exception e) {
log.error("解析数据异常", e);

View File

@ -2,7 +2,7 @@ package com.das.modules.node.command;
import com.das.modules.node.constant.NodeConstant;
import com.das.modules.node.domain.bo.TerminalMessage;
import com.das.modules.node.service.DataService;
import com.das.modules.node.service.NodeMessageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@ -12,12 +12,12 @@ import org.springframework.stereotype.Service;
public class HisHighSpeedCommand implements BaseCommand {
@Autowired
DataService dataService;
NodeMessageService nodeMessageService;
@Override
public void doCommand(TerminalMessage data) {
try {
//analogData值只存入redis
dataService.handleHighSpeed(data);
nodeMessageService.handleHighSpeed(data);
} catch (Exception e) {
log.error("解析数据异常", e);

View File

@ -2,7 +2,7 @@ package com.das.modules.node.command;
import com.das.modules.node.constant.NodeConstant;
import com.das.modules.node.domain.bo.TerminalMessage;
import com.das.modules.node.service.DataService;
import com.das.modules.node.service.NodeMessageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@ -12,12 +12,12 @@ import org.springframework.stereotype.Service;
public class HisLowSpeedCommand implements BaseCommand {
@Autowired
DataService dataService;
NodeMessageService nodeMessageService;
@Override
public void doCommand(TerminalMessage data) {
try {
//analogData值只存入redis
dataService.handleLowSpeed(data);
nodeMessageService.handleLowSpeed(data);
} catch (Exception e) {
log.error("解析数据异常", e);

View File

@ -2,7 +2,7 @@ package com.das.modules.node.command;
import com.das.modules.node.constant.NodeConstant;
import com.das.modules.node.domain.bo.TerminalMessage;
import com.das.modules.node.service.DataService;
import com.das.modules.node.service.NodeMessageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@ -12,13 +12,13 @@ import org.springframework.stereotype.Service;
public class StateDataCommand implements BaseCommand {
@Autowired
DataService dataService;
NodeMessageService nodeMessageService;
@Override
public void doCommand(TerminalMessage data) {
try {
//只存入redis
dataService.handleData(data);
nodeMessageService.handleData(data);
} catch (Exception e) {
log.error("解析数据异常", e);
}

View File

@ -8,7 +8,7 @@ import com.das.common.result.R;
import com.das.common.utils.PageDataInfo;
import com.das.modules.node.domain.dto.*;
import com.das.modules.node.domain.vo.*;
import com.das.modules.node.service.DataService;
import com.das.modules.node.service.NodeMessageService;
import com.das.modules.node.service.SysNodeService;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
@ -31,7 +31,7 @@ public class SysNodeController {
private SysNodeService sysNodeService;
@Autowired
private DataService dataService;
private NodeMessageService nodeMessageService;
@ -85,7 +85,7 @@ public class SysNodeController {
/** 配置下发 */
@PostMapping("/configUpdate")
public R<?> configUpdate(@RequestBody SysNodeDto sysNodeDto) {
dataService.sendTerminalConfig(sysNodeDto.getId());
nodeMessageService.sendTerminalConfig(sysNodeDto.getId());
return R.success();
}

View File

@ -4,7 +4,7 @@ import com.das.common.utils.JsonUtils;
import com.das.common.utils.SpringUtil;
import com.das.modules.node.constant.NodeConstant;
import com.das.modules.node.domain.bo.TerminalMessage;
import com.das.modules.node.service.DataService;
import com.das.modules.node.service.NodeMessageService;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
@ -22,7 +22,7 @@ public class NodeMessageHandler extends TextWebSocketHandler {
public static final Long HEARTBEAT_TIMEOUT = 1000 * 60 * 1L;
private ConcurrentHashMap<Long, ConcurrentWebSocketSessionDecorator> onlineSessions = new ConcurrentHashMap<>(16);
private DataService dataService;
private NodeMessageService nodeMessageService;
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
String remoteIp = (String) session.getAttributes().getOrDefault(NodeConstant.REMOTE_IP, "");
@ -44,10 +44,10 @@ public class NodeMessageHandler extends TextWebSocketHandler {
// 如果version是0则需要调用一次configUpdate配置更新
if (version == 0){
if (dataService == null){
dataService = SpringUtil.getBean(DataService.class);
if (nodeMessageService == null){
nodeMessageService = SpringUtil.getBean(NodeMessageService.class);
}
dataService.sendTerminalConfig(Long.valueOf(nodeId));
nodeMessageService.sendTerminalConfig(Long.valueOf(nodeId));
}
}
@ -59,11 +59,11 @@ public class NodeMessageHandler extends TextWebSocketHandler {
String cmd = msg.getCmd();
JsonNode data = msg.getData();
log.info("收到 Node:{} 命令: {}", nodeId, cmd);
if (dataService == null){
dataService = SpringUtil.getBean(DataService.class);
if (nodeMessageService == null){
nodeMessageService = SpringUtil.getBean(NodeMessageService.class);
}
if (dataService != null){
dataService.pushMessage(msg);
if (nodeMessageService != null){
nodeMessageService.pushMessage(msg);
}
}

View File

@ -6,18 +6,15 @@ import com.fasterxml.jackson.databind.JsonNode;
import java.util.List;
public interface DataService {
public interface NodeMessageService {
void pushMessage(TerminalMessage msg);
JsonNode sendTerminalConfig(Long nodeId);
void createTdStable();
void handleData(TerminalMessage data);
void handleHighSpeed(TerminalMessage data);
void handleLowSpeed(TerminalMessage data);
void updateCalFieldData(List<CalculateRTData> values);
}

View File

@ -16,8 +16,8 @@ import com.das.modules.node.domain.vo.*;
import com.das.modules.node.handler.NodeMessageHandler;
import com.das.modules.node.mapper.SysCommunicationLinkMapper;
import com.das.modules.node.mapper.SysImpTabMappingMapper;
import com.das.modules.node.service.DataService;
import com.das.modules.node.service.TDEngineService;
import com.das.modules.node.service.NodeMessageService;
import com.das.modules.data.service.TDEngineService;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
@ -39,15 +39,13 @@ import java.util.stream.Collectors;
@Slf4j
@Service
public class DataServiceImpl implements DataService {
public class NodeMessageServiceImpl implements NodeMessageService {
private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
private Disruptor<TerminalMessage> disruptor = null;
private RingBuffer<TerminalMessage> ringBuffer = null;
public static final int COMMIT_NUMBER = 1000;
@Resource
TerminalMessageEventHandler terminalMessageEventHandler;
@ -69,20 +67,7 @@ public class DataServiceImpl implements DataService {
@Autowired
TDEngineService tdEngineService;
//key:deviceId value:modelCode
public ConcurrentHashMap<String, String> deviceModelMap = new ConcurrentHashMap<>(10000);
//key:modelId value:modelCode
public ConcurrentHashMap<String, String> iotModelMap = new ConcurrentHashMap<>(10000);
//key:modelCode value:Filed:Code,dataType
public ConcurrentHashMap<String, Map<String, Object>> highIotFieldMap = new ConcurrentHashMap<>(10000);
//key:modelCode value:Filed:Code,dataType
public ConcurrentHashMap<String, Map<String, Object>> lowIotFieldMap = new ConcurrentHashMap<>(10000);
//key:modelCode value:Filed:Code,dataType
public ConcurrentHashMap<String, Map<String, String>> calculateIotFieldMap = new ConcurrentHashMap<>(10000);
@PostConstruct
public void init() {
@ -224,31 +209,6 @@ public class DataServiceImpl implements DataService {
return jsonNode;
}
@Override
public void createTdStable() {
List<IotModelFieldVo> allIotModel = sysIotModelMapper.getAllIotModel();
for (IotModelFieldVo item : allIotModel) {
String key = String.valueOf(item.getId());
iotModelMap.put(key, item.getIotModelCode());
List<SysIotModelField> allIotModelField = sysIotModelMapper.getAllIotModelField(item.getId());
Map<String, String> LowModelFieldList = allIotModelField.stream().filter(field -> field.getHighSpeed() == 0 && field.getAttributeType() != 199).collect(Collectors.toMap(SysIotModelField::getAttributeCode, SysIotModelField::getDataType, (value1, value2) -> value1));
Map<String, String> HighModelFieldList = allIotModelField.stream().filter(field -> field.getHighSpeed() == 1 && field.getAttributeType() != 199).collect(Collectors.toMap(SysIotModelField::getAttributeCode, SysIotModelField::getDataType, (value1, value2) -> value1));
Map<String, String> calculateFieldList = allIotModelField.stream().filter(field -> field.getAttributeType() == 199).collect(Collectors.toMap(SysIotModelField::getAttributeCode, SysIotModelField::getDataType, (value1, value2) -> value1));
Map<String, Object> map = new HashMap<>();
for (String field : HighModelFieldList.keySet()) {
map.put(field, HighModelFieldList.get(field));
}
highIotFieldMap.put(item.getIotModelCode(), map);
Map<String, Object> lowMap = new HashMap<>();
for (String field : LowModelFieldList.keySet()) {
lowMap.put(field, LowModelFieldList.get(field));
}
lowIotFieldMap.put(item.getIotModelCode(), lowMap);
calculateIotFieldMap.put(item.getIotModelCode(), calculateFieldList);
}
tdEngineService.initIotModel(allIotModel, highIotFieldMap, lowIotFieldMap, calculateIotFieldMap);
}
@Override
public void handleData(TerminalMessage data) {
JsonNode jsonNode = data.getData();
@ -320,18 +280,6 @@ public class DataServiceImpl implements DataService {
tdEngineService.updateYCLowValues(highList, iotModelCode);
}
@Override
public void updateCalFieldData(List<CalculateRTData> calValues) {
//更新数据至redis,TD
ListUtil.page(calValues, COMMIT_NUMBER, list -> {
Map<String, Object> keyValueMap = new HashMap<>();
for (CalculateRTData value : list) {
String key = String.format("RT:%s:%s", value.getDeviceId(), value.getIotModelField().toLowerCase());
keyValueMap.put(key, value.getDataValue());
}
adminRedisTemplate.mSet(keyValueMap);
tdEngineService.updateCalFieldValues(list);
});
}
}

View File

@ -3,12 +3,11 @@ package com.das.modules.page.service;
import com.das.common.constant.EquipmentTypeIds;
import com.das.common.exceptions.ServiceException;
import com.das.modules.data.domain.SnapshotValueQueryParam;
import com.das.modules.data.service.DataService;
import com.das.modules.data.service.impl.DataServiceImpl;
import com.das.modules.equipment.domain.dto.SysEquipmentDto;
import com.das.modules.equipment.domain.vo.SysEquipmentVo;
import com.das.modules.equipment.mapper.SysEquipmentMapper;
import com.das.modules.node.domain.dto.DeviceCommandDto;
import com.das.modules.node.service.SysNodeService;
import com.das.modules.operation.service.OperationService;
import com.das.modules.page.domian.WindTurbinesPageVo;
import lombok.extern.slf4j.Slf4j;
@ -31,7 +30,7 @@ public class WindTurbinesPageService {
SysEquipmentMapper sysEquipmentMapper;
@Autowired
private DataService dataService;
private DataServiceImpl dataServiceImpl;
@Autowired
OperationService optService;
@ -107,7 +106,7 @@ public class WindTurbinesPageService {
}
//获取设备测点数据
Map<String, Map<String, Object>> map = dataService.querySnapshotValues(paramList);
Map<String, Map<String, Object>> map = dataServiceImpl.querySnapshotValues(paramList);
for (WindTurbinesPageVo item : windTurbinesPageVos) {
item.setAttributeMap(map.get(item.getIrn().toString()));
}

View File

@ -3,7 +3,7 @@ package com.das.modules.page.service.impl;
import com.das.common.constant.EquipmentTypeIds;
import com.das.modules.data.domain.SnapshotValueQueryParam;
import com.das.modules.data.service.DataService;
import com.das.modules.data.service.impl.DataServiceImpl;
import com.das.modules.equipment.domain.dto.SysEquipmentDto;
import com.das.modules.equipment.domain.vo.SysEquipmentVo;
import com.das.modules.equipment.mapper.SysEquipmentMapper;
@ -25,7 +25,7 @@ public class HomeServiceImpl implements HomeService {
SysEquipmentMapper sysEquipmentMapper;
@Autowired
private DataService dataService;
private DataServiceImpl dataServiceImpl;
//缺省风电场对象
private long defaultWindFarmId = 0;
@ -80,7 +80,7 @@ public class HomeServiceImpl implements HomeService {
homeWindRealTimeVoList.add(homeWindRealTimeVoResult);
}
//获取设备测点数据
Map<String, Map<String, Object>> map = dataService.querySnapshotValues(paramList);
Map<String, Map<String, Object>> map = dataServiceImpl.querySnapshotValues(paramList);
for (HomeWindTurbineMatrixDataVoVo item : homeWindRealTimeVoList) {
item.setAttributeMap(map.get(item.getIrn().toString()));
}
@ -137,7 +137,7 @@ public class HomeServiceImpl implements HomeService {
snapshotValueQueryParam.setDeviceId(windFarmId.toString());
paramList.add(snapshotValueQueryParam);
//获取设备测点数据
Map<String, Map<String, Object>> map = dataService.querySnapshotValues(paramList);
Map<String, Map<String, Object>> map = dataServiceImpl.querySnapshotValues(paramList);
HomeWindFarmRealDataVo homeWindFarmRealDataVo = new HomeWindFarmRealDataVo();
homeWindFarmRealDataVo.setWindFarmId(windFarmId);
if (map !=null){