实时数据查询,历史区间数据查询接口新增

This commit is contained in:
huguanghan 2024-10-21 17:04:29 +08:00
parent f58f0cc26a
commit e3c6d293b6
7 changed files with 417 additions and 12 deletions

View File

@ -0,0 +1,57 @@
package com.das.modules.data.controller;
import com.das.common.result.R;
import com.das.common.utils.JsonUtils;
import com.das.modules.data.domain.SnapshotValueQueryParam;
import com.das.modules.data.domain.TSValueQueryParam;
import com.das.modules.data.service.DataService;
import jakarta.validation.Valid;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
import java.util.Map;
/**
* 数据查询controller
*/
@Slf4j
@RequestMapping("/api/data")
@RestController
public class DataController {
@Autowired
DataService dataService;
/**
* 实时数据查询
* @param param 查询数据请求体
* @return redis数据
*/
@PostMapping("/snapshot")
public R<Map<String,Map<String,Object>>> querySnapshotValues(@RequestBody @Valid List<SnapshotValueQueryParam> param) {
if (log.isDebugEnabled()){
log.debug("/api/rtdbsvr/snapshot is calling");
log.debug(JsonUtils.toJsonString(param));
}
return R.success(dataService.querySnapshotValues(param));
}
/**
* 历史区间数据查询
* @param param
* @return
*/
@PostMapping("/history")
public R<Map<String, Map<String, Map<String, Object>>>> queryTimeSeriesValues(@RequestBody @Valid TSValueQueryParam param) {
if (log.isDebugEnabled()){
log.debug("/api/rtdbsvr/timeseries is calling");
log.debug(JsonUtils.toJsonString(param));
}
return R.success(dataService.queryTimeSeriesValues(param));
}
}

View File

@ -0,0 +1,20 @@
package com.das.modules.data.domain;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import jakarta.validation.constraints.NotNull;
import lombok.Data;
import java.util.List;
@JsonIgnoreProperties(ignoreUnknown = true)
@Data
public class SnapshotValueQueryParam
{
/**
* 测点irn列表
*/
@NotNull
private String deviceId;
private List<String> attributes;
}

View File

@ -0,0 +1,39 @@
package com.das.modules.data.domain;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.Data;
import java.util.Date;
import java.util.List;
/**
* 时序数据查询实体
*/
@Data
public class TSValueQueryParam
{
/**
* 开始时间
*/
private String startTime;
/**
* 结束时间
*/
private String endTime;
/**
* 间隔
*/
private String interval;
/**
* 填充模式
*/
private String fill;
/**
* 设备属性列表
*/
private List<SnapshotValueQueryParam> devices;
}

View File

@ -0,0 +1,125 @@
package com.das.modules.data.service;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.collection.ListUtil;
import com.das.common.exceptions.ServiceException;
import com.das.common.utils.AdminRedisTemplate;
import com.das.modules.data.domain.SnapshotValueQueryParam;
import com.das.modules.data.domain.TSValueQueryParam;
import com.das.modules.equipment.entity.SysIotModelField;
import com.das.modules.equipment.mapper.SysIotModelFieldMapper;
import com.das.modules.node.service.TDEngineService;
import com.das.modules.node.service.impl.DataServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.util.*;
@Slf4j
@Service
public class DataService {
public static final int COMMIT_COUNT = 1000;
@Autowired
AdminRedisTemplate adminRedisTemplate;
@Autowired
private SysIotModelFieldMapper sysIotModelFieldMapper;
@Autowired
private TDEngineService tdEngineService;
@Autowired
private DataServiceImpl dataService;
// 读取实时数据快照
public Map<String,Map<String,Object>> querySnapshotValues(List<SnapshotValueQueryParam> paramList){
long start = System.currentTimeMillis();
Map<String,Map<String,Object>> result = new HashMap<>(paramList.size());
Map<String,Map<String,Object>> finalResult = result;
ListUtil.page(paramList, COMMIT_COUNT, list->{
List<String> keyList = new ArrayList<>();
for (int i = 0; i < list.size(); i++) {
SnapshotValueQueryParam snapshotValueQueryParam = list.get(i);
List<String> attributes = snapshotValueQueryParam.getAttributes();
if (CollectionUtils.isEmpty(attributes)){
//为空查全部
List<String> sysIotModelFields = sysIotModelFieldMapper.queryAllFiledNames(Long.valueOf(snapshotValueQueryParam.getDeviceId()));
for (String item : sysIotModelFields){
String key = String.format("RT:[%s]:[%s]", snapshotValueQueryParam.getDeviceId(), item);
keyList.add(key);
}
}else {
for (String item :attributes){
String key = String.format("RT:[%s]:[%s]", snapshotValueQueryParam.getDeviceId(), item);
keyList.add(key);
}
}
}
List<Object> dataList = adminRedisTemplate.mGet(keyList);
for (int i = 0; i < keyList.size(); i++){
String key = keyList.get(i);
int firstColonIndex = key.indexOf('[');
int firstIndex = key.indexOf(']');
int secondIndex = key.indexOf(']',firstIndex + 1);
int secondColonIndex = key.indexOf('[', firstColonIndex + 1);
String deviceId = key.substring(firstColonIndex + 1, firstIndex);
String fieldName = key.substring(secondColonIndex + 1,secondIndex);
if (finalResult.get(deviceId) == null){
Map<String,Object> valueMap = new HashMap<>();
valueMap.put(fieldName,dataList.get(i));
finalResult.put(deviceId,valueMap);
}else {
finalResult.get(deviceId).put(fieldName,dataList.get(i));
}
}
});
long end = System.currentTimeMillis();
log.debug("读取快照{}个,耗时: {}秒", paramList.size(), (end-start)/ 1000.0);
return finalResult;
}
public Map<String, Map<String, Map<String, Object>>> queryTimeSeriesValues(TSValueQueryParam param) {
if(CollectionUtil.isEmpty(param.getDevices()) || (param.getStartTime() == null && param.getEndTime() == null)){
throw new ServiceException("必要参数缺失");
}
Date startTime = new Date(Long.parseLong(param.getStartTime()));
Date endTime = new Date(Long.parseLong(param.getEndTime()));
Map<String, Map<String, Map<String, Object>>> result = new HashMap<>(param.getDevices().size());
List<SnapshotValueQueryParam> deviceFieldList = param.getDevices();
for (SnapshotValueQueryParam item : deviceFieldList){
//field分为高频和低频查询
Map<String, Map<String, Map<String, Object>>> values = queryHistoryCurveValues(Long.valueOf(item.getDeviceId()), startTime, endTime, param.getInterval(), param.getFill(),item.getAttributes());
result.putAll(values);
}
return result;
}
private Map<String, Map<String, Map<String, Object>>> queryHistoryCurveValues(Long irn, Date startTime, Date endTime, String interval, String fill,List<String> attributes ){
String iotModelCode = sysIotModelFieldMapper.queryModelCodeByDeviceId(irn);
Map<String, Object> highSpeedFieldMap = dataService.highIotFieldMap.get(iotModelCode);
Map<String, Object> lowSpeedFieldMap = dataService.lowIotFieldMap.get(iotModelCode);
List<String> highSpeedField = new ArrayList<>();
List<String> lowSpeedField = new ArrayList<>();
for (String field : attributes){
if (highSpeedFieldMap.containsKey(field)){
highSpeedField.add(field);
}
if (lowSpeedFieldMap.containsKey(field)){
lowSpeedField.add(field);
}
}
Map<String, Map<String, Map<String, Object>>> lowHistoryCurve = tdEngineService.fetchLowHistoryCurve(irn, startTime, endTime, interval, lowSpeedField);
Map<String, Map<String, Map<String, Object>>> highHistoryCurve = tdEngineService.fetchHighHistoryCurve(irn, startTime, endTime, interval, highSpeedField);
Map<String, Map<String, Map<String, Object>>> result = new HashMap<>(lowHistoryCurve);
result.get(irn.toString()).putAll(highHistoryCurve.get(irn.toString()));
return result;
}
}

View File

@ -9,6 +9,8 @@ import com.das.modules.equipment.entity.SysIotModelField;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.util.List;
@Mapper
public interface SysIotModelFieldMapper extends BaseMapperPlus<SysIotModelField, SysIotModelField> {
@ -17,4 +19,8 @@ public interface SysIotModelFieldMapper extends BaseMapperPlus<SysIotModelField,
Long querySysIotModelFieldByModelId(Long id);
SysIotModelFieldVo selectByAttributeCode(Long iotModelId, String code);
List<String> queryAllFiledNames(@Param("deviceId") Long deviceId);
String queryModelCodeByDeviceId(@Param("deviceId") Long deviceId);
}

View File

@ -1,6 +1,7 @@
package com.das.modules.node.service;
import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.util.StrUtil;
import com.das.modules.equipment.domain.vo.IotModelFieldVo;
import com.das.modules.equipment.mapper.SysIotModelMapper;
import com.das.modules.node.domain.bo.RTData;
@ -13,13 +14,12 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.sql.*;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.Date;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
@ -131,7 +131,8 @@ public class TDEngineService {
sb.setLength(0);
sb.append("ALTER STABLE ");
sb.append(stableName);
sb.append(" DROP COLUMN");;
sb.append(" DROP COLUMN");
;
sb.append(fieldCode);
sb.append(";");
try {
@ -140,7 +141,7 @@ public class TDEngineService {
log.error("删除超级表列失败:{},失败原因{}", sb.toString(), e);
}
}catch (Exception ignored){
} catch (Exception ignored) {
}
}
@ -148,7 +149,7 @@ public class TDEngineService {
/**
* 删除超级表
*/
public void deleteStable(String stableName){
public void deleteStable(String stableName) {
try (Connection conn = hikariDataSource.getConnection();
Statement pstmt = conn.createStatement()) {
StringBuilder sb = new StringBuilder(1024 * 1024);
@ -162,7 +163,7 @@ public class TDEngineService {
log.error("删除超级表失败:{},失败原因{}", sb.toString(), e);
}
}catch (Exception ignored){
} catch (Exception ignored) {
}
}
@ -191,7 +192,7 @@ public class TDEngineService {
for (String key : map.keySet()) {
sb.append(", ");
sb.append(key);
sb.append(" "+map.get(key));
sb.append(" " + map.get(key));
}
sb.append(") TAGS (`deviceid` BIGINT);");
try {
@ -215,7 +216,7 @@ public class TDEngineService {
for (String key : map.keySet()) {
sb.append(", ");
sb.append(key);
sb.append(" "+map.get(key));
sb.append(" " + map.get(key));
}
sb.append(") TAGS (`deviceid` BIGINT);");
try {
@ -342,6 +343,153 @@ public class TDEngineService {
}
}
public Map<String, Map<String, Map<String, Object>>> fetchHighHistoryCurve(Long irn, Date startTime, Date endTime, String interval, List<String> fieldList) {
SimpleDateFormat SIMPLE_DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String tbName = String.format("h%d", irn);
Date now = new Date();
if (endTime.after(now)) {
endTime = now;
}
Map<String, Map<String, Map<String, Object>>> result = new HashMap<>();
Map<String, Map<String, Object>> valueMap = new HashMap<>();
StringBuffer sb = new StringBuffer(2048);
if (StrUtil.isNotBlank(interval)) {
String startTimeStr = SIMPLE_DATE_FORMAT.format(startTime);
String endTimeStr = SIMPLE_DATE_FORMAT.format(endTime);
String timeStr = String.format("'%s','%s'", startTimeStr, endTimeStr);
String intervalStr = convertInterval(interval);
sb.append("select _irowts updatetime");
fieldList.forEach(field ->
sb.append(" ,").append("interp(").append(field).append(") ").append(field)
);
sb.append(" from ");
sb.append(tbName);
sb.append(String.format(" range(%s)", timeStr));
sb.append(String.format(" every(%s)", intervalStr));
sb.append(String.format(" FILL(%s)", "PREV"));
sb.append(" order by updatetime");
} else {
sb.append("select updatetime, datavalue from ");
fieldList.forEach(field ->
sb.append(", ").append(field)
);
sb.append(" from ");
sb.append(tbName);
sb.append(" where ");
sb.append(String.format(" updatetime >= %d and updatetime < %d ", startTime.getTime(), endTime.getTime()));
sb.append(" order by updatetime");
}
log.debug(sb.toString());
try (Connection conn = hikariDataSource.getConnection();
Statement smt = conn.createStatement();
ResultSet rs = smt.executeQuery(sb.toString())) {
while (rs.next()) {
for (int i = 0; i < fieldList.size(); i++){
if (valueMap.get(fieldList.get(i)) == null){
Map<String,Object> map = new HashMap<>();
List<Long> timeList = new ArrayList<>();
timeList.add(rs.getTimestamp(1).getTime());
List<Object> valueList = new ArrayList<>();
valueList.add(rs.getObject(fieldList.get(i)));
map.put("times",timeList);
map.put("values",valueList);
valueMap.put(fieldList.get(i),map);
}else {
Map<String, Object> map = valueMap.get(fieldList.get(i));
List<Long> times = (List<Long>) map.get("times");
List<Object> values = (List<Object>) map.get("values");
times.add(rs.getTimestamp(1).getTime());
values.add(rs.getObject(fieldList.get(i)));
}
}
}
result.put(irn.toString(),valueMap);
} catch (Exception e) {
log.error("获取数据异常", e);
return result;
}
return result;
}
public Map<String, Map<String, Map<String, Object>>> fetchLowHistoryCurve(Long irn, Date startTime, Date endTime, String interval, List<String> fieldList) {
SimpleDateFormat SIMPLE_DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String tbName = String.format("l%d", irn);
Date now = new Date();
if (endTime.after(now)) {
endTime = now;
}
Map<String, Map<String, Map<String, Object>>> result = new HashMap<>();
Map<String, Map<String, Object>> valueMap = new HashMap<>();
StringBuffer sb = new StringBuffer(2048);
if (StrUtil.isNotBlank(interval)) {
String startTimeStr = SIMPLE_DATE_FORMAT.format(startTime);
String endTimeStr = SIMPLE_DATE_FORMAT.format(endTime);
String timeStr = String.format("'%s','%s'", startTimeStr, endTimeStr);
String intervalStr = convertInterval(interval);
sb.append("select _irowts updatetime");
fieldList.forEach(field ->
sb.append(" ,").append("interp(").append(field).append(") ").append(field)
);
sb.append(" from ");
sb.append(tbName);
sb.append(String.format(" range(%s)", timeStr));
sb.append(String.format(" every(%s)", intervalStr));
sb.append(String.format(" FILL(%s)", "PREV"));
sb.append(" order by updatetime");
} else {
sb.append("select updatetime, datavalue from ");
fieldList.forEach(field ->
sb.append(", ").append(field)
);
sb.append(" from ");
sb.append(tbName);
sb.append(" where ");
sb.append(String.format(" updatetime >= %d and updatetime < %d ", startTime.getTime(), endTime.getTime()));
sb.append(" order by updatetime");
}
log.debug(sb.toString());
try (Connection conn = hikariDataSource.getConnection();
Statement smt = conn.createStatement();
ResultSet rs = smt.executeQuery(sb.toString())) {
while (rs.next()) {
for (int i = 0; i < fieldList.size(); i++){
if (valueMap.get(fieldList.get(i)) == null){
Map<String,Object> map = new HashMap<>();
List<Long> timeList = new ArrayList<>();
timeList.add(rs.getTimestamp(1).getTime());
List<Object> valueList = new ArrayList<>();
valueList.add(rs.getObject(fieldList.get(i)));
map.put("times",timeList);
map.put("values",valueList);
valueMap.put(fieldList.get(i),map);
}else {
Map<String, Object> map = valueMap.get(fieldList.get(i));
List<Long> times = (List<Long>) map.get("times");
List<Object> values = (List<Object>) map.get("values");
times.add(rs.getTimestamp(1).getTime());
values.add(rs.getObject(fieldList.get(i)));
}
}
}
result.put(irn.toString(),valueMap);
} catch (Exception e) {
log.error("获取数据异常", e);
return result;
}
return result;
}
private String convertInterval(String interval) {
if (!StringUtils.hasText(interval)) {
interval = "1m";
}
return interval;
}
@PreDestroy
public void free() {

View File

@ -40,6 +40,16 @@
<select id="selectByAttributeCode" resultMap="SysIotModelFieldMap">
select * from sys_iot_model_field simf where simf.iot_model_id = #{iotModelId} and upper(simf.attribute_code) = upper(#{code})
</select>
<select id="queryAllFiledNames" resultType="java.lang.String">
select simf.attribute_code from sys_iot_model_field simf
left join sys_equipment se on se.iot_model_id = simf.iot_model_id
where se.id = #{deviceId}
</select>
<select id="queryModelCodeByDeviceId" resultType="java.lang.String">
select sim.iot_model_code from sys_equipment se
left join sys_iot_model sim on se.iot_model_id = sim.id
where se.id = #{deviceId}
</select>
</mapper>