增加窗口查询接口

This commit is contained in:
huguanghan 2024-12-18 21:09:11 +08:00
parent ea23961d0d
commit 50fe838c6f
5 changed files with 331 additions and 1 deletions

View File

@ -4,6 +4,7 @@ import com.das.common.result.R;
import com.das.common.utils.JsonUtils;
import com.das.modules.data.domain.SnapshotValueQueryParam;
import com.das.modules.data.domain.TSValueQueryParam;
import com.das.modules.data.domain.WindowValueQueryParam;
import com.das.modules.data.service.DataService;
import com.das.modules.data.service.impl.DataServiceImpl;
import jakarta.validation.Valid;
@ -53,4 +54,17 @@ public class DataController {
}
return R.success(dataService.queryTimeSeriesValues(param));
}
/**
* 区间聚合函数
* @param param 查询条件
* @return TD数据库数据
*/
@PostMapping("/windows")
public R<Map<String, Map<String, Map<String, Object>>>> queryWindowsValues(@RequestBody @Valid WindowValueQueryParam param) {
if (log.isDebugEnabled()){
log.debug("/api/rtdbsvr/timeseries is calling");
}
return R.success(dataService.queryWindowsValues(param));
}
}

View File

@ -0,0 +1,39 @@
package com.das.modules.data.domain;
import lombok.Data;
import java.util.List;
/**
* 时序数据查询实体
*/
@Data
public class WindowValueQueryParam
{
/**
* 开始时间
*/
private String startTime;
/**
* 结束时间
*/
private String endTime;
/**
* 间隔
*/
private String interval;
/**
* 填充模式
*/
private String fill;
/**
* 设备属性列表
*/
private List<SnapshotValueQueryParam> devices;
private String calFunction;
}

View File

@ -2,6 +2,7 @@ package com.das.modules.data.service;
import com.das.modules.data.domain.SnapshotValueQueryParam;
import com.das.modules.data.domain.TSValueQueryParam;
import com.das.modules.data.domain.WindowValueQueryParam;
import com.das.modules.node.domain.bo.CalculateRTData;
import java.util.List;
@ -13,6 +14,8 @@ public interface DataService {
Map<String, Map<String, Map<String, Object>>> queryTimeSeriesValues(TSValueQueryParam param);
Map<String, Map<String, Map<String, Object>>> queryWindowsValues(WindowValueQueryParam param);
void createTdStable();
void updateCalFieldData(List<CalculateRTData> values);

View File

@ -506,6 +506,188 @@ public class TDEngineService {
return result;
}
public Map<String, Map<String, Map<String, Object>>> fetchHighWindowsCurve(Long irn, Date startTime, Date endTime, String interval, List<String> fieldList,String calFunction) {
String tbName = String.format("h%d", irn);
Date now = new Date();
if (endTime.after(now)) {
endTime = now;
}
Map<String, Map<String, Map<String, Object>>> result = new HashMap<>();
Map<String, Map<String, Object>> valueMap = new HashMap<>();
for (String item : fieldList) {
Map<String, Object> timeValueMap = new HashMap<>();
List<Long> times = new ArrayList<>();
List<Object> objects = new ArrayList<>();
timeValueMap.put("times", times);
timeValueMap.put("values", objects);
valueMap.put(item, timeValueMap);
}
StringBuffer sb = new StringBuffer(2048);
if (!(StrUtil.isNotBlank(interval) && interval.equals("NONE"))) {
String intervalStr = convertInterval(interval);
sb.append("select _WSTART, _WEND, updatetime");
fieldList.forEach(field ->
sb.append(" ,").append(calFunction).append("(").append(field).append(") ")
);
sb.append(" from ");
sb.append(tbName);
sb.append(" where ");
sb.append(String.format(" updatetime >= %d and updatetime < %d ", startTime.getTime(), endTime.getTime()));
sb.append(String.format(" INTERVAL(%s)", intervalStr));
sb.append(String.format(" FILL(%s)", "NONE"));
}
log.debug(sb.toString());
try (Connection conn = hikariDataSource.getConnection();
Statement smt = conn.createStatement();
ResultSet rs = smt.executeQuery(sb.toString())) {
while (rs.next()) {
for (int i = 0; i < fieldList.size(); i++) {
if (valueMap.get(fieldList.get(i)) == null) {
Map<String, Object> map = new HashMap<>();
List<Long> timeList = new ArrayList<>();
timeList.add(rs.getTimestamp(3).getTime());
List<Object> valueList = new ArrayList<>();
valueList.add(rs.getObject(fieldList.get(i).toLowerCase()));
map.put("times", timeList);
map.put("values", valueList);
valueMap.put(fieldList.get(i), map);
} else {
Map<String, Object> map = valueMap.get(fieldList.get(i));
List<Long> times = (List<Long>) map.get("times");
List<Object> values = (List<Object>) map.get("values");
times.add(rs.getTimestamp(3).getTime());
values.add(rs.getObject(fieldList.get(i).toLowerCase()));
}
}
}
result.put(irn.toString(), valueMap);
} catch (Exception e) {
log.error("获取数据异常", e);
return result;
}
return result;
}
public Map<String, Map<String, Map<String, Object>>> fetchLowWindowsCurve(Long irn, Date startTime, Date endTime, String interval, List<String> fieldList,String calFunction) {
String tbName = String.format("l%d", irn);
Date now = new Date();
if (endTime.after(now)) {
endTime = now;
}
Map<String, Map<String, Map<String, Object>>> result = new HashMap<>();
Map<String, Map<String, Object>> valueMap = new HashMap<>();
for (String item : fieldList) {
Map<String, Object> timeValueMap = new HashMap<>();
List<Long> times = new ArrayList<>();
List<Object> objects = new ArrayList<>();
timeValueMap.put("times", times);
timeValueMap.put("values", objects);
valueMap.put(item, timeValueMap);
}
StringBuffer sb = new StringBuffer(2048);
if (!(StrUtil.isNotBlank(interval) && interval.equals("NONE"))) {
String intervalStr = convertInterval(interval);
sb.append("select _WSTART, _WEND,updatetime");
fieldList.forEach(field ->
sb.append(" ,").append(calFunction).append("(").append(field).append(") ")
);
sb.append(" from ");
sb.append(tbName);
sb.append(" where ");
sb.append(String.format(" updatetime >= %d and updatetime < %d ", startTime.getTime(), endTime.getTime()));
sb.append(String.format(" INTERVAL(%s)", intervalStr));
sb.append(String.format(" FILL(%s)", "NONE"));
}
log.debug(sb.toString());
try (Connection conn = hikariDataSource.getConnection();
Statement smt = conn.createStatement();
ResultSet rs = smt.executeQuery(sb.toString())) {
while (rs.next()) {
for (int i = 0; i < fieldList.size(); i++) {
if (valueMap.get(fieldList.get(i)) == null) {
Map<String, Object> map = new HashMap<>();
List<Long> timeList = new ArrayList<>();
timeList.add(rs.getTimestamp(3).getTime());
List<Object> valueList = new ArrayList<>();
valueList.add(rs.getObject(fieldList.get(i).toLowerCase()));
map.put("times", timeList);
map.put("values", valueList);
valueMap.put(fieldList.get(i), map);
} else {
Map<String, Object> map = valueMap.get(fieldList.get(i));
List<Long> times = (List<Long>) map.get("times");
List<Object> values = (List<Object>) map.get("values");
times.add(rs.getTimestamp(3).getTime());
values.add(rs.getObject(fieldList.get(i).toLowerCase()));
}
}
}
result.put(irn.toString(), valueMap);
} catch (Exception e) {
log.error("获取数据异常", e);
return result;
}
return result;
}
public Map<String, Map<String, Map<String, Object>>> fetchCalWindowsCurve(Long irn, Date startTime, Date endTime, String interval, String calFieldCode,String calFunction) {
Date now = new Date();
if (endTime.after(now)) {
endTime = now;
}
Map<String, Map<String, Map<String, Object>>> result = new HashMap<>();
Map<String, Map<String, Object>> valueMap = new HashMap<>();
Map<String, Object> timeValueMap = new HashMap<>();
List<Long> times = new ArrayList<>();
List<Object> objects = new ArrayList<>();
timeValueMap.put("times", times);
timeValueMap.put("values", objects);
valueMap.put(calFieldCode, timeValueMap);
StringBuffer sb = new StringBuffer(2048);
if (!(StrUtil.isNotBlank(interval) && interval.equals("NONE"))) {
sb.append("select _WSTART, _WEND,updatetime,");
sb.append(calFunction).append("(datavalue) as datavalue");
sb.append(" from c_");
sb.append(irn).append("_").append(calFieldCode);
sb.append(" where ");
sb.append(String.format(" updatetime >= %d and updatetime < %d ", startTime.getTime(), endTime.getTime()));
sb.append(String.format(" INTERVAL(%s)", interval));
sb.append(String.format(" FILL(%s)", "NONE"));
}
log.debug(sb.toString());
try (Connection conn = hikariDataSource.getConnection();
Statement smt = conn.createStatement();
ResultSet rs = smt.executeQuery(sb.toString())) {
while (rs.next()) {
if (valueMap.get(calFieldCode) == null) {
Map<String, Object> map = new HashMap<>();
List<Long> timeList = new ArrayList<>();
timeList.add(rs.getTimestamp(3).getTime());
List<Object> valueList = new ArrayList<>();
valueList.add(rs.getObject("datavalue"));
map.put("times", timeList);
map.put("values", valueList);
valueMap.put(calFieldCode, map);
} else {
Map<String, Object> map = valueMap.get(calFieldCode);
List<Long> timeList = (List<Long>) map.get("times");
List<Object> values = (List<Object>) map.get("values");
timeList.add(rs.getTimestamp(3).getTime());
values.add(rs.getObject("datavalue"));
}
}
result.put(irn.toString(), valueMap);
} catch (Exception e) {
log.error("获取数据异常", e);
return result;
}
return result;
}
public Map<String, Map<String, Map<String, Object>>> fetchLowHistoryCurve(Long irn, Date startTime, Date endTime, String interval, List<String> fieldList) {
SimpleDateFormat SIMPLE_DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String tbName = String.format("l%d", irn);
@ -614,7 +796,6 @@ public class TDEngineService {
sb.append(" order by updatetime");
} else {
sb.append("select updatetime, datavalue");
sb.append(" from ");
sb.append(" from c_");
sb.append(irn).append("_").append(calFieldCode);
sb.append(" where ");

View File

@ -4,10 +4,12 @@ import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.collection.ListUtil;
import com.das.common.exceptions.ServiceException;
import com.das.common.utils.AdminRedisTemplate;
import com.das.common.utils.StringUtils;
import com.das.modules.cache.domain.DeviceInfoCache;
import com.das.modules.cache.service.CacheService;
import com.das.modules.data.domain.SnapshotValueQueryParam;
import com.das.modules.data.domain.TSValueQueryParam;
import com.das.modules.data.domain.WindowValueQueryParam;
import com.das.modules.data.service.DataService;
import com.das.modules.data.service.TDEngineService;
import com.das.modules.equipment.domain.vo.IotModelFieldVo;
@ -151,6 +153,27 @@ public class DataServiceImpl implements DataService {
return result;
}
@Override
public Map<String, Map<String, Map<String, Object>>> queryWindowsValues(WindowValueQueryParam param) {
Long start = System.currentTimeMillis();
if (CollectionUtil.isEmpty(param.getDevices()) || (param.getStartTime() == null && param.getEndTime() == null && param.getCalFunction() == null)) {
throw new ServiceException("必要参数缺失");
}
Date startTime = new Date(Long.parseLong(param.getStartTime()));
Date endTime = new Date(Long.parseLong(param.getEndTime()));
String windowType = param.getCalFunction();
Map<String, Map<String, Map<String, Object>>> result = new HashMap<>(param.getDevices().size());
List<SnapshotValueQueryParam> deviceFieldList = param.getDevices();
for (SnapshotValueQueryParam item : deviceFieldList) {
//field分为高频和低频查询
Map<String, Map<String, Map<String, Object>>> values = queryWindowsCurveValues(Long.valueOf(item.getDeviceId()), startTime, endTime, param.getInterval(), param.getFill(), item.getAttributes(),windowType);
result.putAll(values);
}
Long end = System.currentTimeMillis();
log.debug("读取快照{}个,耗时: {}秒", param.getDevices().size(), (end-start)/ 1000.0);
return result;
}
private Map<String, Map<String, Map<String, Object>>> queryHistoryCurveValues(Long irn, Date startTime, Date endTime, String interval, String fill, List<String> attributes) {
StopWatch stopWatch = new StopWatch();
stopWatch.start("prepare resources");
@ -208,6 +231,67 @@ public class DataServiceImpl implements DataService {
return result;
}
private Map<String, Map<String, Map<String, Object>>> queryWindowsCurveValues(Long irn, Date startTime, Date endTime, String interval, String fill, List<String> attributes,String windowType) {
StopWatch stopWatch = new StopWatch();
stopWatch.start("prepare resources");
String function = mappingFunction(windowType);
if (StringUtils.isEmpty(function)){
throw new ServiceException("计算方法参数不正确,请检查参数");
}
String iotModelCode = sysIotModelFieldMapper.queryModelCodeByDeviceId(irn);
Map<String, Object> highSpeedFieldMap = highIotFieldMap.get(iotModelCode);
Map<String, Object> lowSpeedFieldMap = lowIotFieldMap.get(iotModelCode);
Map<String, String> calFieldMap = calculateIotFieldMap.get(iotModelCode);
List<String> highSpeedField = new ArrayList<>();
List<String> lowSpeedField = new ArrayList<>();
List<String> calField = new ArrayList<>();
for (String field : attributes) {
if (highSpeedFieldMap.containsKey(field)) {
highSpeedField.add(field);
}
if (lowSpeedFieldMap.containsKey(field)) {
lowSpeedField.add(field);
}
if (calFieldMap.containsKey(field)){
calField.add(field);
}
}
stopWatch.stop();
stopWatch.start("HighSpeedValues");
Map<String, Map<String, Map<String, Object>>> result = new HashMap<>();
if (!CollectionUtils.isEmpty(highSpeedField)) {
Map<String, Map<String, Map<String, Object>>> highHistoryCurve = tdEngineService.fetchHighWindowsCurve(irn, startTime, endTime, interval, highSpeedField,function);
result.putAll(highHistoryCurve);
}
stopWatch.stop();
stopWatch.start("LowSpeedValues");
if (!CollectionUtils.isEmpty(lowSpeedField)) {
Map<String, Map<String, Map<String, Object>>> lowHistoryCurve = tdEngineService.fetchLowWindowsCurve(irn, startTime, endTime, interval, lowSpeedField,function);
if (result.get(irn.toString()) == null) {
result.putAll(lowHistoryCurve);
} else {
result.get(irn.toString()).putAll(lowHistoryCurve.get(irn.toString()));
}
}
stopWatch.stop();
stopWatch.start("CalculateValues");
if (!CollectionUtils.isEmpty(calField)){
ListUtil.page(calField,COMMIT_COUNT,list -> {
for (String item : list){
Map<String, Map<String, Map<String, Object>>> calHistoryCurve = tdEngineService.fetchCalWindowsCurve(irn, startTime, endTime, interval, item,function);
if (result.get(irn.toString()) == null) {
result.putAll(calHistoryCurve);
} else {
result.get(irn.toString()).putAll(calHistoryCurve.get(irn.toString()));
}
}
});
}
stopWatch.stop();
log.debug("查询历史数据耗时: {}", stopWatch.prettyPrint());
return result;
}
@Override
public void updateCalFieldData(List<CalculateRTData> calValues) {
//更新数据至redis,TD
@ -320,4 +404,13 @@ public class DataServiceImpl implements DataService {
}
return tdEngineService.getTimeAvgValue(tableName, attr.toLowerCase(), startTime, endTime);
}
private String mappingFunction(String calFunction){
return switch (calFunction) {
case "average" -> "AVG";
case "max" -> "MAX";
case "min" -> "MIN";
default -> "";
};
}
}