diff --git a/das/src/main/java/com/das/modules/data/controller/DataController.java b/das/src/main/java/com/das/modules/data/controller/DataController.java new file mode 100644 index 00000000..f312ec6f --- /dev/null +++ b/das/src/main/java/com/das/modules/data/controller/DataController.java @@ -0,0 +1,57 @@ +package com.das.modules.data.controller; + +import com.das.common.result.R; +import com.das.common.utils.JsonUtils; +import com.das.modules.data.domain.SnapshotValueQueryParam; +import com.das.modules.data.domain.TSValueQueryParam; +import com.das.modules.data.service.DataService; +import jakarta.validation.Valid; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.List; +import java.util.Map; + +/** + * 数据查询controller + */ +@Slf4j +@RequestMapping("/api/data") +@RestController +public class DataController { + + @Autowired + DataService dataService; + + /** + * 实时数据查询 + * @param param 查询数据请求体 + * @return redis数据 + */ + @PostMapping("/snapshot") + public R>> querySnapshotValues(@RequestBody @Valid List param) { + if (log.isDebugEnabled()){ + log.debug("/api/rtdbsvr/snapshot is calling"); + log.debug(JsonUtils.toJsonString(param)); + } + return R.success(dataService.querySnapshotValues(param)); + } + + /** + * 历史区间数据查询 + * @param param + * @return + */ + @PostMapping("/history") + public R>>> queryTimeSeriesValues(@RequestBody @Valid TSValueQueryParam param) { + if (log.isDebugEnabled()){ + log.debug("/api/rtdbsvr/timeseries is calling"); + log.debug(JsonUtils.toJsonString(param)); + } + return R.success(dataService.queryTimeSeriesValues(param)); + } +} diff --git a/das/src/main/java/com/das/modules/data/domain/SnapshotValueQueryParam.java b/das/src/main/java/com/das/modules/data/domain/SnapshotValueQueryParam.java new file mode 100644 index 00000000..c431d698 --- /dev/null +++ b/das/src/main/java/com/das/modules/data/domain/SnapshotValueQueryParam.java @@ -0,0 +1,20 @@ +package com.das.modules.data.domain; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import jakarta.validation.constraints.NotNull; +import lombok.Data; + +import java.util.List; + +@JsonIgnoreProperties(ignoreUnknown = true) +@Data +public class SnapshotValueQueryParam +{ + /** + * 测点irn列表 + */ + @NotNull + private String deviceId; + + private List attributes; +} diff --git a/das/src/main/java/com/das/modules/data/domain/TSValueQueryParam.java b/das/src/main/java/com/das/modules/data/domain/TSValueQueryParam.java new file mode 100644 index 00000000..1905bab7 --- /dev/null +++ b/das/src/main/java/com/das/modules/data/domain/TSValueQueryParam.java @@ -0,0 +1,39 @@ +package com.das.modules.data.domain; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.Data; + +import java.util.Date; +import java.util.List; + +/** + * 时序数据查询实体 + */ +@Data +public class TSValueQueryParam +{ + /** + * 开始时间 + */ + private String startTime; + + /** + * 结束时间 + */ + private String endTime; + + /** + * 间隔 + */ + private String interval; + + /** + * 填充模式 + */ + private String fill; + + /** + * 设备属性列表 + */ + private List devices; +} diff --git a/das/src/main/java/com/das/modules/data/service/DataService.java b/das/src/main/java/com/das/modules/data/service/DataService.java new file mode 100644 index 00000000..7d1e75e6 --- /dev/null +++ b/das/src/main/java/com/das/modules/data/service/DataService.java @@ -0,0 +1,134 @@ +package com.das.modules.data.service; + +import cn.hutool.core.collection.CollectionUtil; +import cn.hutool.core.collection.ListUtil; +import com.das.common.exceptions.ServiceException; +import com.das.common.utils.AdminRedisTemplate; +import com.das.modules.data.domain.SnapshotValueQueryParam; +import com.das.modules.data.domain.TSValueQueryParam; +import com.das.modules.equipment.entity.SysIotModelField; +import com.das.modules.equipment.mapper.SysIotModelFieldMapper; +import com.das.modules.node.service.TDEngineService; +import com.das.modules.node.service.impl.DataServiceImpl; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import org.springframework.util.CollectionUtils; + +import java.util.*; + +@Slf4j +@Service +public class DataService { + + public static final int COMMIT_COUNT = 1000; + + @Autowired + AdminRedisTemplate adminRedisTemplate; + + @Autowired + private SysIotModelFieldMapper sysIotModelFieldMapper; + + @Autowired + private TDEngineService tdEngineService; + + @Autowired + private DataServiceImpl dataService; + + // 读取实时数据快照 + public Map> querySnapshotValues(List paramList) { + long start = System.currentTimeMillis(); + Map> result = new HashMap<>(paramList.size()); + Map> finalResult = result; + ListUtil.page(paramList, COMMIT_COUNT, list -> { + List keyList = new ArrayList<>(); + for (int i = 0; i < list.size(); i++) { + SnapshotValueQueryParam snapshotValueQueryParam = list.get(i); + List attributes = snapshotValueQueryParam.getAttributes(); + if (CollectionUtils.isEmpty(attributes)) { + //为空查全部 + List sysIotModelFields = sysIotModelFieldMapper.queryAllFiledNames(Long.valueOf(snapshotValueQueryParam.getDeviceId())); + for (String item : sysIotModelFields) { + String key = String.format("RT:[%s]:[%s]", snapshotValueQueryParam.getDeviceId(), item); + keyList.add(key); + } + } else { + for (String item : attributes) { + String key = String.format("RT:[%s]:[%s]", snapshotValueQueryParam.getDeviceId(), item); + keyList.add(key); + } + } + } + List dataList = adminRedisTemplate.mGet(keyList); + for (int i = 0; i < keyList.size(); i++) { + String key = keyList.get(i); + int firstColonIndex = key.indexOf('['); + int firstIndex = key.indexOf(']'); + int secondIndex = key.indexOf(']', firstIndex + 1); + int secondColonIndex = key.indexOf('[', firstColonIndex + 1); + String deviceId = key.substring(firstColonIndex + 1, firstIndex); + String fieldName = key.substring(secondColonIndex + 1, secondIndex); + if (finalResult.get(deviceId) == null) { + Map valueMap = new HashMap<>(); + valueMap.put(fieldName, dataList.get(i)); + finalResult.put(deviceId, valueMap); + } else { + finalResult.get(deviceId).put(fieldName, dataList.get(i)); + } + } + }); + long end = System.currentTimeMillis(); + log.debug("读取快照{}个,耗时: {}秒", paramList.size(), (end - start) / 1000.0); + return finalResult; + } + + + public Map>> queryTimeSeriesValues(TSValueQueryParam param) { + if (CollectionUtil.isEmpty(param.getDevices()) || (param.getStartTime() == null && param.getEndTime() == null)) { + throw new ServiceException("必要参数缺失"); + } + Date startTime = new Date(Long.parseLong(param.getStartTime())); + Date endTime = new Date(Long.parseLong(param.getEndTime())); + Map>> result = new HashMap<>(param.getDevices().size()); + List deviceFieldList = param.getDevices(); + for (SnapshotValueQueryParam item : deviceFieldList) { + //field分为高频和低频查询 + Map>> values = queryHistoryCurveValues(Long.valueOf(item.getDeviceId()), startTime, endTime, param.getInterval(), param.getFill(), item.getAttributes()); + result.putAll(values); + } + return result; + } + + private Map>> queryHistoryCurveValues(Long irn, Date startTime, Date endTime, String interval, String fill, List attributes) { + + String iotModelCode = sysIotModelFieldMapper.queryModelCodeByDeviceId(irn); + Map highSpeedFieldMap = dataService.highIotFieldMap.get(iotModelCode); + Map lowSpeedFieldMap = dataService.lowIotFieldMap.get(iotModelCode); + List highSpeedField = new ArrayList<>(); + List lowSpeedField = new ArrayList<>(); + for (String field : attributes) { + if (highSpeedFieldMap.containsKey(field)) { + highSpeedField.add(field); + } + if (lowSpeedFieldMap.containsKey(field)) { + lowSpeedField.add(field); + } + } + Map>> result = new HashMap<>(); + if (!CollectionUtils.isEmpty(highSpeedField)) { + Map>> highHistoryCurve = tdEngineService.fetchHighHistoryCurve(irn, startTime, endTime, interval, highSpeedField); + result.putAll(highHistoryCurve); + } + if (!CollectionUtils.isEmpty(lowSpeedField)) { + Map>> lowHistoryCurve = tdEngineService.fetchLowHistoryCurve(irn, startTime, endTime, interval, lowSpeedField); + if (result.get(irn.toString()) == null) { + result.putAll(lowHistoryCurve); + } else { + result.get(irn.toString()).putAll(lowHistoryCurve.get(irn.toString())); + } + } + return result; + } + + +} diff --git a/das/src/main/java/com/das/modules/equipment/mapper/SysIotModelFieldMapper.java b/das/src/main/java/com/das/modules/equipment/mapper/SysIotModelFieldMapper.java index a0117ee4..9f2da697 100644 --- a/das/src/main/java/com/das/modules/equipment/mapper/SysIotModelFieldMapper.java +++ b/das/src/main/java/com/das/modules/equipment/mapper/SysIotModelFieldMapper.java @@ -9,6 +9,8 @@ import com.das.modules.equipment.entity.SysIotModelField; import org.apache.ibatis.annotations.Mapper; import org.apache.ibatis.annotations.Param; +import java.util.List; + @Mapper public interface SysIotModelFieldMapper extends BaseMapperPlus { @@ -17,4 +19,8 @@ public interface SysIotModelFieldMapper extends BaseMapperPlus queryAllFiledNames(@Param("deviceId") Long deviceId); + + String queryModelCodeByDeviceId(@Param("deviceId") Long deviceId); } diff --git a/das/src/main/java/com/das/modules/node/service/TDEngineService.java b/das/src/main/java/com/das/modules/node/service/TDEngineService.java index 0438de97..6acb3dcf 100644 --- a/das/src/main/java/com/das/modules/node/service/TDEngineService.java +++ b/das/src/main/java/com/das/modules/node/service/TDEngineService.java @@ -1,6 +1,7 @@ package com.das.modules.node.service; import cn.hutool.core.collection.ListUtil; +import cn.hutool.core.util.StrUtil; import com.das.modules.equipment.domain.vo.IotModelFieldVo; import com.das.modules.equipment.mapper.SysIotModelMapper; import com.das.modules.node.domain.bo.RTData; @@ -13,13 +14,12 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.stereotype.Service; +import org.springframework.util.StringUtils; -import java.sql.Connection; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.sql.*; +import java.text.SimpleDateFormat; +import java.util.*; +import java.util.Date; import java.util.concurrent.ConcurrentHashMap; @Slf4j @@ -131,7 +131,8 @@ public class TDEngineService { sb.setLength(0); sb.append("ALTER STABLE "); sb.append(stableName); - sb.append(" DROP COLUMN");; + sb.append(" DROP COLUMN"); + ; sb.append(fieldCode); sb.append(";"); try { @@ -140,7 +141,7 @@ public class TDEngineService { log.error("删除超级表列失败:{},失败原因{}", sb.toString(), e); } - }catch (Exception ignored){ + } catch (Exception ignored) { } } @@ -148,7 +149,7 @@ public class TDEngineService { /** * 删除超级表 */ - public void deleteStable(String stableName){ + public void deleteStable(String stableName) { try (Connection conn = hikariDataSource.getConnection(); Statement pstmt = conn.createStatement()) { StringBuilder sb = new StringBuilder(1024 * 1024); @@ -162,7 +163,7 @@ public class TDEngineService { log.error("删除超级表失败:{},失败原因{}", sb.toString(), e); } - }catch (Exception ignored){ + } catch (Exception ignored) { } } @@ -191,7 +192,7 @@ public class TDEngineService { for (String key : map.keySet()) { sb.append(", "); sb.append(key); - sb.append(" "+map.get(key)); + sb.append(" " + map.get(key)); } sb.append(") TAGS (`deviceid` BIGINT);"); try { @@ -215,7 +216,7 @@ public class TDEngineService { for (String key : map.keySet()) { sb.append(", "); sb.append(key); - sb.append(" "+map.get(key)); + sb.append(" " + map.get(key)); } sb.append(") TAGS (`deviceid` BIGINT);"); try { @@ -342,6 +343,153 @@ public class TDEngineService { } } + public Map>> fetchHighHistoryCurve(Long irn, Date startTime, Date endTime, String interval, List fieldList) { + SimpleDateFormat SIMPLE_DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + + String tbName = String.format("h%d", irn); + Date now = new Date(); + if (endTime.after(now)) { + endTime = now; + } + Map>> result = new HashMap<>(); + Map> valueMap = new HashMap<>(); + StringBuffer sb = new StringBuffer(2048); + if (StrUtil.isNotBlank(interval)) { + String startTimeStr = SIMPLE_DATE_FORMAT.format(startTime); + String endTimeStr = SIMPLE_DATE_FORMAT.format(endTime); + String timeStr = String.format("'%s','%s'", startTimeStr, endTimeStr); + String intervalStr = convertInterval(interval); + + sb.append("select _irowts updatetime"); + fieldList.forEach(field -> + sb.append(" ,").append("interp(").append(field).append(") ").append(field) + ); + sb.append(" from "); + sb.append(tbName); + sb.append(String.format(" range(%s)", timeStr)); + sb.append(String.format(" every(%s)", intervalStr)); + sb.append(String.format(" FILL(%s)", "PREV")); + sb.append(" order by updatetime"); + } else { + sb.append("select updatetime, datavalue from "); + fieldList.forEach(field -> + sb.append(", ").append(field) + ); + sb.append(" from "); + sb.append(tbName); + sb.append(" where "); + sb.append(String.format(" updatetime >= %d and updatetime < %d ", startTime.getTime(), endTime.getTime())); + sb.append(" order by updatetime"); + } + log.debug(sb.toString()); + try (Connection conn = hikariDataSource.getConnection(); + Statement smt = conn.createStatement(); + ResultSet rs = smt.executeQuery(sb.toString())) { + while (rs.next()) { + for (int i = 0; i < fieldList.size(); i++){ + if (valueMap.get(fieldList.get(i)) == null){ + Map map = new HashMap<>(); + List timeList = new ArrayList<>(); + timeList.add(rs.getTimestamp(1).getTime()); + List valueList = new ArrayList<>(); + valueList.add(rs.getObject(fieldList.get(i))); + map.put("times",timeList); + map.put("values",valueList); + valueMap.put(fieldList.get(i),map); + }else { + Map map = valueMap.get(fieldList.get(i)); + List times = (List) map.get("times"); + List values = (List) map.get("values"); + times.add(rs.getTimestamp(1).getTime()); + values.add(rs.getObject(fieldList.get(i))); + } + } + } + result.put(irn.toString(),valueMap); + } catch (Exception e) { + log.error("获取数据异常", e); + return result; + } + return result; + } + + public Map>> fetchLowHistoryCurve(Long irn, Date startTime, Date endTime, String interval, List fieldList) { + SimpleDateFormat SIMPLE_DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + + String tbName = String.format("l%d", irn); + Date now = new Date(); + if (endTime.after(now)) { + endTime = now; + } + Map>> result = new HashMap<>(); + Map> valueMap = new HashMap<>(); + StringBuffer sb = new StringBuffer(2048); + if (StrUtil.isNotBlank(interval)) { + String startTimeStr = SIMPLE_DATE_FORMAT.format(startTime); + String endTimeStr = SIMPLE_DATE_FORMAT.format(endTime); + String timeStr = String.format("'%s','%s'", startTimeStr, endTimeStr); + String intervalStr = convertInterval(interval); + + sb.append("select _irowts updatetime"); + fieldList.forEach(field -> + sb.append(" ,").append("interp(").append(field).append(") ").append(field) + ); + sb.append(" from "); + sb.append(tbName); + sb.append(String.format(" range(%s)", timeStr)); + sb.append(String.format(" every(%s)", intervalStr)); + sb.append(String.format(" FILL(%s)", "PREV")); + sb.append(" order by updatetime"); + } else { + sb.append("select updatetime, datavalue from "); + fieldList.forEach(field -> + sb.append(", ").append(field) + ); + sb.append(" from "); + sb.append(tbName); + sb.append(" where "); + sb.append(String.format(" updatetime >= %d and updatetime < %d ", startTime.getTime(), endTime.getTime())); + sb.append(" order by updatetime"); + } + log.debug(sb.toString()); + try (Connection conn = hikariDataSource.getConnection(); + Statement smt = conn.createStatement(); + ResultSet rs = smt.executeQuery(sb.toString())) { + while (rs.next()) { + for (int i = 0; i < fieldList.size(); i++){ + if (valueMap.get(fieldList.get(i)) == null){ + Map map = new HashMap<>(); + List timeList = new ArrayList<>(); + timeList.add(rs.getTimestamp(1).getTime()); + List valueList = new ArrayList<>(); + valueList.add(rs.getObject(fieldList.get(i))); + map.put("times",timeList); + map.put("values",valueList); + valueMap.put(fieldList.get(i),map); + }else { + Map map = valueMap.get(fieldList.get(i)); + List times = (List) map.get("times"); + List values = (List) map.get("values"); + times.add(rs.getTimestamp(1).getTime()); + values.add(rs.getObject(fieldList.get(i))); + } + } + } + result.put(irn.toString(),valueMap); + } catch (Exception e) { + log.error("获取数据异常", e); + return result; + } + return result; + } + + private String convertInterval(String interval) { + if (!StringUtils.hasText(interval)) { + interval = "1m"; + } + return interval; + } + @PreDestroy public void free() { diff --git a/das/src/main/resources/mapper/SysIotModelFieldMapper.xml b/das/src/main/resources/mapper/SysIotModelFieldMapper.xml index 743f11d4..4f92fa5d 100644 --- a/das/src/main/resources/mapper/SysIotModelFieldMapper.xml +++ b/das/src/main/resources/mapper/SysIotModelFieldMapper.xml @@ -40,6 +40,16 @@ + + diff --git a/ui/dasadmin/src/views/backend/auth/menu/index.vue b/ui/dasadmin/src/views/backend/auth/menu/index.vue index 8623a39a..69693364 100644 --- a/ui/dasadmin/src/views/backend/auth/menu/index.vue +++ b/ui/dasadmin/src/views/backend/auth/menu/index.vue @@ -89,7 +89,7 @@ @@ -139,7 +139,7 @@ @@ -150,7 +150,7 @@ diff --git a/ui/dasadmin/src/views/backend/auth/model/index.vue b/ui/dasadmin/src/views/backend/auth/model/index.vue index 4aa99dfc..b80854cf 100644 --- a/ui/dasadmin/src/views/backend/auth/model/index.vue +++ b/ui/dasadmin/src/views/backend/auth/model/index.vue @@ -32,8 +32,8 @@ @@ -245,8 +245,8 @@ @@ -268,8 +268,8 @@ @@ -528,9 +528,16 @@ const changeTabs = (name: any) => { getServiceList() } } -const sortData = reactive<{ orderColumn: string | undefined; orderType: 'desc' | 'asc' | undefined }>({ - orderColumn: undefined, - orderType: undefined, +const sortData = reactive<{ + attributeOrderColumn: string | undefined + attributeOrderType: 'desc' | 'asc' | undefined + serviceOrderColumn: string | undefined + serviceOrderType: 'desc' | 'asc' | undefined +}>({ + attributeOrderColumn: undefined, + attributeOrderType: undefined, + serviceOrderColumn: undefined, + serviceOrderType: undefined, }) const sortChange = ({ prop, @@ -548,12 +555,15 @@ const sortChange = ({ serviceName: 'service_name', serviceTypeName: 'service_type', } - const orderType = order === 'ascending' ? 'asc' : order === 'descending' ? 'desc' : undefined - const filed = propEnums[prop as keyof typeof propEnums] - sortData.orderColumn = orderType ? filed : undefined - sortData.orderType = orderType + if (ModelTabs.value === 'attribute') { + sortData.attributeOrderColumn = orderType ? filed : undefined + sortData.attributeOrderType = orderType + } else if (ModelTabs.value === 'service') { + sortData.serviceOrderColumn = orderType ? filed : undefined + sortData.serviceOrderType = orderType + } if (ModelTabs.value === 'attribute') { getAttributeList() @@ -586,20 +596,16 @@ const delAttributeForm = (data: AddModelAttributeType & UpdateModelAttributeType const getAttributeList = ({ type, value, - order, - orderFiled, }: { type?: radioGroupType value?: string - order?: 'asc' | 'desc' | undefined - orderFiled?: keyof typeof ModelAttributeFieldsEnums } = {}) => { const requestData: GetModelAttributeType = { iotModelId: curContextMenuTreeData.value!.id!, pageNum: currentPage.value, pageSize: currentPageSize.value, - orderColumn: sortData.orderColumn, - orderType: sortData.orderType, + orderColumn: sortData.attributeOrderColumn, + orderType: sortData.attributeOrderType, } if (type === 'Name') { requestData.attributeName = value @@ -650,8 +656,8 @@ const getServiceList = ({ iotModelId: curContextMenuTreeData.value!.id!, pageNum: currentPage.value, pageSize: currentPageSize.value, - orderColumn: sortData.orderColumn, - orderType: sortData.orderType, + orderColumn: sortData.serviceOrderColumn, + orderType: sortData.serviceOrderType, } if (type === 'Name') { requestData.serviceName = value diff --git a/ui/dasadmin/src/views/backend/auth/org/index.vue b/ui/dasadmin/src/views/backend/auth/org/index.vue index 5d464af1..b4f947bb 100644 --- a/ui/dasadmin/src/views/backend/auth/org/index.vue +++ b/ui/dasadmin/src/views/backend/auth/org/index.vue @@ -51,8 +51,8 @@ diff --git a/ui/dasadmin/src/views/backend/auth/role/index.vue b/ui/dasadmin/src/views/backend/auth/role/index.vue index 56697eca..c9442f74 100644 --- a/ui/dasadmin/src/views/backend/auth/role/index.vue +++ b/ui/dasadmin/src/views/backend/auth/role/index.vue @@ -23,8 +23,8 @@ diff --git a/ui/dasadmin/src/views/backend/auth/user/index.vue b/ui/dasadmin/src/views/backend/auth/user/index.vue index dca5b392..c26c7d44 100644 --- a/ui/dasadmin/src/views/backend/auth/user/index.vue +++ b/ui/dasadmin/src/views/backend/auth/user/index.vue @@ -95,7 +95,7 @@ @@ -144,7 +144,7 @@ @@ -155,7 +155,7 @@ @@ -179,8 +179,8 @@ diff --git a/ui/dasadmin/src/views/backend/equipment/equipmentManagement/index.vue b/ui/dasadmin/src/views/backend/equipment/equipmentManagement/index.vue index 4d7e2f15..cba1ed83 100644 --- a/ui/dasadmin/src/views/backend/equipment/equipmentManagement/index.vue +++ b/ui/dasadmin/src/views/backend/equipment/equipmentManagement/index.vue @@ -231,8 +231,8 @@ @@ -402,8 +402,8 @@ @@ -413,7 +413,7 @@ diff --git a/ui/dasadmin/src/views/backend/node/index.vue b/ui/dasadmin/src/views/backend/node/index.vue index f3e03f8b..bd6211cb 100644 --- a/ui/dasadmin/src/views/backend/node/index.vue +++ b/ui/dasadmin/src/views/backend/node/index.vue @@ -20,8 +20,8 @@ @@ -40,8 +40,8 @@