From 50fe838c6fab947271241572ee72f77e31fea1bf Mon Sep 17 00:00:00 2001 From: huguanghan Date: Wed, 18 Dec 2024 21:09:11 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E7=AA=97=E5=8F=A3=E6=9F=A5?= =?UTF-8?q?=E8=AF=A2=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../data/controller/DataController.java | 14 ++ .../data/domain/WindowValueQueryParam.java | 39 ++++ .../das/modules/data/service/DataService.java | 3 + .../modules/data/service/TDEngineService.java | 183 +++++++++++++++++- .../data/service/impl/DataServiceImpl.java | 93 +++++++++ 5 files changed, 331 insertions(+), 1 deletion(-) create mode 100644 das/src/main/java/com/das/modules/data/domain/WindowValueQueryParam.java diff --git a/das/src/main/java/com/das/modules/data/controller/DataController.java b/das/src/main/java/com/das/modules/data/controller/DataController.java index f03c75b0..76808959 100644 --- a/das/src/main/java/com/das/modules/data/controller/DataController.java +++ b/das/src/main/java/com/das/modules/data/controller/DataController.java @@ -4,6 +4,7 @@ import com.das.common.result.R; import com.das.common.utils.JsonUtils; import com.das.modules.data.domain.SnapshotValueQueryParam; import com.das.modules.data.domain.TSValueQueryParam; +import com.das.modules.data.domain.WindowValueQueryParam; import com.das.modules.data.service.DataService; import com.das.modules.data.service.impl.DataServiceImpl; import jakarta.validation.Valid; @@ -53,4 +54,17 @@ public class DataController { } return R.success(dataService.queryTimeSeriesValues(param)); } + + /** + * 区间聚合函数 + * @param param 查询条件 + * @return TD数据库数据 + */ + @PostMapping("/windows") + public R>>> queryWindowsValues(@RequestBody @Valid WindowValueQueryParam param) { + if (log.isDebugEnabled()){ + log.debug("/api/rtdbsvr/timeseries is calling"); + } + return R.success(dataService.queryWindowsValues(param)); + } } diff --git a/das/src/main/java/com/das/modules/data/domain/WindowValueQueryParam.java b/das/src/main/java/com/das/modules/data/domain/WindowValueQueryParam.java new file mode 100644 index 00000000..50c8bef4 --- /dev/null +++ b/das/src/main/java/com/das/modules/data/domain/WindowValueQueryParam.java @@ -0,0 +1,39 @@ +package com.das.modules.data.domain; + +import lombok.Data; + +import java.util.List; + +/** + * 时序数据查询实体 + */ +@Data +public class WindowValueQueryParam +{ + /** + * 开始时间 + */ + private String startTime; + + /** + * 结束时间 + */ + private String endTime; + + /** + * 间隔 + */ + private String interval; + + /** + * 填充模式 + */ + private String fill; + + /** + * 设备属性列表 + */ + private List devices; + + private String calFunction; +} diff --git a/das/src/main/java/com/das/modules/data/service/DataService.java b/das/src/main/java/com/das/modules/data/service/DataService.java index 7628653f..72b94837 100644 --- a/das/src/main/java/com/das/modules/data/service/DataService.java +++ b/das/src/main/java/com/das/modules/data/service/DataService.java @@ -2,6 +2,7 @@ package com.das.modules.data.service; import com.das.modules.data.domain.SnapshotValueQueryParam; import com.das.modules.data.domain.TSValueQueryParam; +import com.das.modules.data.domain.WindowValueQueryParam; import com.das.modules.node.domain.bo.CalculateRTData; import java.util.List; @@ -13,6 +14,8 @@ public interface DataService { Map>> queryTimeSeriesValues(TSValueQueryParam param); + Map>> queryWindowsValues(WindowValueQueryParam param); + void createTdStable(); void updateCalFieldData(List values); diff --git a/das/src/main/java/com/das/modules/data/service/TDEngineService.java b/das/src/main/java/com/das/modules/data/service/TDEngineService.java index 639eb10b..7d85db4e 100644 --- a/das/src/main/java/com/das/modules/data/service/TDEngineService.java +++ b/das/src/main/java/com/das/modules/data/service/TDEngineService.java @@ -506,6 +506,188 @@ public class TDEngineService { return result; } + public Map>> fetchHighWindowsCurve(Long irn, Date startTime, Date endTime, String interval, List fieldList,String calFunction) { + String tbName = String.format("h%d", irn); + Date now = new Date(); + if (endTime.after(now)) { + endTime = now; + } + Map>> result = new HashMap<>(); + Map> valueMap = new HashMap<>(); + for (String item : fieldList) { + Map timeValueMap = new HashMap<>(); + List times = new ArrayList<>(); + List objects = new ArrayList<>(); + timeValueMap.put("times", times); + timeValueMap.put("values", objects); + valueMap.put(item, timeValueMap); + } + StringBuffer sb = new StringBuffer(2048); + if (!(StrUtil.isNotBlank(interval) && interval.equals("NONE"))) { + String intervalStr = convertInterval(interval); + + sb.append("select _WSTART, _WEND, updatetime"); + fieldList.forEach(field -> + sb.append(" ,").append(calFunction).append("(").append(field).append(") ") + ); + sb.append(" from "); + sb.append(tbName); + sb.append(" where "); + sb.append(String.format(" updatetime >= %d and updatetime < %d ", startTime.getTime(), endTime.getTime())); + sb.append(String.format(" INTERVAL(%s)", intervalStr)); + sb.append(String.format(" FILL(%s)", "NONE")); + } + log.debug(sb.toString()); + try (Connection conn = hikariDataSource.getConnection(); + Statement smt = conn.createStatement(); + ResultSet rs = smt.executeQuery(sb.toString())) { + while (rs.next()) { + for (int i = 0; i < fieldList.size(); i++) { + if (valueMap.get(fieldList.get(i)) == null) { + Map map = new HashMap<>(); + List timeList = new ArrayList<>(); + timeList.add(rs.getTimestamp(3).getTime()); + List valueList = new ArrayList<>(); + valueList.add(rs.getObject(fieldList.get(i).toLowerCase())); + map.put("times", timeList); + map.put("values", valueList); + valueMap.put(fieldList.get(i), map); + } else { + Map map = valueMap.get(fieldList.get(i)); + List times = (List) map.get("times"); + List values = (List) map.get("values"); + times.add(rs.getTimestamp(3).getTime()); + values.add(rs.getObject(fieldList.get(i).toLowerCase())); + } + } + } + result.put(irn.toString(), valueMap); + } catch (Exception e) { + log.error("获取数据异常", e); + return result; + } + return result; + } + + public Map>> fetchLowWindowsCurve(Long irn, Date startTime, Date endTime, String interval, List fieldList,String calFunction) { + String tbName = String.format("l%d", irn); + Date now = new Date(); + if (endTime.after(now)) { + endTime = now; + } + Map>> result = new HashMap<>(); + Map> valueMap = new HashMap<>(); + for (String item : fieldList) { + Map timeValueMap = new HashMap<>(); + List times = new ArrayList<>(); + List objects = new ArrayList<>(); + timeValueMap.put("times", times); + timeValueMap.put("values", objects); + valueMap.put(item, timeValueMap); + } + StringBuffer sb = new StringBuffer(2048); + if (!(StrUtil.isNotBlank(interval) && interval.equals("NONE"))) { + String intervalStr = convertInterval(interval); + + sb.append("select _WSTART, _WEND,updatetime"); + fieldList.forEach(field -> + sb.append(" ,").append(calFunction).append("(").append(field).append(") ") + ); + sb.append(" from "); + sb.append(tbName); + sb.append(" where "); + sb.append(String.format(" updatetime >= %d and updatetime < %d ", startTime.getTime(), endTime.getTime())); + sb.append(String.format(" INTERVAL(%s)", intervalStr)); + sb.append(String.format(" FILL(%s)", "NONE")); + } + log.debug(sb.toString()); + try (Connection conn = hikariDataSource.getConnection(); + Statement smt = conn.createStatement(); + ResultSet rs = smt.executeQuery(sb.toString())) { + while (rs.next()) { + for (int i = 0; i < fieldList.size(); i++) { + if (valueMap.get(fieldList.get(i)) == null) { + Map map = new HashMap<>(); + List timeList = new ArrayList<>(); + timeList.add(rs.getTimestamp(3).getTime()); + List valueList = new ArrayList<>(); + valueList.add(rs.getObject(fieldList.get(i).toLowerCase())); + map.put("times", timeList); + map.put("values", valueList); + valueMap.put(fieldList.get(i), map); + } else { + Map map = valueMap.get(fieldList.get(i)); + List times = (List) map.get("times"); + List values = (List) map.get("values"); + times.add(rs.getTimestamp(3).getTime()); + values.add(rs.getObject(fieldList.get(i).toLowerCase())); + } + } + } + result.put(irn.toString(), valueMap); + } catch (Exception e) { + log.error("获取数据异常", e); + return result; + } + return result; + } + + public Map>> fetchCalWindowsCurve(Long irn, Date startTime, Date endTime, String interval, String calFieldCode,String calFunction) { + Date now = new Date(); + if (endTime.after(now)) { + endTime = now; + } + Map>> result = new HashMap<>(); + Map> valueMap = new HashMap<>(); + Map timeValueMap = new HashMap<>(); + List times = new ArrayList<>(); + List objects = new ArrayList<>(); + timeValueMap.put("times", times); + timeValueMap.put("values", objects); + valueMap.put(calFieldCode, timeValueMap); + + StringBuffer sb = new StringBuffer(2048); + if (!(StrUtil.isNotBlank(interval) && interval.equals("NONE"))) { + sb.append("select _WSTART, _WEND,updatetime,"); + sb.append(calFunction).append("(datavalue) as datavalue"); + sb.append(" from c_"); + sb.append(irn).append("_").append(calFieldCode); + sb.append(" where "); + sb.append(String.format(" updatetime >= %d and updatetime < %d ", startTime.getTime(), endTime.getTime())); + sb.append(String.format(" INTERVAL(%s)", interval)); + sb.append(String.format(" FILL(%s)", "NONE")); + } + log.debug(sb.toString()); + try (Connection conn = hikariDataSource.getConnection(); + Statement smt = conn.createStatement(); + ResultSet rs = smt.executeQuery(sb.toString())) { + while (rs.next()) { + if (valueMap.get(calFieldCode) == null) { + Map map = new HashMap<>(); + List timeList = new ArrayList<>(); + timeList.add(rs.getTimestamp(3).getTime()); + List valueList = new ArrayList<>(); + valueList.add(rs.getObject("datavalue")); + map.put("times", timeList); + map.put("values", valueList); + valueMap.put(calFieldCode, map); + } else { + Map map = valueMap.get(calFieldCode); + List timeList = (List) map.get("times"); + List values = (List) map.get("values"); + timeList.add(rs.getTimestamp(3).getTime()); + values.add(rs.getObject("datavalue")); + } + + } + result.put(irn.toString(), valueMap); + } catch (Exception e) { + log.error("获取数据异常", e); + return result; + } + return result; + } + public Map>> fetchLowHistoryCurve(Long irn, Date startTime, Date endTime, String interval, List fieldList) { SimpleDateFormat SIMPLE_DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String tbName = String.format("l%d", irn); @@ -614,7 +796,6 @@ public class TDEngineService { sb.append(" order by updatetime"); } else { sb.append("select updatetime, datavalue"); - sb.append(" from "); sb.append(" from c_"); sb.append(irn).append("_").append(calFieldCode); sb.append(" where "); diff --git a/das/src/main/java/com/das/modules/data/service/impl/DataServiceImpl.java b/das/src/main/java/com/das/modules/data/service/impl/DataServiceImpl.java index 43fc2235..92ebb8e2 100644 --- a/das/src/main/java/com/das/modules/data/service/impl/DataServiceImpl.java +++ b/das/src/main/java/com/das/modules/data/service/impl/DataServiceImpl.java @@ -4,10 +4,12 @@ import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.collection.ListUtil; import com.das.common.exceptions.ServiceException; import com.das.common.utils.AdminRedisTemplate; +import com.das.common.utils.StringUtils; import com.das.modules.cache.domain.DeviceInfoCache; import com.das.modules.cache.service.CacheService; import com.das.modules.data.domain.SnapshotValueQueryParam; import com.das.modules.data.domain.TSValueQueryParam; +import com.das.modules.data.domain.WindowValueQueryParam; import com.das.modules.data.service.DataService; import com.das.modules.data.service.TDEngineService; import com.das.modules.equipment.domain.vo.IotModelFieldVo; @@ -151,6 +153,27 @@ public class DataServiceImpl implements DataService { return result; } + @Override + public Map>> queryWindowsValues(WindowValueQueryParam param) { + Long start = System.currentTimeMillis(); + if (CollectionUtil.isEmpty(param.getDevices()) || (param.getStartTime() == null && param.getEndTime() == null && param.getCalFunction() == null)) { + throw new ServiceException("必要参数缺失"); + } + Date startTime = new Date(Long.parseLong(param.getStartTime())); + Date endTime = new Date(Long.parseLong(param.getEndTime())); + String windowType = param.getCalFunction(); + Map>> result = new HashMap<>(param.getDevices().size()); + List deviceFieldList = param.getDevices(); + for (SnapshotValueQueryParam item : deviceFieldList) { + //field分为高频和低频查询 + Map>> values = queryWindowsCurveValues(Long.valueOf(item.getDeviceId()), startTime, endTime, param.getInterval(), param.getFill(), item.getAttributes(),windowType); + result.putAll(values); + } + Long end = System.currentTimeMillis(); + log.debug("读取快照{}个,耗时: {}秒", param.getDevices().size(), (end-start)/ 1000.0); + return result; + } + private Map>> queryHistoryCurveValues(Long irn, Date startTime, Date endTime, String interval, String fill, List attributes) { StopWatch stopWatch = new StopWatch(); stopWatch.start("prepare resources"); @@ -208,6 +231,67 @@ public class DataServiceImpl implements DataService { return result; } + private Map>> queryWindowsCurveValues(Long irn, Date startTime, Date endTime, String interval, String fill, List attributes,String windowType) { + StopWatch stopWatch = new StopWatch(); + stopWatch.start("prepare resources"); + String function = mappingFunction(windowType); + if (StringUtils.isEmpty(function)){ + throw new ServiceException("计算方法参数不正确,请检查参数"); + } + String iotModelCode = sysIotModelFieldMapper.queryModelCodeByDeviceId(irn); + Map highSpeedFieldMap = highIotFieldMap.get(iotModelCode); + Map lowSpeedFieldMap = lowIotFieldMap.get(iotModelCode); + Map calFieldMap = calculateIotFieldMap.get(iotModelCode); + List highSpeedField = new ArrayList<>(); + List lowSpeedField = new ArrayList<>(); + List calField = new ArrayList<>(); + for (String field : attributes) { + if (highSpeedFieldMap.containsKey(field)) { + highSpeedField.add(field); + } + if (lowSpeedFieldMap.containsKey(field)) { + lowSpeedField.add(field); + } + if (calFieldMap.containsKey(field)){ + calField.add(field); + } + } + stopWatch.stop(); + stopWatch.start("HighSpeedValues"); + Map>> result = new HashMap<>(); + if (!CollectionUtils.isEmpty(highSpeedField)) { + Map>> highHistoryCurve = tdEngineService.fetchHighWindowsCurve(irn, startTime, endTime, interval, highSpeedField,function); + result.putAll(highHistoryCurve); + } + stopWatch.stop(); + stopWatch.start("LowSpeedValues"); + if (!CollectionUtils.isEmpty(lowSpeedField)) { + Map>> lowHistoryCurve = tdEngineService.fetchLowWindowsCurve(irn, startTime, endTime, interval, lowSpeedField,function); + if (result.get(irn.toString()) == null) { + result.putAll(lowHistoryCurve); + } else { + result.get(irn.toString()).putAll(lowHistoryCurve.get(irn.toString())); + } + } + stopWatch.stop(); + stopWatch.start("CalculateValues"); + if (!CollectionUtils.isEmpty(calField)){ + ListUtil.page(calField,COMMIT_COUNT,list -> { + for (String item : list){ + Map>> calHistoryCurve = tdEngineService.fetchCalWindowsCurve(irn, startTime, endTime, interval, item,function); + if (result.get(irn.toString()) == null) { + result.putAll(calHistoryCurve); + } else { + result.get(irn.toString()).putAll(calHistoryCurve.get(irn.toString())); + } + } + }); + } + stopWatch.stop(); + log.debug("查询历史数据耗时: {}", stopWatch.prettyPrint()); + return result; + } + @Override public void updateCalFieldData(List calValues) { //更新数据至redis,TD @@ -320,4 +404,13 @@ public class DataServiceImpl implements DataService { } return tdEngineService.getTimeAvgValue(tableName, attr.toLowerCase(), startTime, endTime); } + + private String mappingFunction(String calFunction){ + return switch (calFunction) { + case "average" -> "AVG"; + case "max" -> "MAX"; + case "min" -> "MIN"; + default -> ""; + }; + } }