diff --git a/das/pom.xml b/das/pom.xml index 75a72cb3..e76654c8 100644 --- a/das/pom.xml +++ b/das/pom.xml @@ -28,6 +28,7 @@ 4.8.6 2.3.14.Final 5.3.0 + 3.2.10 @@ -92,6 +93,11 @@ ${hutool.version} + + com.taosdata.jdbc + taos-jdbcdriver + ${taosdata.verson} + diff --git a/das/src/main/java/com/das/DasApplication.java b/das/src/main/java/com/das/DasApplication.java index fadd7792..3440e850 100644 --- a/das/src/main/java/com/das/DasApplication.java +++ b/das/src/main/java/com/das/DasApplication.java @@ -1,5 +1,8 @@ package com.das; +import com.das.modules.node.service.TDEngineService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.scheduling.annotation.EnableScheduling; @@ -10,10 +13,17 @@ import org.springframework.scheduling.annotation.EnableScheduling; */ @EnableScheduling @SpringBootApplication -public class DasApplication { - +public class DasApplication implements CommandLineRunner { + @Autowired + TDEngineService tDEngineService; public static void main(String[] args) { SpringApplication.run(DasApplication.class, args); } + @Override + public void run(String... args) { + tDEngineService.init(); + //数据服务初始化 + tDEngineService.initIotModel(); + } } diff --git a/das/src/main/java/com/das/modules/equipment/mapper/SysIotModelMapper.java b/das/src/main/java/com/das/modules/equipment/mapper/SysIotModelMapper.java index 8f7d0506..6368dc0e 100644 --- a/das/src/main/java/com/das/modules/equipment/mapper/SysIotModelMapper.java +++ b/das/src/main/java/com/das/modules/equipment/mapper/SysIotModelMapper.java @@ -25,4 +25,8 @@ public interface SysIotModelMapper extends BaseMapper { String getIotModelServiceCode(Integer objectType); + List getAllIotModel(); + + String getIotModel(Long id); + } diff --git a/das/src/main/java/com/das/modules/node/command/AnalogDataCommand.java b/das/src/main/java/com/das/modules/node/command/AnalogDataCommand.java index c1e8e4c9..615cd9ba 100644 --- a/das/src/main/java/com/das/modules/node/command/AnalogDataCommand.java +++ b/das/src/main/java/com/das/modules/node/command/AnalogDataCommand.java @@ -2,27 +2,56 @@ package com.das.modules.node.command; import cn.hutool.core.collection.ListUtil; import com.das.common.utils.AdminRedisTemplate; +import com.das.modules.equipment.entity.SysIotModel; +import com.das.modules.equipment.mapper.SysIotModelMapper; +import com.das.modules.node.constant.NodeConstant; +import com.das.modules.node.domain.bo.RTData; import com.das.modules.node.domain.bo.TerminalMessage; +import com.das.modules.node.service.TDEngineService; import com.fasterxml.jackson.databind.JsonNode; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import java.util.ArrayList; +import java.util.List; import java.util.Map; +@Service(value = NodeConstant.ANALOG_DATA) +@Slf4j public class AnalogDataCommand implements BaseCommand{ - public static final String YC_KEY_NAME = "RDB:YC:VALUES"; @Autowired AdminRedisTemplate adminRedisTemplate; + + @Autowired + SysIotModelMapper sysIotModelMapper; + + @Autowired + TDEngineService tdEngineService; @Override public void doCommand(TerminalMessage data) { JsonNode jsonNode = data.getData(); - Map values = Map.of( - "deviceId", jsonNode.get("deviceId").asLong(), - "dataTime", data.getTime(), - "values", ListUtil.toList(jsonNode.get("value").asText()) - ); - adminRedisTemplate.set(YC_KEY_NAME, values); + String deviceId = jsonNode.get("deviceId").asText(); + + Long dataTime = jsonNode.get("deviceId").asLong(); + adminRedisTemplate.set(deviceId, ListUtil.toList(jsonNode.get("values").asText())); // 存入td库 + // 根据设备ID获取对应的物模型属性 + String iotModelCode= sysIotModelMapper.getIotModel(jsonNode.get("deviceId").asLong()); + + ArrayList values = ListUtil.toList(jsonNode.get("values").asText()); + + List list = new ArrayList<>(); + + RTData rtData = RTData.builder() + .dataTime(dataTime) + .deviceId(Long.valueOf(deviceId)) + .values(values) + .build(); + + list.add(rtData); + tdEngineService.updateYCValues(list, iotModelCode); } } diff --git a/das/src/main/java/com/das/modules/node/command/HistoryStateDataCommand.java b/das/src/main/java/com/das/modules/node/command/HistoryStateDataCommand.java deleted file mode 100644 index f058e1f3..00000000 --- a/das/src/main/java/com/das/modules/node/command/HistoryStateDataCommand.java +++ /dev/null @@ -1,10 +0,0 @@ -package com.das.modules.node.command; - -import com.das.modules.node.domain.bo.TerminalMessage; - -public class HistoryStateDataCommand implements BaseCommand{ - @Override - public void doCommand(TerminalMessage data) { - // 更新td数据库 - } -} diff --git a/das/src/main/java/com/das/modules/node/command/InitDeviceDataCommand.java b/das/src/main/java/com/das/modules/node/command/InitDeviceDataCommand.java index 5f23f25f..3cdde656 100644 --- a/das/src/main/java/com/das/modules/node/command/InitDeviceDataCommand.java +++ b/das/src/main/java/com/das/modules/node/command/InitDeviceDataCommand.java @@ -1,10 +1,30 @@ package com.das.modules.node.command; +import cn.hutool.core.collection.ListUtil; +import com.das.common.utils.AdminRedisTemplate; +import com.das.modules.node.constant.NodeConstant; import com.das.modules.node.domain.bo.TerminalMessage; +import com.fasterxml.jackson.databind.JsonNode; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +@Service(value = NodeConstant.INIT_DEVICE_DATA) +@Slf4j public class InitDeviceDataCommand implements BaseCommand{ + + @Autowired + AdminRedisTemplate adminRedisTemplate; @Override public void doCommand(TerminalMessage data) { + log.info("收到设备初始化数据"); + // 存入redis + JsonNode dataInfo = data.getData(); + if (!dataInfo.isEmpty()) { + String deviceId = dataInfo.get("deviceId").asText(); + // 存入redis + adminRedisTemplate.set(deviceId, ListUtil.toList(dataInfo.get("values").asText())); + } } } diff --git a/das/src/main/java/com/das/modules/node/command/StateDataCommand.java b/das/src/main/java/com/das/modules/node/command/StateDataCommand.java index e83b2a20..e3ca82d0 100644 --- a/das/src/main/java/com/das/modules/node/command/StateDataCommand.java +++ b/das/src/main/java/com/das/modules/node/command/StateDataCommand.java @@ -2,26 +2,51 @@ package com.das.modules.node.command; import cn.hutool.core.collection.ListUtil; import com.das.common.utils.AdminRedisTemplate; +import com.das.modules.equipment.mapper.SysIotModelMapper; +import com.das.modules.node.constant.NodeConstant; +import com.das.modules.node.domain.bo.RTData; import com.das.modules.node.domain.bo.TerminalMessage; +import com.das.modules.node.service.TDEngineService; import com.fasterxml.jackson.databind.JsonNode; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import java.util.ArrayList; +import java.util.List; import java.util.Map; +@Service(value = NodeConstant.STATE_DATA) +@Slf4j public class StateDataCommand implements BaseCommand{ - public static final String YM_KEY_NAME = "RDB:YM:VALUES"; + @Autowired + TDEngineService tdEngineService; @Autowired AdminRedisTemplate adminRedisTemplate; + + @Autowired + SysIotModelMapper sysIotModelMapper; @Override public void doCommand(TerminalMessage data) { JsonNode jsonNode = data.getData(); - Map values = Map.of( - "deviceId", jsonNode.get("deviceId").asLong(), - "dataTime", data.getTime(), - "values", ListUtil.toList(jsonNode.get("value").asText()) - ); - adminRedisTemplate.set(YM_KEY_NAME, values); + String deviceId = jsonNode.get("deviceId").asText(); + adminRedisTemplate.set(deviceId, ListUtil.toList(jsonNode.get("values").asText())); + String iotModelCode= sysIotModelMapper.getIotModel(jsonNode.get("deviceId").asLong()); + Long dataTime = jsonNode.get("deviceId").asLong(); + // 存入td库 + ArrayList values = ListUtil.toList(jsonNode.get("values").asText()); + + List list = new ArrayList<>(); + + RTData rtData = RTData.builder() + .dataTime(dataTime) + .deviceId(Long.valueOf(deviceId)) + .values(values) + .build(); + + list.add(rtData); + tdEngineService.updateStataValues(list, iotModelCode); } } diff --git a/das/src/main/java/com/das/modules/node/constant/NodeConstant.java b/das/src/main/java/com/das/modules/node/constant/NodeConstant.java index 964b5c03..9e1f93cf 100644 --- a/das/src/main/java/com/das/modules/node/constant/NodeConstant.java +++ b/das/src/main/java/com/das/modules/node/constant/NodeConstant.java @@ -9,4 +9,12 @@ public interface NodeConstant { String LAST_PONG_TIME = "LastPongTime"; String HEARTBEAT = "heartbeat"; + + String DEVICE_CONTROL = "deviceControl"; + + String INIT_DEVICE_DATA = "initDeviceData"; + + String ANALOG_DATA = "analogData"; + + String STATE_DATA = "stateData"; } diff --git a/das/src/main/java/com/das/modules/node/controller/SysNodeController.java b/das/src/main/java/com/das/modules/node/controller/SysNodeController.java index 4c1268c2..fabc90b3 100644 --- a/das/src/main/java/com/das/modules/node/controller/SysNodeController.java +++ b/das/src/main/java/com/das/modules/node/controller/SysNodeController.java @@ -8,7 +8,9 @@ import com.das.common.result.R; import com.das.common.utils.PageDataInfo; import com.das.modules.node.domain.dto.*; import com.das.modules.node.domain.vo.*; +import com.das.modules.node.service.DataService; import com.das.modules.node.service.SysNodeService; +import com.fasterxml.jackson.databind.JsonNode; import jakarta.servlet.http.HttpServletRequest; import jakarta.servlet.http.HttpServletResponse; import org.springframework.beans.factory.annotation.Autowired; @@ -34,6 +36,10 @@ public class SysNodeController { @Autowired private SysNodeService sysNodeService; + @Autowired + private DataService dataService; + + /** 获取节点列表 */ @PostMapping("/list") @@ -84,8 +90,9 @@ public class SysNodeController { /** 配置下发 */ @PostMapping("/configUpdate") - public R configUpdate() { - return R.success(); + public R configUpdate() { + JsonNode configUpdateData = dataService.getConfigUpdateInfo(Long.valueOf(1)); + return R.success(configUpdateData); } @@ -229,5 +236,11 @@ public class SysNodeController { } return R.success("导入失败"); } + + /** 遥控遥调 */ + @PostMapping("/link/deviceControl") + public void deviceControl(@RequestBody DeviceControlDto device) { + sysNodeService.deviceControl(device); + } } diff --git a/das/src/main/java/com/das/modules/node/domain/bo/RTData.java b/das/src/main/java/com/das/modules/node/domain/bo/RTData.java new file mode 100644 index 00000000..4d6e8b26 --- /dev/null +++ b/das/src/main/java/com/das/modules/node/domain/bo/RTData.java @@ -0,0 +1,19 @@ +package com.das.modules.node.domain.bo; + +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import com.fasterxml.jackson.databind.ser.std.ToStringSerializer; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +@AllArgsConstructor +@Builder +public class RTData { + @JsonSerialize(using = ToStringSerializer.class) + private Long deviceId; + private Long dataTime; + private Object values; +} diff --git a/das/src/main/java/com/das/modules/node/domain/dto/DeviceControlDto.java b/das/src/main/java/com/das/modules/node/domain/dto/DeviceControlDto.java new file mode 100644 index 00000000..1eb7df2f --- /dev/null +++ b/das/src/main/java/com/das/modules/node/domain/dto/DeviceControlDto.java @@ -0,0 +1,20 @@ +package com.das.modules.node.domain.dto; + +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import com.fasterxml.jackson.databind.ser.std.ToStringSerializer; +import lombok.Data; + +@Data +public class DeviceControlDto { + + + @JsonSerialize(using = ToStringSerializer.class) + private Long nodeId; + + @JsonSerialize(using = ToStringSerializer.class) + private Long deviceId; + + private String serviceName; + + private Integer opValue; +} diff --git a/das/src/main/java/com/das/modules/node/domain/vo/SysCommunicationLinkVo.java b/das/src/main/java/com/das/modules/node/domain/vo/SysCommunicationLinkVo.java index eba34c4e..71eb226d 100644 --- a/das/src/main/java/com/das/modules/node/domain/vo/SysCommunicationLinkVo.java +++ b/das/src/main/java/com/das/modules/node/domain/vo/SysCommunicationLinkVo.java @@ -30,7 +30,7 @@ public class SysCommunicationLinkVo { /** * 协议参数 */ - private Object params; + private String params; /** * 所属系统节点 diff --git a/das/src/main/java/com/das/modules/node/service/SysNodeService.java b/das/src/main/java/com/das/modules/node/service/SysNodeService.java index 07cc1a70..bc0c4d9e 100644 --- a/das/src/main/java/com/das/modules/node/service/SysNodeService.java +++ b/das/src/main/java/com/das/modules/node/service/SysNodeService.java @@ -42,4 +42,6 @@ public interface SysNodeService { void exportMappingList(SysImptabmappingDto sysImptabmappingDto,HttpServletRequest request, HttpServletResponse response); boolean importMappingList(String linkId, MultipartFile file); + + void deviceControl(DeviceControlDto device); } diff --git a/das/src/main/java/com/das/modules/node/service/TDEngineService.java b/das/src/main/java/com/das/modules/node/service/TDEngineService.java new file mode 100644 index 00000000..e1c7993f --- /dev/null +++ b/das/src/main/java/com/das/modules/node/service/TDEngineService.java @@ -0,0 +1,171 @@ +package com.das.modules.node.service; + +import cn.hutool.core.collection.ListUtil; +import com.das.modules.equipment.domain.vo.SysIotModelVo; +import com.das.modules.equipment.mapper.SysIotModelMapper; +import com.das.modules.equipment.service.SysIotModelService; +import com.das.modules.node.domain.bo.RTData; +import com.zaxxer.hikari.HikariConfig; +import com.zaxxer.hikari.HikariDataSource; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.scheduling.annotation.Async; +import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.stereotype.Service; + +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; + +@Slf4j +@Service +@EnableAsync +public class TDEngineService { + + + private HikariDataSource hikariDataSource; + + @Autowired + private SysIotModelMapper sysIotModelMapper; + + @Value("${tdengine.url}") + private String url; + + @Value("${tdengine.username}") + private String username; + + @Value("${tdengine.password}") + private String password; + + @Value("${tdengine.batch-size:10000}") + private int batchSize; + + private ConcurrentHashMap iotModelMap = new ConcurrentHashMap<>(10000); + public void init() { + if (hikariDataSource == null) { + HikariConfig config = new HikariConfig(); + config.setJdbcUrl(url); + config.setUsername(username); + config.setPassword(password); + config.setConnectionTestQuery("select server_status()"); + config.setMinimumIdle(10); //minimum number of idle connection + config.setMaximumPoolSize(20); //maximum number of connection in the pool + config.setMaxLifetime(0); // maximum life time for each connection +// config.setIdleTimeout(0); // max idle time for recycle idle connection + log.info("=======>TDEngineUrl:" + config.getJdbcUrl()); + hikariDataSource = new HikariDataSource(config); + } + } + + // 遍历所有的物模型存入内存中 + public void initIotModel() { + //查询所有的物模型 + List allIotModel = sysIotModelMapper.getAllIotModel(); + for (SysIotModelVo item : allIotModel) { + String key = String.valueOf(item.getId()); + iotModelMap.put(key, item.getIotModelCode()); + } + // 创建物模型超级表 + StringBuilder sb = new StringBuilder(1024*1024); + try (Connection conn = hikariDataSource.getConnection(); + Statement pstmt = conn.createStatement()) { + + ListUtil.page(allIotModel, batchSize, (list)->{ + sb.setLength(0); + for (SysIotModelVo info : list) { + sb.append("CREATE STABLE IF NOT EXISTS "); + sb.append(info.getIotModelCode()); + sb.append(" (`updatetime` TIMESTAMP, `datavalue` DOUBLE,`attributecode` VARCHAR(100)) TAGS (`sourcetype` BIGINT); "); + + } + try { + pstmt.executeUpdate(sb.toString()); + } catch (SQLException ex) { + log.error("save yx error", ex); + } + + }); + } catch (SQLException ex) { + log.error(ex.getMessage()); + } + + } + + @Async + public void updateYCValues(List values, String iotModelCode) { + StringBuilder sb = new StringBuilder(1024*1024); + try (Connection conn = hikariDataSource.getConnection(); + Statement pstmt = conn.createStatement()) { + ListUtil.page(values, batchSize, (list)->{ + sb.setLength(0); + sb.append("insert into "); + for (RTData dv : list) { + sb.append(iotModelCode); + sb.append(dv.getDeviceId()); + sb.append(" using analog tags ("); + sb.append(dv.getDeviceId()); + sb.append(","); + sb.append(1L); + sb.append(") values ("); + sb.append(dv.getDataTime()); + sb.append(","); + sb.append(dv.getValues()); + sb.append(")"); + } + try { + pstmt.executeUpdate(sb.toString()); + } catch (SQLException ex) { + log.error("save yc error", ex); + } + }); + } catch (SQLException ex) { + log.error(ex.getMessage()); + } + } + + @Async + public void updateStataValues(List values, String iotModelCode) { + StringBuilder sb = new StringBuilder(1024*1024); + try (Connection conn = hikariDataSource.getConnection(); + Statement pstmt = conn.createStatement()) { + ListUtil.page(values, batchSize, (list)->{ + sb.setLength(0); + sb.append("insert into "); + for (RTData dv : list) { + sb.append(iotModelCode); + sb.append(dv.getDeviceId()); + sb.append(" using analog tags ("); + sb.append(dv.getDeviceId()); + sb.append(","); + sb.append(1L); + sb.append(") values ("); + sb.append(dv.getDataTime()); + sb.append(","); + sb.append(dv.getValues()); + sb.append(")"); + } + try { + pstmt.executeUpdate(sb.toString()); + } catch (SQLException ex) { + log.error("save yc error", ex); + } + }); + } catch (SQLException ex) { + log.error(ex.getMessage()); + } + } + + + @PreDestroy + public void free() { + if (hikariDataSource != null) { + log.info("释放TDEngine资源"); + hikariDataSource.close(); + } + } +} diff --git a/das/src/main/java/com/das/modules/node/service/impl/DataServiceImpl.java b/das/src/main/java/com/das/modules/node/service/impl/DataServiceImpl.java index 154f3a0c..2f2ef4ea 100644 --- a/das/src/main/java/com/das/modules/node/service/impl/DataServiceImpl.java +++ b/das/src/main/java/com/das/modules/node/service/impl/DataServiceImpl.java @@ -9,6 +9,7 @@ import com.das.modules.node.mapper.SysImptabmappingMapper; import com.das.modules.node.service.DataService; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.dsl.Disruptor; import jakarta.annotation.PostConstruct; @@ -20,6 +21,7 @@ import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.concurrent.Executor; import java.util.concurrent.Executors; @@ -52,31 +54,28 @@ public class DataServiceImpl implements DataService { } - @PreDestroy public void destroy() { - if (ringBuffer != null){ + if (ringBuffer != null) { ringBuffer = null; } - if (disruptor != null){ + if (disruptor != null) { disruptor.shutdown(); } } @Override public void pushMessage(TerminalMessage msg) { - if (ringBuffer == null){ + if (ringBuffer == null) { return; } long seq = ringBuffer.next(); - try{ + try { TerminalMessage terminalMessage = ringBuffer.get(seq); terminalMessage.from(msg); - } - catch (Exception e){ - log.error("发送消息失败",e); - } - finally { + } catch (Exception e) { + log.error("发送消息失败", e); + } finally { ringBuffer.publish(seq); } @@ -88,54 +87,67 @@ public class DataServiceImpl implements DataService { List links = new ArrayList<>(); List equipments = new ArrayList<>(); List equipmentList = new ArrayList<>(); - // 获取所有的链路信息 - List sysCommunicationLinkVoList = sysCommunicationLinkMapper.querySysCommunicationLink(nodeId); - for (SysCommunicationLinkVo sysCommunicationLinkVo : sysCommunicationLinkVoList) { - LinkVo linkVo = new LinkVo(); - linkVo.setLinkId(sysCommunicationLinkVo.getId()); - linkVo.setLinkName(sysCommunicationLinkVo.getLinkName()); - linkVo.setParams(sysCommunicationLinkVo.getParams()); - linkVo.setProtocol(sysCommunicationLinkVo.getProtocol()); - List stringList = new ArrayList<>(); - // 获取关联的设备Id - equipmentList = sysImptabmappingMapper.getEquipmentId(sysCommunicationLinkVo.getId()); - for (Long equipmentId : equipmentList) { - stringList.add(String.valueOf(equipmentId)); - } - String[] stringArray = stringList.toArray(new String[0]); - linkVo.setDevices(stringArray); - links.add(linkVo); - } - for (Long equipmentId : equipmentList) { - List newIotModelFieldList = new ArrayList<>(); - List newIotModelServiceList = new ArrayList<>(); - // 获取设备IOT地址 - String iotAddr = sysImptabmappingMapper.getIotAddrByEquipmentId(equipmentId); - // 根据设备Id获取对应的物模型属性和动作 - List iotModelFieldList = sysImptabmappingMapper.getIotModelFieldByEquipmentId(equipmentId); - if (!CollectionUtils.isEmpty(iotModelFieldList)) { - for (IotModelVo info : iotModelFieldList) { - if(info.getParams() == null){ -// info.setParams(); + try { + // 获取所有的链路信息 + List sysCommunicationLinkVoList = sysCommunicationLinkMapper.querySysCommunicationLink(nodeId); + for (SysCommunicationLinkVo sysCommunicationLinkVo : sysCommunicationLinkVoList) { + ObjectMapper objectMapper = new ObjectMapper(); + JsonNode jsonNode = objectMapper.readTree(sysCommunicationLinkVo.getParams()); + LinkVo linkVo = new LinkVo(); + linkVo.setLinkId(sysCommunicationLinkVo.getId()); + linkVo.setLinkName(sysCommunicationLinkVo.getLinkName()); + linkVo.setParams(jsonNode); + linkVo.setProtocol(sysCommunicationLinkVo.getProtocol()); + List stringList = new ArrayList<>(); + // 获取关联的设备Id + equipmentList = sysImptabmappingMapper.getEquipmentId(sysCommunicationLinkVo.getId()); + for (Long equipmentId : equipmentList) { + stringList.add(String.valueOf(equipmentId)); + } + String[] stringArray = stringList.toArray(new String[0]); + linkVo.setDevices(stringArray); + links.add(linkVo); + for (Long equipmentId : equipmentList) { + HashMap map = new HashMap<>(); + ObjectMapper equipObjectMapper = new ObjectMapper(); + ObjectNode equipJsonNode = equipObjectMapper.convertValue(map, ObjectNode.class); + List newIotModelFieldList = new ArrayList<>(); + List newIotModelServiceList = new ArrayList<>(); + // 获取设备IOT地址 + String iotAddr = sysImptabmappingMapper.getIotAddrByEquipmentId(equipmentId); + // 根据设备Id获取对应的物模型属性和动作 + List iotModelFieldList = sysImptabmappingMapper.getIotModelFieldByEquipmentId(equipmentId); + if (!CollectionUtils.isEmpty(iotModelFieldList)) { + for (IotModelVo info : iotModelFieldList) { + if (info.getParams() == null) { + info.setParams(equipJsonNode); + } else { + info.setParams(equipObjectMapper.readTree(info.getParams().toString())); + } + newIotModelFieldList.add(info); + } } - newIotModelFieldList.add(info); + List iotModelServiceList = sysImptabmappingMapper.getIotModelServiceByEquipmentId(equipmentId); + if (!CollectionUtils.isEmpty(iotModelServiceList)) { + for (IotModelVo info : iotModelServiceList) { + if (info.getParams() == null) { + info.setParams(equipJsonNode); + }else { + info.setParams(equipObjectMapper.readTree(info.getParams().toString())); + } + newIotModelServiceList.add(info); + } + } + EquipmentVo equipment = new EquipmentVo(); + equipment.setAddr(iotAddr); + equipment.setId(equipmentId); + equipment.setAttrs(newIotModelFieldList); + equipment.setServices(newIotModelServiceList); + equipments.add(equipment); } } - List iotModelServiceList = sysImptabmappingMapper.getIotModelServiceByEquipmentId(equipmentId); - if (!CollectionUtils.isEmpty(iotModelServiceList)) { - for (IotModelVo info : iotModelServiceList) { - if(info.getParams() == null){ -// info.setParams(); - } - newIotModelServiceList.add(info); - } - } - EquipmentVo equipment = new EquipmentVo(); - equipment.setAddr(iotAddr); - equipment.setId(equipmentId); - equipment.setAttrs(newIotModelFieldList); - equipment.setServices(newIotModelServiceList); - equipments.add(equipment); + } catch (Exception e) { + log.error("获取设备配置信息失败", e); } configUpdateVo.setCreateTime(System.currentTimeMillis()); configUpdateVo.setNodeId(String.valueOf(nodeId)); diff --git a/das/src/main/java/com/das/modules/node/service/impl/SysNodeServiceImpl.java b/das/src/main/java/com/das/modules/node/service/impl/SysNodeServiceImpl.java index df6df401..ddbd4bbe 100644 --- a/das/src/main/java/com/das/modules/node/service/impl/SysNodeServiceImpl.java +++ b/das/src/main/java/com/das/modules/node/service/impl/SysNodeServiceImpl.java @@ -13,6 +13,9 @@ import com.das.modules.auth.entity.SysOrg; import com.das.modules.auth.mapper.SysOrgMapper; import com.das.modules.equipment.mapper.SysEquipmentMapper; import com.das.modules.equipment.mapper.SysIotModelMapper; +import com.das.modules.node.constant.NodeConstant; +import com.das.modules.node.disruptor.TerminalMessageEventHandler; +import com.das.modules.node.domain.bo.TerminalMessage; import com.das.modules.node.domain.dto.*; import com.das.modules.node.domain.vo.ImptabmappingVo; import com.das.modules.node.domain.vo.SysCommunicationLinkVo; @@ -26,6 +29,7 @@ import com.das.modules.node.mapper.SysImptabmappingMapper; import com.das.modules.node.mapper.SysNodeMapper; import com.das.modules.node.service.SysNodeService; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; import jakarta.servlet.http.HttpServletRequest; import jakarta.servlet.http.HttpServletResponse; import lombok.extern.slf4j.Slf4j; @@ -44,6 +48,7 @@ import java.nio.file.Files; import java.nio.file.Paths; import java.util.ArrayList; import java.util.Date; +import java.util.HashMap; import java.util.List; @Transactional(rollbackFor = Exception.class) @@ -70,6 +75,8 @@ public class SysNodeServiceImpl implements SysNodeService { @Autowired private SysIotModelMapper sysIotModelMapper; + TerminalMessageEventHandler terminalMessageEventHandler; + @Override public List querySysNodeList() { List sysNodeVoList = sysNodeMapper.querySysNodeList(); @@ -320,6 +327,33 @@ public class SysNodeServiceImpl implements SysNodeService { return flag; } + @Override + public void deviceControl(DeviceControlDto device) { + try { + HashMap map = new HashMap<>(); + ObjectMapper objectMapper = new ObjectMapper(); + String cmd = NodeConstant.DEVICE_CONTROL; + Long time = System.currentTimeMillis(); + + map.put("deviceId", device.getDeviceId()); + map.put("serviceName", device.getServiceName()); + map.put("opValue", device.getOpValue()); + + // 将 HashMap 转换为 JsonNode + ObjectNode jsonNode = objectMapper.convertValue(map, ObjectNode.class); + TerminalMessage configUpdate = TerminalMessage.builder() + .cmd(cmd) + .cmdId(String.valueOf(device.getDeviceId())) + .time(time) + .data(jsonNode) + .build(); + terminalMessageEventHandler.sendTerminalMessageWithResult(device.getNodeId(), configUpdate); + } catch (Exception e) { + log.error("设备控制失败 ", e); + } + + } + private void addSysImptabmapping(List equipmentId, Long linkId, SysUserVo sysUserVo, List addList) { int index = 0; for (BindEquipmentInfoDto info : equipmentId) { diff --git a/das/src/main/resources/application.yml b/das/src/main/resources/application.yml index bb25f22a..e178e838 100644 --- a/das/src/main/resources/application.yml +++ b/das/src/main/resources/application.yml @@ -92,4 +92,9 @@ das: logging: level: com: - das: DEBUG \ No newline at end of file + das: DEBUG + +tdengine: + password: taosdata + url: jdbc:TAOS-RS://192.168.109.160:6041/das + username: root \ No newline at end of file diff --git a/das/src/main/resources/mapper/SysCommunicationLinkMapper.xml b/das/src/main/resources/mapper/SysCommunicationLinkMapper.xml index facc13b1..9b302a14 100644 --- a/das/src/main/resources/mapper/SysCommunicationLinkMapper.xml +++ b/das/src/main/resources/mapper/SysCommunicationLinkMapper.xml @@ -30,7 +30,7 @@ diff --git a/das/src/main/resources/mapper/SysImptabmappingMapper.xml b/das/src/main/resources/mapper/SysImptabmappingMapper.xml index abb989a0..e8e0c105 100644 --- a/das/src/main/resources/mapper/SysImptabmappingMapper.xml +++ b/das/src/main/resources/mapper/SysImptabmappingMapper.xml @@ -57,7 +57,7 @@ - select sims.service_name as name ,sims.service_type as type ,to_json(si.params::json) AS params from sys_imptabmapping si + select sims.service_name as name ,sims.service_type as type ,si.params from sys_imptabmapping si left join sys_equipment se on si.equipment_id = se.id left join sys_iot_model_service sims on se.iot_model_id = sims.iot_model_id where si.equipment_id = #{equipmentId} diff --git a/das/src/main/resources/mapper/SysIotModelMapper.xml b/das/src/main/resources/mapper/SysIotModelMapper.xml index 8d24ebd0..2d61f3ac 100644 --- a/das/src/main/resources/mapper/SysIotModelMapper.xml +++ b/das/src/main/resources/mapper/SysIotModelMapper.xml @@ -54,5 +54,15 @@ + + + + +