diff --git a/das/pom.xml b/das/pom.xml index 61fde9d6..df873674 100644 --- a/das/pom.xml +++ b/das/pom.xml @@ -31,6 +31,8 @@ 3.2.10 3.4.4 5.4.3 + 8.4.3 + 1.5.3 @@ -201,6 +203,17 @@ + + + io.minio + minio + ${minio.version} + + + org.jfree + jfreechart + ${jfreechart.version} + diff --git a/das/src/main/java/com/das/common/config/WebsocketConfig.java b/das/src/main/java/com/das/common/config/WebsocketConfig.java index f28aee31..95361632 100644 --- a/das/src/main/java/com/das/common/config/WebsocketConfig.java +++ b/das/src/main/java/com/das/common/config/WebsocketConfig.java @@ -1,10 +1,9 @@ package com.das.common.config; -import com.das.modules.node.handler.NodeHandshakeInterceptor; -import com.das.modules.node.handler.NodeMessageHandler; +import com.das.modules.node.handler.NodeWebsocketHandshakeInterceptor; +import com.das.modules.node.service.impl.NodeMessageServiceImpl; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; -import org.springframework.stereotype.Component; import org.springframework.web.socket.config.annotation.EnableWebSocket; import org.springframework.web.socket.config.annotation.WebSocketConfigurer; import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry; @@ -12,15 +11,13 @@ import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry @EnableWebSocket @Configuration public class WebsocketConfig implements WebSocketConfigurer { - @Autowired - NodeHandshakeInterceptor nodeHandshakeInterceptor; @Autowired - NodeMessageHandler nodeMessageHandler; + NodeMessageServiceImpl nodeMessageService; @Override public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { - registry.addHandler(nodeMessageHandler, "/node/{nodeId}/{version}") + registry.addHandler(nodeMessageService, "/node/{nodeId}/{version}") .setAllowedOrigins("*") - .addInterceptors(nodeHandshakeInterceptor); + .addInterceptors(new NodeWebsocketHandshakeInterceptor()); } } diff --git a/das/src/main/java/com/das/common/constant/FileConstants.java b/das/src/main/java/com/das/common/constant/FileConstants.java new file mode 100644 index 00000000..656d425c --- /dev/null +++ b/das/src/main/java/com/das/common/constant/FileConstants.java @@ -0,0 +1,27 @@ +package com.das.common.constant; + +/** + * @Author liuyuxia + * @ClassName FilesvrConstants + * @Date 2018/12/10 0010 11:02 + * @Version 2.5 + * @Description 文件服务基础常量 + **/ +public class FileConstants { + + public static final String FILE_SEPARATOR = "/"; + + public static final String FILE_CHARSET = "UTF-8"; + + /** + * 递归 + */ + public static final Integer YES_RECURSIVE=1; + public static final Integer NO_RECURSIVE=0; + + public static final int META_R = 1001; + public static final int META_W = 1002; + + public static final String META_R_NAME = "读"; + public static final String META_W_NAME = "写"; +} diff --git a/das/src/main/java/com/das/modules/equipment/entity/SysEquipment.java b/das/src/main/java/com/das/modules/equipment/entity/SysEquipment.java index 53f888db..8b1a83f6 100644 --- a/das/src/main/java/com/das/modules/equipment/entity/SysEquipment.java +++ b/das/src/main/java/com/das/modules/equipment/entity/SysEquipment.java @@ -137,4 +137,10 @@ public class SysEquipment extends BaseEntity { */ @TableField(value = "nominal_capacity") private Double nominalCapacity; + + /** + * 故障录波格式 + */ + @TableField(value = "fdr_format") + private String fdrFormat; } diff --git a/das/src/main/java/com/das/modules/equipment/service/impl/SysIotModelServiceImpl.java b/das/src/main/java/com/das/modules/equipment/service/impl/SysIotModelServiceImpl.java index 67750e0c..0476b8c3 100644 --- a/das/src/main/java/com/das/modules/equipment/service/impl/SysIotModelServiceImpl.java +++ b/das/src/main/java/com/das/modules/equipment/service/impl/SysIotModelServiceImpl.java @@ -469,7 +469,7 @@ public class SysIotModelServiceImpl implements SysIotModelService { addModelFieldCache(highCreateList.get(i)); } } - for (SysIotModelField item : calCreateList){ + for (SysIotModelField item : calCreateList) { createTdStableOrColumn(item); addModelFieldCache(item); } @@ -582,7 +582,7 @@ public class SysIotModelServiceImpl implements SysIotModelService { if (sysIotModelField.getAttributeType() == 199) { Long iotModelId = sysIotModelField.getIotModelId(); String modelCode = dataService.iotModelMap.get(iotModelId.toString()); - tdEngineService.deleteStable("c_" + modelCode +"_"+ sysIotModelField.getAttributeCode()); + tdEngineService.deleteStable("c_" + modelCode + "_" + sysIotModelField.getAttributeCode()); } else { String stableName = null; SysIotModel sysIotModel = sysIotModelMapper.selectById(sysIotModelField.getIotModelId()); @@ -611,6 +611,15 @@ public class SysIotModelServiceImpl implements SysIotModelService { private void addModelFieldCache(SysIotModelField sysIotModelField) { //获取物模型编码 String modelCode = dataService.iotModelMap.get(sysIotModelField.getIotModelId().toString()); + Map fieldCodeNameMap = dataService.fieldCodeNameMap.get(modelCode); + if (fieldCodeNameMap == null) { + Map fieldCodeName = new HashMap<>(); + fieldCodeName.put(sysIotModelField.getAttributeCode(),sysIotModelField.getAttributeName()); + dataService.fieldCodeNameMap.put(modelCode,fieldCodeName); + } + else { + fieldCodeNameMap.put(sysIotModelField.getAttributeCode(), sysIotModelField.getAttributeName()); + } if (sysIotModelField.getAttributeType() == 199) { Map map = dataService.calculateIotFieldMap.get(modelCode); if (map == null) { @@ -657,10 +666,12 @@ public class SysIotModelServiceImpl implements SysIotModelService { private void deleteModelFieldCache(SysIotModelField sysIotModelField) { //获取物模型编码 String modelCode = dataService.iotModelMap.get(sysIotModelField.getIotModelId().toString()); - if (sysIotModelField.getAttributeType() == 199){ + Map fieldCodeName = dataService.fieldCodeNameMap.get(modelCode); + fieldCodeName.remove(sysIotModelField.getAttributeCode()); + if (sysIotModelField.getAttributeType() == 199) { Map map = dataService.calculateIotFieldMap.get(modelCode); map.remove(sysIotModelField.getAttributeCode()); - }else { + } else { if (sysIotModelField.getHighSpeed() == 0) { Map map = dataService.lowIotFieldMap.get(modelCode); map.remove(sysIotModelField.getAttributeCode()); diff --git a/das/src/main/java/com/das/modules/fdr/config/MinioConfig.java b/das/src/main/java/com/das/modules/fdr/config/MinioConfig.java new file mode 100644 index 00000000..92941ef5 --- /dev/null +++ b/das/src/main/java/com/das/modules/fdr/config/MinioConfig.java @@ -0,0 +1,71 @@ +package com.das.modules.fdr.config; + +import io.minio.*; +import io.minio.errors.*; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + + +@Slf4j +@Configuration +@ConditionalOnClass(MinioClient.class) +public class MinioConfig { + + @Resource + private MinioProperties minioAutoProperties; + @Bean + public MinioClient minioClient() { + log.info("开始初始化MinioClient, url为{}, accessKey为:{}", minioAutoProperties.getUrl(), minioAutoProperties.getAccessKey()); + MinioClient minioClient = MinioClient + .builder() + .endpoint(minioAutoProperties.getUrl()) + .credentials(minioAutoProperties.getAccessKey(), minioAutoProperties.getSecretKey()) + .build(); + + minioClient.setTimeout( + minioAutoProperties.getConnectTimeout(), + minioAutoProperties.getWriteTimeout(), + minioAutoProperties.getReadTimeout() + ); + // Start detection + if (minioAutoProperties.isCheckBucket()) { + log.info("checkBucket为{}, 开始检测桶是否存在", minioAutoProperties.isCheckBucket()); + String bucketName = minioAutoProperties.getBucket(); + if (!checkBucket(bucketName, minioClient)) { + log.info("文件桶[{}]不存在, 开始检查是否可以新建桶", bucketName); + if (minioAutoProperties.isCreateBucket()) { + log.info("createBucket为{},开始新建文件桶", minioAutoProperties.isCreateBucket()); + createBucket(bucketName, minioClient); + } + } + log.info("文件桶[{}]已存在, minio客户端连接成功!", bucketName); + } else { + throw new RuntimeException("桶不存在, 请检查桶名称是否正确或者将checkBucket属性改为false"); + } + return minioClient; + } + + private boolean checkBucket(String bucketName, MinioClient minioClient) { + boolean isExists = false; + try { + isExists = minioClient.bucketExists(BucketExistsArgs.builder().bucket(bucketName).build()); + } catch (Exception e) { + throw new RuntimeException("failed to check if the bucket exists", e); + } + return isExists; + } + + private void createBucket(String bucketName, MinioClient minioClient) { + try { + minioClient.makeBucket(MakeBucketArgs.builder().bucket(bucketName).build()); + log.info("文件桶[{}]新建成功, minio客户端已连接", bucketName); + } catch (Exception e) { + throw new RuntimeException("failed to create default bucket", e); + } + } + +} diff --git a/das/src/main/java/com/das/modules/fdr/config/MinioProperties.java b/das/src/main/java/com/das/modules/fdr/config/MinioProperties.java new file mode 100644 index 00000000..3ce7cfed --- /dev/null +++ b/das/src/main/java/com/das/modules/fdr/config/MinioProperties.java @@ -0,0 +1,70 @@ +package com.das.modules.fdr.config; + +import jakarta.validation.constraints.NotEmpty; +import lombok.Data; +import org.hibernate.validator.constraints.URL; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; +import org.springframework.validation.annotation.Validated; + +@Data +@Validated +@Component +public class MinioProperties { + /** + * 服务地址 + */ + @NotEmpty(message = "minio服务地址不可为空") + @URL(message = "minio服务地址格式错误") + @Value("${minio.url}") + private String url; + + /** + * 认证账户 + */ + @NotEmpty(message = "minio认证账户不可为空") + @Value("${minio.accessKey}") + private String accessKey; + + /** + * 认证密码 + */ + @NotEmpty(message = "minio认证密码不可为空") + @Value("${minio.secretKey}") + private String secretKey; + + /** + * 桶名称, 优先级最低 + */ + @Value("${minio.bucket}") + private String bucket; + + /** + * 桶不在的时候是否新建桶 + */ + private boolean createBucket = true; + + /** + * 启动的时候检查桶是否存在 + */ + private boolean checkBucket = true; + + /** + * 设置HTTP连接、写入和读取超时。值为0意味着没有超时 + * HTTP连接超时,以毫秒为单位。 + */ + private long connectTimeout; + + /** + * 设置HTTP连接、写入和读取超时。值为0意味着没有超时 + * HTTP写超时,以毫秒为单位。 + */ + private long writeTimeout; + + /** + * 设置HTTP连接、写入和读取超时。值为0意味着没有超时 + * HTTP读取超时,以毫秒为单位。 + */ + private long readTimeout; +} diff --git a/das/src/main/java/com/das/modules/fdr/controller/FaultRecorderController.java b/das/src/main/java/com/das/modules/fdr/controller/FaultRecorderController.java new file mode 100644 index 00000000..17e22d98 --- /dev/null +++ b/das/src/main/java/com/das/modules/fdr/controller/FaultRecorderController.java @@ -0,0 +1,48 @@ +package com.das.modules.fdr.controller; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.das.common.result.R; +import com.das.modules.equipment.entity.SysEquipment; +import com.das.modules.fdr.domain.FileNode; +import com.das.modules.fdr.service.FaultRecorderService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.*; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +/** + * 故障录波controller + */ +@Slf4j +@RequestMapping("/api/fdr") +@RestController +public class FaultRecorderController { + + @Autowired + private FaultRecorderService faultRecorderService; + + @RequestMapping(value = "/files", method = RequestMethod.POST) + public R> findList(@RequestBody JSONObject jsonObject) { + String code = jsonObject.getString("deviceCode"); + String startTime = jsonObject.getString("startTime"); + String endTime = jsonObject.getString("endTime"); + List result = faultRecorderService.getDirOrFileList(code,startTime,endTime); + return R.success(result); + } + + @RequestMapping(value = "/parseData", method = RequestMethod.POST) + public R>> parseData(@RequestBody JSONObject jsonObject) throws IOException { + Map> dataCurve = faultRecorderService.getDataCurve(jsonObject.getString("url"), jsonObject.getString("deviceCode")); + return R.success(dataCurve); + } + + @RequestMapping(value = "/updateFdrConfig", method = RequestMethod.POST) + public void updateFdrConfig(@RequestBody SysEquipment sysEquipment){ + + faultRecorderService.updateFdrConfig(sysEquipment); + } +} diff --git a/das/src/main/java/com/das/modules/fdr/domain/FileNode.java b/das/src/main/java/com/das/modules/fdr/domain/FileNode.java new file mode 100644 index 00000000..8a2894b5 --- /dev/null +++ b/das/src/main/java/com/das/modules/fdr/domain/FileNode.java @@ -0,0 +1,22 @@ +package com.das.modules.fdr.domain; + +import lombok.*; + +@Getter +@Setter +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class FileNode { + //节点名称 + private String name; + + // 0代表文件夹,1代表文件 + private int type; + + private String size; + + private String lastModified; + + private String path; +} diff --git a/das/src/main/java/com/das/modules/fdr/domain/vo/FdrFormatVo.java b/das/src/main/java/com/das/modules/fdr/domain/vo/FdrFormatVo.java new file mode 100644 index 00000000..1ad14109 --- /dev/null +++ b/das/src/main/java/com/das/modules/fdr/domain/vo/FdrFormatVo.java @@ -0,0 +1,17 @@ +package com.das.modules.fdr.domain.vo; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class FdrFormatVo { + + private String timeFormat; + + private String delimiter; + + private Integer validStartLine; +} diff --git a/das/src/main/java/com/das/modules/fdr/service/FaultRecorderService.java b/das/src/main/java/com/das/modules/fdr/service/FaultRecorderService.java new file mode 100644 index 00000000..2add8e12 --- /dev/null +++ b/das/src/main/java/com/das/modules/fdr/service/FaultRecorderService.java @@ -0,0 +1,28 @@ +package com.das.modules.fdr.service; + +import com.das.modules.equipment.entity.SysEquipment; +import com.das.modules.fdr.domain.FileNode; +import org.springframework.web.multipart.MultipartFile; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.file.Path; +import java.util.List; +import java.util.Map; + +public interface FaultRecorderService { + List getDirOrFileList(String name,String startTime, String endTime); + + Map> getDataCurve(String url, String deviceCode) throws IOException; + + void updateFdrConfig(SysEquipment sysEquipment); + + String upload(String parent, String folderName, MultipartFile file); + + void readFileToSteam(String path, OutputStream stream); + + void download(String path, Path tempDir); + + + +} diff --git a/das/src/main/java/com/das/modules/fdr/service/MinioViewsServcie.java b/das/src/main/java/com/das/modules/fdr/service/MinioViewsServcie.java new file mode 100644 index 00000000..30204f0e --- /dev/null +++ b/das/src/main/java/com/das/modules/fdr/service/MinioViewsServcie.java @@ -0,0 +1,232 @@ +package com.das.modules.fdr.service; + +import cn.hutool.core.io.FileUtil; +import com.das.common.constant.FileConstants; +import com.das.modules.fdr.config.MinioProperties; +import com.das.modules.fdr.domain.FileNode; +import io.micrometer.common.util.StringUtils; +import io.minio.*; +import io.minio.errors.*; +import io.minio.messages.Item; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import org.springframework.web.multipart.MultipartFile; + +import java.io.*; +import java.nio.file.Path; +import java.security.InvalidKeyException; +import java.security.NoSuchAlgorithmException; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +@Service +@Slf4j +public class MinioViewsServcie { + @Autowired + private MinioClient minioClient; + + @Autowired + private MinioProperties minioProperties; + + /** + * 删除文件 + * + * @param bucketName 存储桶 + * @param objectName 文件名称 + */ + public void removeFile(String bucketName, String objectName, Boolean recursive) throws Exception { + Iterable> results = minioClient.listObjects( + ListObjectsArgs.builder().bucket(bucketName).prefix(objectName).recursive(true).build()); + List> list = StreamSupport.stream(results.spliterator(), false) + .collect(Collectors.toList()); + if (list.size() >= 2 && !recursive) { + throw new IOException("请清空文件后再删除目录"); + } + for (Result result : results) { + Item item = result.get(); + minioClient.removeObject( + RemoveObjectArgs.builder() + .bucket(bucketName) + .object(item.objectName()) + .build()); + } + } + + + public Boolean deleteFileViews(String path, Boolean recursive) throws IOException { + if (StringUtils.isBlank(path)) { + throw new IOException("请确认删除文件路径"); + } + Boolean success = null; + try { + path = path.substring(path.indexOf("/") + 1); + removeFile(minioProperties.getBucket(), path, recursive); + } catch (Exception e) { + throw new RuntimeException(e); + } + return success; + } + + public boolean deleteFile(File file) { + return file.delete(); + } + + + public String upload(String path, String folderName,MultipartFile file) { + String targetFile = null; + try { + // 上传一个空对象来模拟文件夹 + if (!StringUtils.isBlank(folderName)){ + targetFile = path + folderName + FileConstants.FILE_SEPARATOR; + ByteArrayInputStream bais = new ByteArrayInputStream(new byte[0]); + minioClient.putObject( + PutObjectArgs.builder() + .bucket(minioProperties.getBucket()) + .object(targetFile) + .stream(bais, 0, -1) + .build()); + } + else { + targetFile= path +"/" + file.getOriginalFilename(); + uploadFile(minioProperties.getBucket(), file, targetFile, "application/octet-stream"); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + return targetFile; + } + + + /** + * 使用MultipartFile进行文件上传 + * + * @param bucketName 存储桶 + * @param file 文件名 + * @param objectName 对象名 + * @param contentType 类型 + * @throws Exception + */ + public void uploadFile(String bucketName, MultipartFile file, String objectName, String contentType) throws Exception { + InputStream inputStream = file.getInputStream(); + try{ + minioClient.putObject( + PutObjectArgs.builder() + .bucket(bucketName) + .object(objectName) + .contentType(contentType) + .stream(inputStream, inputStream.available(), -1) + .build()); + }catch (Exception e){ + log.error("minio文件上传失败{}", e); + } + + } + + //获取路径下的文件夹文件列表 + public List getFileTree(String directoryName) { + List fileNodes = new ArrayList<>(); + ListObjectsArgs build; + + try { + if (StringUtils.isBlank(directoryName)) { + build = ListObjectsArgs.builder().bucket(minioProperties.getBucket()).recursive(false).build(); + } else { + build = ListObjectsArgs.builder().bucket(minioProperties.getBucket()).prefix(directoryName+"/").recursive(false).build(); + } + Iterable> results = minioClient.listObjects(build); + for (Result result : results) { + Item item = result.get(); + String itemName = item.objectName(); + boolean isDir = item.isDir(); + String size = FileUtil.readableFileSize(item.size()); + String relativePath = null; + String[] parts = null; + if (!StringUtils.isBlank(directoryName)){ + relativePath = itemName.substring(directoryName.length()); + parts = relativePath.split("/"); + } + else { + parts = itemName.split("/"); + } + String lastModifyTime = null; + DateTimeFormatter dateFormat =DateTimeFormatter.ofPattern("yyyy-MM-dd hh:mm:ss"); + if (!isDir){ + ZonedDateTime zonedDateTime = item.lastModified(); + lastModifyTime = zonedDateTime.format(dateFormat); + } + if (parts.length > 0) { + String nodeName = parts[0]; + int type = isDir ? 0 : 1; + itemName= isDir ? itemName.substring(0,itemName.lastIndexOf("/")) : itemName; + FileNode node = new FileNode(nodeName, type,size,lastModifyTime,"/"+itemName); + if (!fileNodes.contains(node)) { + fileNodes.add(node); + } + } + } + } catch (Exception e) { + log.error("minio获取树列表失败", e); + } + return fileNodes; + } + + public void readFileToStream(String path, OutputStream stream) { + + try ( GetObjectResponse res = minioClient.getObject( + GetObjectArgs.builder().bucket(minioProperties.getBucket()).object(path).build())){ + res.transferTo(stream); + } catch (Exception e) { + log.error("minio读取文件失败", e); + } + } + + public void download(String path, Path tempDir) { + + try (InputStream inputStream = minioClient.getObject(GetObjectArgs.builder() + .bucket(minioProperties.getBucket()) + .object(path) + .build())) { + + // 保存到临时文件夹 + File tempFile = tempDir.resolve(tempDir+path).toFile(); + FileUtil.writeFromStream(inputStream,tempFile); + } + catch (Exception ignored){ + + } + } + + // 递归方式 计算文件的大小 + public long getTotalSizeOfFilesInDir(File file) { + if (file.isFile()) { + return file.length(); + } + + File[] children = file.listFiles(); + long total = 0; + if (children != null) { + + for (final File child : children) { + total += getTotalSizeOfFilesInDir(child); + } + } + return total; + } + + public InputStream getFileStream(String url){ + + InputStream inputStream = null; + try { + inputStream = minioClient.getObject(GetObjectArgs.builder().bucket(minioProperties.getBucket()).object(url).build()); + } catch (Exception e) { + log.error("获取文件失败"); + } + return inputStream; + } + +} diff --git a/das/src/main/java/com/das/modules/fdr/service/impl/FaultRecorderServiceImpl.java b/das/src/main/java/com/das/modules/fdr/service/impl/FaultRecorderServiceImpl.java new file mode 100644 index 00000000..6c980999 --- /dev/null +++ b/das/src/main/java/com/das/modules/fdr/service/impl/FaultRecorderServiceImpl.java @@ -0,0 +1,195 @@ +package com.das.modules.fdr.service.impl; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; +import com.das.common.constant.FileConstants; +import com.das.common.exceptions.ServiceException; +import com.das.modules.equipment.entity.SysEquipment; +import com.das.modules.equipment.mapper.SysEquipmentMapper; +import com.das.modules.fdr.config.MinioProperties; +import com.das.modules.fdr.domain.FileNode; +import com.das.modules.fdr.domain.vo.FdrFormatVo; +import com.das.modules.fdr.service.FaultRecorderService; +import com.das.modules.fdr.service.MinioViewsServcie; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.micrometer.common.util.StringUtils; +import io.minio.GetObjectArgs; +import io.minio.MinioClient; +import lombok.extern.slf4j.Slf4j; +import org.checkerframework.checker.units.qual.A; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import org.springframework.web.multipart.MultipartFile; + +import java.io.*; +import java.nio.file.Path; +import java.rmi.ServerException; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.time.LocalDate; +import java.time.format.DateTimeFormatter; +import java.util.*; +import java.util.stream.Collectors; + +@Service +@Slf4j +public class FaultRecorderServiceImpl implements FaultRecorderService { + + @Autowired + private MinioViewsServcie minioViewsServcie; + + @Autowired + MinioClient minioClient; + + @Autowired + private MinioProperties minioProperties; + + @Autowired + private SysEquipmentMapper sysEquipmentMapper; + + + @Override + public List getDirOrFileList(String name, String startTime, String endTime) { + List fileResult = new ArrayList<>(); + List monthsBetween = getMonthsBetween(startTime, endTime); + for (String item : monthsBetween) { + String directoryName = name + FileConstants.FILE_SEPARATOR + item.substring(0, item.indexOf("-")) + FileConstants.FILE_SEPARATOR + item.substring(item.indexOf("-") + 1); + List fileTree = minioViewsServcie.getFileTree(directoryName); + fileResult.addAll(fileTree); + } + Comparator fileNodeComparator = Comparator.comparing(FileNode::getLastModified) + .thenComparing(FileNode::getName); + fileResult.sort(fileNodeComparator); + return fileResult; + } + + @Override + public String upload(String parent, String folderName, MultipartFile file) { + return minioViewsServcie.upload(parent, folderName, file); + } + + @Override + public void readFileToSteam(String path, OutputStream stream) { + minioViewsServcie.readFileToStream(path, stream); + } + + @Override + public void download(String path, Path tempDir) { + minioViewsServcie.download(path, tempDir); + } + + + private List getMonthsBetween(String startTime, String endTime) { + List months = new ArrayList<>(); + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd"); + + LocalDate start = LocalDate.parse(startTime + "-01", formatter); + LocalDate end = LocalDate.parse(endTime + "-01", formatter); + + DateTimeFormatter monthFormatter = DateTimeFormatter.ofPattern("yyyy-MM"); + + while (!start.isAfter(end)) { + months.add(start.format(monthFormatter)); + start = start.plusMonths(1); + } + return months; + } + + @Override + public Map> getDataCurve(String url, String deviceCode) throws IOException { + Map> resultMap = null; + try (InputStream fileStream = minioViewsServcie.getFileStream(url)) { + //根据device Code查询故障录波格式 + QueryWrapper queryWrapper = new QueryWrapper<>(); + queryWrapper.eq("CODE", deviceCode); + SysEquipment sysEquipment = sysEquipmentMapper.selectOne(queryWrapper); + if (sysEquipment == null) { + throw new ServiceException("设备不存在,请选择正确设备"); + } + + if (StringUtils.isBlank(sysEquipment.getFdrFormat())){ + throw new ServerException("请添加设备故障录波配置"); + } + FdrFormatVo fdrFormatVo = JSON.parseObject(sysEquipment.getFdrFormat(), FdrFormatVo.class); + + + // 解析文件内容 + resultMap = parseFile(fileStream, fdrFormatVo.getTimeFormat(), fdrFormatVo.getDelimiter(), fdrFormatVo.getValidStartLine()); + + } catch (Exception e) { + e.printStackTrace(); + } + return resultMap; + } + + @Override + public void updateFdrConfig(SysEquipment sysEquipment) { + sysEquipmentMapper.updateById(sysEquipment); + } + + public Map> parseFile(InputStream inputStream, String timeFormat, String delimiter, int validStartLine) { + + + List> result = new ArrayList<>(); + Map> stringListMap = null; + try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) { + String line; + int lineNumber = 0; + + while ((line = reader.readLine()) != null) { + lineNumber++; + // 忽略有效行之前的行 + if (lineNumber < validStartLine) { + continue; + } + // 按照分隔符分割行数据 + List lineData = Arrays.stream(line.split(delimiter)).toList(); + result.add(lineData); + } + stringListMap = parseDataCurve(result, timeFormat); + } catch (Exception e) { + log.error("文件解析失败{}", e); + } + return stringListMap; + } + + public Map> parseDataCurve(List> data, String timeFormat) throws ParseException { + List listField = data.get(0); + Map> map = new HashMap<>(); + data.remove(0); + for (List item : data) { + for (int i = 0; i < item.size(); i++) { + if (map.get(listField.get(i)) == null) { + if (i == 0){ + List timeList = new ArrayList<>(); + long timestamp = convertToTimestamp(item.get(i), timeFormat); + timeList.add(timestamp); + map.put(listField.get(i),timeList); + }else { + List valueList = new ArrayList<>(); + valueList.add(Double.valueOf(item.get(i))); + map.put(listField.get(i), valueList); + } + + } else { + List valueList = map.get(listField.get(i)); + if (i == 0){ + valueList.add(convertToTimestamp(item.get(i),timeFormat)); + + } + else { + valueList.add(Double.valueOf(item.get(i))); + } + + } + } + } + return map; + } + + public long convertToTimestamp(String time, String pattern) throws ParseException { + SimpleDateFormat sdf = new SimpleDateFormat(pattern); + return sdf.parse(time).getTime(); + } +} diff --git a/das/src/main/java/com/das/modules/node/disruptor/MessageEventFactory.java b/das/src/main/java/com/das/modules/node/disruptor/MessageEventFactory.java deleted file mode 100644 index 9a3a65f2..00000000 --- a/das/src/main/java/com/das/modules/node/disruptor/MessageEventFactory.java +++ /dev/null @@ -1,11 +0,0 @@ -package com.das.modules.node.disruptor; - -import com.das.modules.node.domain.bo.TerminalMessage; -import com.lmax.disruptor.EventFactory; - -public class MessageEventFactory implements EventFactory { - @Override - public TerminalMessage newInstance() { - return TerminalMessage.builder().build(); - } -} diff --git a/das/src/main/java/com/das/modules/node/disruptor/TerminalMessageEventHandler.java b/das/src/main/java/com/das/modules/node/disruptor/TerminalMessageEventHandler.java deleted file mode 100644 index 0234d9d0..00000000 --- a/das/src/main/java/com/das/modules/node/disruptor/TerminalMessageEventHandler.java +++ /dev/null @@ -1,57 +0,0 @@ -package com.das.modules.node.disruptor; - -import com.das.common.utils.SpringUtils; -import com.das.modules.node.command.BaseCommand; -import com.das.modules.node.domain.bo.TerminalMessage; -import com.das.modules.node.handler.NodeMessageHandler; -import com.lmax.disruptor.EventHandler; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -import java.util.concurrent.*; - -@Slf4j -@Component -public class TerminalMessageEventHandler implements EventHandler { - - @Autowired - NodeMessageHandler nodeMessageHandler; - - private ConcurrentHashMap> callbackMap = new ConcurrentHashMap<>(16); - @Override - public void onEvent(TerminalMessage terminalMessage, long sequence, boolean endOfBatch) throws Exception { -// log.info("收到消息: {}", terminalMessage.toJsonString()); - if (callbackMap.containsKey(terminalMessage.getCmdId())){ - //如果是回调函数,推送到回调函数 - callbackMap.get(terminalMessage.getCmdId()).complete(terminalMessage); - } else{ - String cmd = terminalMessage.getCmd(); - BaseCommand commander = null; - try { - commander = SpringUtils.getBean(cmd); - } catch (Exception e) { - log.debug("当前未找到执行command容器"); - } - if (commander != null) { - try { - commander.doCommand(terminalMessage); - } catch (Exception ex) { - log.error(String.format("命令 - %s 处理失败", cmd), ex); - } - } else { - log.error("命令[{}]无效, 未发现实现适配器!", cmd); - } - } - } - - - public void sendTerminalMessageWithResult(Long nodeId, TerminalMessage message) throws ExecutionException, InterruptedException, TimeoutException { - nodeMessageHandler.sendActionMessage(nodeId, message); - - CompletableFuture future = new CompletableFuture<>(); - callbackMap.put(message.getCmdId(), future); - - TerminalMessage result = future.get(10, TimeUnit.SECONDS); - } -} diff --git a/das/src/main/java/com/das/modules/node/disruptor/TerminalMessageWorkerHandler.java b/das/src/main/java/com/das/modules/node/disruptor/TerminalMessageWorkerHandler.java new file mode 100644 index 00000000..2ea1a071 --- /dev/null +++ b/das/src/main/java/com/das/modules/node/disruptor/TerminalMessageWorkerHandler.java @@ -0,0 +1,32 @@ +package com.das.modules.node.disruptor; + +import com.das.common.utils.SpringUtils; +import com.das.modules.node.command.BaseCommand; +import com.das.modules.node.domain.bo.TerminalMessage; +import com.lmax.disruptor.WorkHandler; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class TerminalMessageWorkerHandler implements WorkHandler { + + + @Override + public void onEvent(TerminalMessage event) throws Exception { + String cmd = event.getCmd(); + BaseCommand commander = null; + try { + commander = SpringUtils.getBean(cmd); + } catch (Exception e) { + log.debug("当前未找到执行command容器"); + } + if (commander != null) { + try { + commander.doCommand(event); + } catch (Exception ex) { + log.error(String.format("命令 - %s 处理失败", cmd), ex); + } + } else { + log.error("命令[{}]无效, 未发现实现适配器!", cmd); + } + } +} diff --git a/das/src/main/java/com/das/modules/node/domain/bo/TerminalMessage.java b/das/src/main/java/com/das/modules/node/domain/bo/TerminalMessage.java index e90ae54f..16951875 100644 --- a/das/src/main/java/com/das/modules/node/domain/bo/TerminalMessage.java +++ b/das/src/main/java/com/das/modules/node/domain/bo/TerminalMessage.java @@ -6,8 +6,8 @@ import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; - @Builder +@NoArgsConstructor @AllArgsConstructor @Data public class TerminalMessage { diff --git a/das/src/main/java/com/das/modules/node/handler/NodeMessageHandler.java b/das/src/main/java/com/das/modules/node/handler/NodeMessageHandler.java deleted file mode 100644 index 1c7c2262..00000000 --- a/das/src/main/java/com/das/modules/node/handler/NodeMessageHandler.java +++ /dev/null @@ -1,150 +0,0 @@ -package com.das.modules.node.handler; - -import com.das.common.utils.JsonUtils; -import com.das.common.utils.SpringUtil; -import com.das.modules.node.constant.NodeConstant; -import com.das.modules.node.domain.bo.TerminalMessage; -import com.das.modules.node.service.NodeMessageService; -import com.fasterxml.jackson.databind.JsonNode; -import lombok.extern.slf4j.Slf4j; -import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.stereotype.Component; -import org.springframework.web.socket.*; -import org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator; -import org.springframework.web.socket.handler.TextWebSocketHandler; - -import java.util.concurrent.*; - -@Component -@Slf4j -public class NodeMessageHandler extends TextWebSocketHandler { - - public static final Long HEARTBEAT_TIMEOUT = 1000 * 60 * 1L; - private ConcurrentHashMap onlineSessions = new ConcurrentHashMap<>(16); - - private NodeMessageService nodeMessageService; - @Override - public void afterConnectionEstablished(WebSocketSession session) throws Exception { - String remoteIp = (String) session.getAttributes().getOrDefault(NodeConstant.REMOTE_IP, ""); - Long nodeId = (Long)session.getAttributes().get(NodeConstant.NODE_ID); - Long version = (Long)session.getAttributes().get(NodeConstant.VERSION); - long time = System.currentTimeMillis(); - log.debug("IP: {} 请求连接. sessionId: {}", remoteIp, session.getId()); - if (onlineSessions.containsKey(nodeId)){ - //如果终端节点已在线,则拒绝新的终端连接 - try { - session.close(CloseStatus.NOT_ACCEPTABLE); - } - catch (Exception ignore){} - } - else { - log.info("IP: {} 准许连接, NodeId:{}, Version: {}, sessionId: {}", remoteIp, nodeId, version, session.getId()); - onlineSessions.put(nodeId, new ConcurrentWebSocketSessionDecorator(session, 5 * 1000, 2 * 1024 * 1024)); - } - - // 如果version是0,则需要调用一次configUpdate配置更新 - if (version == 0){ - if (nodeMessageService == null){ - nodeMessageService = SpringUtil.getBean(NodeMessageService.class); - } - nodeMessageService.sendTerminalConfig(Long.valueOf(nodeId)); - } - - } - - @Override - protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { - TerminalMessage msg = JsonUtils.parseObject(message.getPayload(), TerminalMessage.class); - String nodeId = session.getAttributes().get(NodeConstant.NODE_ID).toString(); - String cmd = msg.getCmd(); - JsonNode data = msg.getData(); - log.info("收到 Node:{} 命令: {}", nodeId, cmd); - if (nodeMessageService == null){ - nodeMessageService = SpringUtil.getBean(NodeMessageService.class); - } - if (nodeMessageService != null){ - nodeMessageService.pushMessage(msg); - } - - } - - @Override - protected void handlePongMessage(WebSocketSession session, PongMessage message) throws Exception { - Long nodeId = (Long)session.getAttributes().get(NodeConstant.NODE_ID); - log.info("收到 Node:{} Pong Message", nodeId); - session.getAttributes().put(NodeConstant.LAST_PONG_TIME, System.currentTimeMillis()); - } - - @Override - public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { - log.error(String.format("通讯异常: NodeId: %s", session.getAttributes().get(NodeConstant.NODE_ID)), exception); - } - - @Override - public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception { - log.info("IP: {} 已断开连接, NodeId:{}, Version: {}, sessionId:{}, 原因: {}", - session.getAttributes().get(NodeConstant.REMOTE_IP), - session.getAttributes().get(NodeConstant.NODE_ID), - session.getAttributes().get(NodeConstant.VERSION), - session.getId(), - status.toString()); - Long nodeId = (Long)session.getAttributes().get(NodeConstant.NODE_ID); - onlineSessions.remove(nodeId); - } - - - /** - * 定时发送心跳报文,并清理离线的终端 - */ - @Scheduled(cron = "0/15 * * * * ?") - public void sendHeartbeat(){ - for (ConcurrentWebSocketSessionDecorator session : onlineSessions.values()) { - //判断心跳是否超时,超时则主动断开连接 - Long lastPongTime = (Long) session.getAttributes().get(NodeConstant.LAST_PONG_TIME); - if (lastPongTime != null && ((System.currentTimeMillis() - lastPongTime) > HEARTBEAT_TIMEOUT)){ - closeSession(session); - return; - } - SendPingMessage(session); - } - } - - - private void closeSession(WebSocketSession session){ - try{ - session.close(CloseStatus.NO_CLOSE_FRAME); - } - catch (Exception ignore){} - - } - - /** - * 发送ping消息 - * @param session - */ - private void SendPingMessage(ConcurrentWebSocketSessionDecorator session){ - try { - session.sendMessage(new PingMessage()); - } - catch (Exception ignore){} - } - - /** - * 发送无返回值消息 - * @param nodeId - */ - public void sendActionMessage(Long nodeId, TerminalMessage message){ - ConcurrentWebSocketSessionDecorator session = onlineSessions.get(nodeId); - if (session != null){ - try { - session.sendMessage(new TextMessage(message.toJsonString())); - log.info("发送的消息为:{}", message.toJsonString()); - } - catch (Exception exception){ - log.error(String.format("发送消息失败: NodeId: %s", nodeId), exception); - closeSession(session); - } - } - } - -} diff --git a/das/src/main/java/com/das/modules/node/handler/NodeHandshakeInterceptor.java b/das/src/main/java/com/das/modules/node/handler/NodeWebsocketHandshakeInterceptor.java similarity index 93% rename from das/src/main/java/com/das/modules/node/handler/NodeHandshakeInterceptor.java rename to das/src/main/java/com/das/modules/node/handler/NodeWebsocketHandshakeInterceptor.java index 1cc543b2..f3c77ba6 100644 --- a/das/src/main/java/com/das/modules/node/handler/NodeHandshakeInterceptor.java +++ b/das/src/main/java/com/das/modules/node/handler/NodeWebsocketHandshakeInterceptor.java @@ -7,10 +7,8 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.http.server.ServerHttpRequest; import org.springframework.http.server.ServerHttpResponse; import org.springframework.stereotype.Component; -import org.springframework.util.MultiValueMap; import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.server.HandshakeInterceptor; -import org.springframework.web.util.UriComponentsBuilder; import java.net.InetSocketAddress; import java.nio.charset.StandardCharsets; @@ -22,8 +20,7 @@ import java.util.Map; * Websocket握手拦截器 */ @Slf4j -@Component -public class NodeHandshakeInterceptor implements HandshakeInterceptor { +public class NodeWebsocketHandshakeInterceptor implements HandshakeInterceptor { public String getRealIp(ServerHttpRequest request) { diff --git a/das/src/main/java/com/das/modules/node/service/NodeMessageService.java b/das/src/main/java/com/das/modules/node/service/NodeMessageService.java index ec1abb92..1953a9d5 100644 --- a/das/src/main/java/com/das/modules/node/service/NodeMessageService.java +++ b/das/src/main/java/com/das/modules/node/service/NodeMessageService.java @@ -1,14 +1,16 @@ package com.das.modules.node.service; -import com.das.modules.node.domain.bo.CalculateRTData; import com.das.modules.node.domain.bo.TerminalMessage; import com.fasterxml.jackson.databind.JsonNode; -import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; +/** + * 节点消息处理服务 + */ public interface NodeMessageService { - void pushMessage(TerminalMessage msg); JsonNode sendTerminalConfig(Long nodeId); @@ -19,4 +21,26 @@ public interface NodeMessageService { void handleLowSpeed(TerminalMessage data); void handleDeviceEvent(TerminalMessage data); + + /** + * 向指定采集节点发送指令(无返回值) + * @param nodeId 节点ID + * @param message 指令 + */ + void sendActionMessage(Long nodeId, TerminalMessage message); + /** + * 向指定采集节点发送指令(有返回值) + * @param nodeId 节点ID + * @param message 指令 + * @return 采集节点返回的指令 + */ + TerminalMessage sendTerminalMessageWithResult(Long nodeId, TerminalMessage message) throws ExecutionException, InterruptedException, TimeoutException; + /** + * 向指定采集节点发送指令(有返回值) + * @param nodeId 节点ID + * @param message 指令 + * @param timeout 超时时间(秒) + * @return 采集节点返回的指令 + */ + TerminalMessage sendTerminalMessageWithResult(Long nodeId, TerminalMessage message, long timeout) throws ExecutionException, InterruptedException, TimeoutException; } diff --git a/das/src/main/java/com/das/modules/node/service/impl/NodeMessageServiceImpl.java b/das/src/main/java/com/das/modules/node/service/impl/NodeMessageServiceImpl.java index b71c256b..17c06659 100644 --- a/das/src/main/java/com/das/modules/node/service/impl/NodeMessageServiceImpl.java +++ b/das/src/main/java/com/das/modules/node/service/impl/NodeMessageServiceImpl.java @@ -1,64 +1,76 @@ package com.das.modules.node.service.impl; +import cn.hutool.core.util.IdUtil; import com.baomidou.mybatisplus.core.toolkit.IdWorker; +import com.das.common.constant.MeasType; +import com.das.common.utils.AdminRedisTemplate; +import com.das.common.utils.JsonUtils; import com.das.common.utils.StringUtils; import com.das.modules.cache.domain.DeviceInfoCache; import com.das.modules.cache.service.CacheService; import com.das.modules.data.domain.DeviceEventInfo; +import com.das.modules.data.service.TDEngineService; import com.das.modules.data.service.impl.DataServiceImpl; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.core.type.TypeReference; -import cn.hutool.core.util.IdUtil; -import com.das.common.constant.MeasType; -import com.das.common.utils.AdminRedisTemplate; import com.das.modules.equipment.mapper.SysIotModelMapper; -import com.das.modules.node.disruptor.MessageEventFactory; -import com.das.modules.node.disruptor.TerminalMessageEventHandler; +import com.das.modules.node.constant.NodeConstant; +import com.das.modules.node.disruptor.TerminalMessageWorkerHandler; import com.das.modules.node.domain.bo.RTData; import com.das.modules.node.domain.bo.TerminalMessage; import com.das.modules.node.domain.vo.*; -import com.das.modules.node.handler.NodeMessageHandler; import com.das.modules.node.mapper.SysCommunicationLinkMapper; import com.das.modules.node.mapper.SysImpTabMappingMapper; import com.das.modules.node.service.NodeMessageService; -import com.das.modules.data.service.TDEngineService; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import com.lmax.disruptor.RingBuffer; +import com.lmax.disruptor.YieldingWaitStrategy; import com.lmax.disruptor.dsl.Disruptor; +import com.lmax.disruptor.dsl.ProducerType; +import com.lmax.disruptor.util.DaemonThreadFactory; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; +import org.springframework.web.socket.*; +import org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator; +import org.springframework.web.socket.handler.TextWebSocketHandler; +import java.io.IOException; import java.util.*; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; +import java.util.concurrent.*; @Slf4j @Service -public class NodeMessageServiceImpl implements NodeMessageService { +public class NodeMessageServiceImpl extends TextWebSocketHandler implements NodeMessageService { + + public static final Long HEARTBEAT_TIMEOUT = 1000 * 60 * 1L; + + /** + * JSON 转换器 + */ private static final ObjectMapper JSON_MAPPER = new ObjectMapper(); + /** + * websocket 会话集合 + */ + private final ConcurrentHashMap onlineWSSessions = new ConcurrentHashMap<>(16); + + private ConcurrentHashMap> responseCallback = new ConcurrentHashMap<>(16); + private Disruptor disruptor = null; - private RingBuffer ringBuffer = null; - - @Resource - TerminalMessageEventHandler terminalMessageEventHandler; - @Resource SysCommunicationLinkMapper sysCommunicationLinkMapper; @Resource SysImpTabMappingMapper sysImptabmappingMapper; - @Resource - private NodeMessageHandler nodeMessageHandler; - @Autowired AdminRedisTemplate adminRedisTemplate; @@ -79,27 +91,27 @@ public class NodeMessageServiceImpl implements NodeMessageService { @PostConstruct public void init() { //初始化高性能队列 - Executor executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); - MessageEventFactory factory = new MessageEventFactory(); - Disruptor disruptor = new Disruptor<>(factory, 1024 * 256, executor); - disruptor.handleEventsWith(terminalMessageEventHandler); + int cpu = Runtime.getRuntime().availableProcessors(); + int bufferSize = 1024 * 4; + disruptor = new Disruptor<>(TerminalMessage::new, bufferSize, DaemonThreadFactory.INSTANCE, ProducerType.MULTI, new YieldingWaitStrategy()); + // + TerminalMessageWorkerHandler[] workerHandlers = new TerminalMessageWorkerHandler[cpu]; + for (int i = 0; i < cpu; i++) { + workerHandlers[i] = new TerminalMessageWorkerHandler(); + } + disruptor.handleEventsWithWorkerPool(workerHandlers); disruptor.start(); - ringBuffer = disruptor.getRingBuffer(); } @PreDestroy public void destroy() { - if (ringBuffer != null) { - ringBuffer = null; - } if (disruptor != null) { disruptor.shutdown(); } } - - @Override public void pushMessage(TerminalMessage msg) { + RingBuffer ringBuffer = disruptor.getRingBuffer(); if (ringBuffer == null) { return; } @@ -212,7 +224,7 @@ public class NodeMessageServiceImpl implements NodeMessageService { .time(time) .data(jsonNode) .build(); - nodeMessageHandler.sendActionMessage(nodeId, configUpdate); + sendActionMessage(nodeId, configUpdate); return jsonNode; } @@ -347,4 +359,177 @@ public class NodeMessageServiceImpl implements NodeMessageService { default -> null; }; } + + @Override + public void afterConnectionEstablished(WebSocketSession session) throws Exception { + String remoteIp = (String) session.getAttributes().getOrDefault(NodeConstant.REMOTE_IP, ""); + Long nodeId = (Long)session.getAttributes().get(NodeConstant.NODE_ID); + Long version = (Long)session.getAttributes().get(NodeConstant.VERSION); + if (nodeId == null || version == null) { + log.warn("检测到非法连接请求, IP: {}", remoteIp); + try { + session.close(CloseStatus.NOT_ACCEPTABLE); + } catch (IOException ignored) { + } + return; + } + if (onlineWSSessions.contains(nodeId)){ + log.warn("检测到同一节点连接请求,已断开. NodeId: {}, IP:{}", nodeId, remoteIp); + try { + session.close(CloseStatus.NOT_ACCEPTABLE); + } catch (IOException ignored) { + } + return; + } + ConcurrentWebSocketSessionDecorator concurrentWebSocketSessionDecorator = new ConcurrentWebSocketSessionDecorator(session, 5 * 1000, 2 * 1024 * 1024); + onlineWSSessions.put(nodeId, concurrentWebSocketSessionDecorator); + + //如果采集程序的版本是0,则直接下发当前配置。 + if (version == 0){ + sendTerminalConfig(nodeId); + } + } + + /** + * 收到节点Websocket报文回调函数 + * @param session websocket session + * @param message 报文内容 + * @throws Exception + */ + @Override + protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { + TerminalMessage msg = JsonUtils.parseObject(message.getPayload(), TerminalMessage.class); + if (msg == null) { + log.warn("收到非法报文:{}", message.getPayload()); + return; + } + //如果是应答报文,跳过队列,直接异步返回 + if (responseCallback.contains(msg.getCmdId())){ + responseCallback.get(msg.getCmdId()).complete(msg); + } + else{ + //如果是主动请求报文,加入队列,等待处理 + String nodeId = session.getAttributes().get(NodeConstant.NODE_ID).toString(); + String cmd = msg.getCmd(); + JsonNode data = msg.getData(); + log.debug("收到 Node:{} WS 报文: {}", nodeId, cmd); + pushMessage(msg); + } + + } + + @Override + protected void handlePongMessage(WebSocketSession session, PongMessage message) throws Exception { + Long nodeId = (Long)session.getAttributes().get(NodeConstant.NODE_ID); + log.info("收到 Node:{} Pong Message", nodeId); + session.getAttributes().put(NodeConstant.LAST_PONG_TIME, System.currentTimeMillis()); + } + + @Override + public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { + String remoteIp = (String) session.getAttributes().getOrDefault(NodeConstant.REMOTE_IP, ""); + Long nodeId = (Long)session.getAttributes().get(NodeConstant.NODE_ID); + Long version = (Long)session.getAttributes().get(NodeConstant.VERSION); + log.error(String.format("IP: %s 通讯异常, NodeId:%d, Version: %d", remoteIp, nodeId, version), exception); + } + + @Override + public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception { + String remoteIp = (String) session.getAttributes().getOrDefault(NodeConstant.REMOTE_IP, ""); + Long nodeId = (Long)session.getAttributes().get(NodeConstant.NODE_ID); + Long version = (Long)session.getAttributes().get(NodeConstant.VERSION); + log.info("IP: {} 已断开连接, NodeId:{}, Version: {}, sessionId:{}, 原因: {}", + remoteIp, + nodeId, + version, + session.getId(), + status.toString()); + onlineWSSessions.remove(nodeId); + } + + /** + * 定时发送心跳报文,并清理离线的终端 + */ + @Scheduled(cron = "0/15 * * * * ?") + public void sendHeartbeat(){ + for (ConcurrentWebSocketSessionDecorator session : onlineWSSessions.values()) { + //判断心跳是否超时,超时则主动断开连接 + Long lastPongTime = (Long) session.getAttributes().get(NodeConstant.LAST_PONG_TIME); + if (lastPongTime != null && ((System.currentTimeMillis() - lastPongTime) > HEARTBEAT_TIMEOUT)){ + closeSession(session); + return; + } + SendPingMessage(session); + } + } + + /** + * 发送ping消息 + * @param session + */ + private void SendPingMessage(ConcurrentWebSocketSessionDecorator session){ + try { + session.sendMessage(new PingMessage()); + } + catch (Exception ignore){} + } + + private void closeSession(WebSocketSession session){ + try{ + session.close(CloseStatus.NO_CLOSE_FRAME); + } + catch (Exception ignore){} + + } + + /** + * 向指定采集节点发送指令(无返回值) + * @param nodeId 节点ID + * @param message 指令 + */ + public void sendActionMessage(Long nodeId, TerminalMessage message){ + ConcurrentWebSocketSessionDecorator session = onlineWSSessions.get(nodeId); + if (session != null){ + try { + session.sendMessage(new TextMessage(message.toJsonString())); + log.info("发送的消息为:{}", message.toJsonString()); + } + catch (Exception exception){ + log.error(String.format("发送消息失败: NodeId: %s", nodeId), exception); + closeSession(session); + } + } + } + + /** + * 向指定采集节点发送指令(无返回值) + * @param nodeId 节点ID + * @param message 指令 + * @return + * @throws ExecutionException + * @throws InterruptedException + * @throws TimeoutException + */ + @Override + public TerminalMessage sendTerminalMessageWithResult(Long nodeId, TerminalMessage message) throws ExecutionException, InterruptedException, TimeoutException { + return sendTerminalMessageWithResult(nodeId,message, 10); + } + + /** + * 向指定采集节点发送指令(有返回值) + * @param nodeId 节点ID + * @param message 指令 + * @param timeout 超时时间(秒) + * @return + * @throws ExecutionException + * @throws InterruptedException + * @throws TimeoutException + */ + @Override + public TerminalMessage sendTerminalMessageWithResult(Long nodeId, TerminalMessage message, long timeout) throws ExecutionException, InterruptedException, TimeoutException { + sendActionMessage(nodeId, message); + CompletableFuture future = new CompletableFuture<>(); + responseCallback.put(message.getCmdId(), future); + return future.get(timeout, TimeUnit.SECONDS); + } } diff --git a/das/src/main/java/com/das/modules/node/service/impl/SysNodeServiceImpl.java b/das/src/main/java/com/das/modules/node/service/impl/SysNodeServiceImpl.java index 031bc7e5..6be4c0fa 100644 --- a/das/src/main/java/com/das/modules/node/service/impl/SysNodeServiceImpl.java +++ b/das/src/main/java/com/das/modules/node/service/impl/SysNodeServiceImpl.java @@ -2,11 +2,9 @@ package com.das.modules.node.service.impl; import cn.dev33.satoken.stp.StpUtil; import cn.hutool.core.io.IoUtil; -import cn.hutool.core.util.IdUtil; import com.baomidou.mybatisplus.core.metadata.IPage; import com.das.common.config.SessionUtil; import com.das.common.constant.MeasType; -import com.das.common.exceptions.ServiceException; import com.das.common.utils.BeanCopyUtils; import com.das.common.utils.PageDataInfo; import com.das.common.utils.PageQuery; @@ -19,11 +17,14 @@ import com.das.modules.equipment.domain.vo.SysIotModelServiceVo; import com.das.modules.equipment.mapper.SysEquipmentMapper; import com.das.modules.equipment.mapper.SysIotModelFieldMapper; import com.das.modules.equipment.mapper.SysIotModelServiceMapper; -import com.das.modules.node.constant.NodeConstant; -import com.das.modules.node.disruptor.TerminalMessageEventHandler; -import com.das.modules.node.domain.bo.TerminalMessage; -import com.das.modules.node.domain.dto.*; -import com.das.modules.node.domain.vo.*; +import com.das.modules.node.domain.dto.BindEquipmentInfoDto; +import com.das.modules.node.domain.dto.QueryTabMappingParamDto; +import com.das.modules.node.domain.dto.SysCommunicationLinkDto; +import com.das.modules.node.domain.dto.SysNodeDto; +import com.das.modules.node.domain.vo.EquipmentVo; +import com.das.modules.node.domain.vo.SysCommunicationLinkVo; +import com.das.modules.node.domain.vo.SysNodeVo; +import com.das.modules.node.domain.vo.SysTabMappingVo; import com.das.modules.node.entity.SysCommunicationLink; import com.das.modules.node.entity.SysNode; import com.das.modules.node.entity.SysTabMapping; @@ -32,7 +33,6 @@ import com.das.modules.node.mapper.SysImpTabMappingMapper; import com.das.modules.node.mapper.SysNodeMapper; import com.das.modules.node.service.SysNodeService; import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; import jakarta.servlet.http.HttpServletRequest; import jakarta.servlet.http.HttpServletResponse; import lombok.extern.slf4j.Slf4j; @@ -42,7 +42,10 @@ import org.springframework.transaction.annotation.Transactional; import org.springframework.util.CollectionUtils; import org.springframework.web.multipart.MultipartFile; -import java.io.*; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStream; import java.nio.charset.StandardCharsets; import java.util.*; @@ -74,9 +77,6 @@ public class SysNodeServiceImpl implements SysNodeService { @Autowired SysIotModelServiceMapper iotModelServiceMapper; - @Autowired - TerminalMessageEventHandler terminalMessageEventHandler; - @Override public List querySysNodeList() { diff --git a/das/src/main/java/com/das/modules/operation/service/OperationService.java b/das/src/main/java/com/das/modules/operation/service/OperationService.java index 4aaac176..7252e0ec 100644 --- a/das/src/main/java/com/das/modules/operation/service/OperationService.java +++ b/das/src/main/java/com/das/modules/operation/service/OperationService.java @@ -8,10 +8,10 @@ import com.das.common.exceptions.ServiceException; import com.das.common.utils.AdminRedisTemplate; import com.das.modules.auth.domain.vo.SysUserVo; import com.das.modules.node.constant.NodeConstant; -import com.das.modules.node.disruptor.TerminalMessageEventHandler; import com.das.modules.node.domain.bo.TerminalMessage; import com.das.modules.node.domain.vo.SysNodeVo; import com.das.modules.node.mapper.SysNodeMapper; +import com.das.modules.node.service.NodeMessageService; import com.das.modules.operation.domain.dto.CommandInfoDto; import com.das.modules.operation.entity.SysManualStatus; import com.das.modules.operation.entity.SysOperationLog; @@ -49,10 +49,10 @@ public class OperationService { private SysOperationLogMapper sysOperationLogMapper; @Autowired - TerminalMessageEventHandler terminalMessageEventHandler; + AdminRedisTemplate adminRedisTemplate; @Autowired - AdminRedisTemplate adminRedisTemplate; + NodeMessageService nodeMessageService; @@ -127,7 +127,7 @@ public class OperationService { .time(time) .data(jsonNode) .build(); - terminalMessageEventHandler.sendTerminalMessageWithResult(activeNodeId, configUpdate); + nodeMessageService.sendTerminalMessageWithResult(activeNodeId, configUpdate); } catch (Exception e) { throw new ServiceException("设备控制失败 "+ e); } diff --git a/das/src/main/java/com/das/modules/page/controller/StatisticalAnalysisController.java b/das/src/main/java/com/das/modules/page/controller/StatisticalAnalysisController.java new file mode 100644 index 00000000..26cc5aae --- /dev/null +++ b/das/src/main/java/com/das/modules/page/controller/StatisticalAnalysisController.java @@ -0,0 +1,55 @@ +package com.das.modules.page.controller; + + +import com.das.modules.page.domian.dto.TrendAnalyseDto; +import com.das.modules.page.domian.dto.TrendContrastDto; +import com.das.modules.page.service.StatisticalAnalysisService; +import jakarta.servlet.http.HttpServletRequest; +import jakarta.servlet.http.HttpServletResponse; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.List; + +@Slf4j +@RequestMapping("/api/page/statistical") +@RestController +public class StatisticalAnalysisController { + + @Autowired + private StatisticalAnalysisService statisticalAnalysisService; + + + + /** + * 趋势分析Excel导出 + * @param param 查询条件 + */ + @PostMapping("/trendAnalyseExport") + public void trendAnalyseExport(@RequestBody List param ,HttpServletRequest request, HttpServletResponse response) { + statisticalAnalysisService.trendAnalyseExport(param, request, response); + } + + /** + * 功率曲线Excel导出 + * @param param 查询条件 + */ + @PostMapping("/powerCurveExport") + public void powerCurveExport(@RequestBody TrendAnalyseDto param ,HttpServletRequest request, HttpServletResponse response) { + statisticalAnalysisService.powerCurveExport(param, request, response); + } + + + /** + * 趋势对比Excel导出 + * @param param 查询条件 + */ + @PostMapping("/trendContrastExport") + public void trendContrastExport(@RequestBody TrendContrastDto param , HttpServletRequest request, HttpServletResponse response) { + statisticalAnalysisService.trendContrastExport(param, request, response); + } +} diff --git a/das/src/main/java/com/das/modules/page/domian/dto/TrendAnalyseDto.java b/das/src/main/java/com/das/modules/page/domian/dto/TrendAnalyseDto.java new file mode 100644 index 00000000..079fe844 --- /dev/null +++ b/das/src/main/java/com/das/modules/page/domian/dto/TrendAnalyseDto.java @@ -0,0 +1,55 @@ +package com.das.modules.page.domian.dto; + +import com.das.modules.data.domain.SnapshotValueQueryParam; +import lombok.Data; + +import java.util.List; + +/** + * 时序数据查询实体 + */ +@Data +public class TrendAnalyseDto +{ + /** + * 开始时间 + */ + private String startTime; + + /** + * 结束时间 + */ + private String endTime; + + /** + * 间隔 + */ + private String interval; + + /** + * 填充模式 + */ + private String fill; + + + /** + * 时间条件名称 + */ + private String timeName; + + + /** + * 设备属性列表 + */ + private List devices; + + /** + * 制造商 + */ + private String madeinfactory; + + /** + * 模型 + */ + private String model; +} diff --git a/das/src/main/java/com/das/modules/page/domian/dto/TrendContrastDto.java b/das/src/main/java/com/das/modules/page/domian/dto/TrendContrastDto.java new file mode 100644 index 00000000..9bf82c14 --- /dev/null +++ b/das/src/main/java/com/das/modules/page/domian/dto/TrendContrastDto.java @@ -0,0 +1,43 @@ +package com.das.modules.page.domian.dto; + +import com.das.modules.data.domain.SnapshotValueQueryParam; +import lombok.Data; + +import java.util.List; + +/** + * 时序数据查询实体 + */ +@Data +public class TrendContrastDto +{ + /** + * 开始时间 + */ + private String startTime; + + /** + * 结束时间 + */ + private String endTime; + + /** + * 间隔 + */ + private String interval; + + /** + * 填充模式 + */ + private String fill; + + + + + /** + * 设备属性列表 + */ + private List devices; + + +} diff --git a/das/src/main/java/com/das/modules/page/service/StatisticalAnalysisService.java b/das/src/main/java/com/das/modules/page/service/StatisticalAnalysisService.java new file mode 100644 index 00000000..ec4e5193 --- /dev/null +++ b/das/src/main/java/com/das/modules/page/service/StatisticalAnalysisService.java @@ -0,0 +1,30 @@ +package com.das.modules.page.service; + +import com.das.modules.page.domian.dto.TrendAnalyseDto; +import com.das.modules.page.domian.dto.TrendContrastDto; +import jakarta.servlet.http.HttpServletRequest; +import jakarta.servlet.http.HttpServletResponse; + +import java.util.List; + +public interface StatisticalAnalysisService { + + /** + * 趋势分析Excel导出 + * @param param 查询条件 + */ + void trendAnalyseExport(List param, HttpServletRequest request, HttpServletResponse response); + + /** + * 功率曲线Excel导出 + * @param param 查询条件 + */ + void powerCurveExport(TrendAnalyseDto param, HttpServletRequest request, HttpServletResponse response); + + /** + * 趋势对比Excel导出 + * @param param 查询条件 + */ + void trendContrastExport(TrendContrastDto param, HttpServletRequest request, HttpServletResponse response); + +} diff --git a/das/src/main/java/com/das/modules/page/service/impl/HomeServiceImpl.java b/das/src/main/java/com/das/modules/page/service/impl/HomeServiceImpl.java index 37fba82f..d07a8391 100644 --- a/das/src/main/java/com/das/modules/page/service/impl/HomeServiceImpl.java +++ b/das/src/main/java/com/das/modules/page/service/impl/HomeServiceImpl.java @@ -69,7 +69,8 @@ public class HomeServiceImpl implements HomeService { attributesList.add("ikwhthisday"); //是否锁定 attributesList.add("Locked"); - + //叶轮转速 + attributesList.add("iRotorSpeed"); for (SysEquipmentVo item : sysEquipmentVos) { //构建查询属性参数 SnapshotValueQueryParam snapshotValueQueryParam = new SnapshotValueQueryParam(); diff --git a/das/src/main/java/com/das/modules/page/service/impl/StatisticalAnalysisServiceImpl.java b/das/src/main/java/com/das/modules/page/service/impl/StatisticalAnalysisServiceImpl.java new file mode 100644 index 00000000..3b1d80f1 --- /dev/null +++ b/das/src/main/java/com/das/modules/page/service/impl/StatisticalAnalysisServiceImpl.java @@ -0,0 +1,559 @@ +package com.das.modules.page.service.impl; + +import cn.hutool.core.io.IoUtil; +import cn.hutool.core.util.RandomUtil; +import cn.hutool.poi.excel.ExcelUtil; +import cn.hutool.poi.excel.ExcelWriter; +import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; +import com.das.common.exceptions.ServiceException; +import com.das.common.utils.BeanCopyUtils; +import com.das.modules.curve.domain.entity.CurveItemEntity; +import com.das.modules.curve.service.TheoreticalPowerCurveService; +import com.das.modules.data.domain.SnapshotValueQueryParam; +import com.das.modules.data.domain.TSValueQueryParam; +import com.das.modules.data.service.DataService; +import com.das.modules.equipment.entity.SysIotModelField; +import com.das.modules.equipment.mapper.SysIotModelFieldMapper; +import com.das.modules.page.domian.dto.TrendAnalyseDto; +import com.das.modules.page.domian.dto.TrendContrastDto; +import com.das.modules.page.service.StatisticalAnalysisService; +import jakarta.servlet.ServletOutputStream; +import jakarta.servlet.http.HttpServletRequest; +import jakarta.servlet.http.HttpServletResponse; +import org.apache.poi.ss.usermodel.HorizontalAlignment; +import org.apache.poi.ss.usermodel.VerticalAlignment; +import org.apache.poi.ss.usermodel.Workbook; +import org.apache.poi.xssf.usermodel.XSSFClientAnchor; +import org.apache.poi.xssf.usermodel.XSSFDrawing; +import org.apache.poi.xssf.usermodel.XSSFSheet; +import org.jfree.chart.ChartFactory; +import org.jfree.chart.ChartUtils; +import org.jfree.chart.JFreeChart; +import org.jfree.chart.axis.CategoryAxis; +import org.jfree.chart.axis.CategoryLabelPositions; +import org.jfree.chart.plot.CategoryPlot; +import org.jfree.chart.plot.PlotOrientation; +import org.jfree.chart.renderer.category.CategoryItemRenderer; +import org.jfree.chart.title.LegendTitle; +import org.jfree.chart.title.TextTitle; +import org.jfree.data.category.DefaultCategoryDataset; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.awt.*; +import java.io.File; +import java.io.IOException; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; +import java.text.SimpleDateFormat; +import java.util.List; +import java.util.*; +import java.util.stream.Collectors; + +@Service +public class StatisticalAnalysisServiceImpl implements StatisticalAnalysisService { + + @Autowired + DataService dataService; + + @Autowired + private TheoreticalPowerCurveService theoreticalPowerCurveService; + + @Autowired + private SysIotModelFieldMapper sysIotModelFieldMapper; + + private final SimpleDateFormat sdf = new SimpleDateFormat("HH:mm"); + + /** + * 趋势分析Excel导出 + * + * @param param 查询条件 + */ + @Override + public void trendAnalyseExport(List param, HttpServletRequest request, HttpServletResponse response) { + //根据条件获取历史数据 + List>>> mapsList = new ArrayList<>(); + for (TrendAnalyseDto trendAnalyseDto : param) { + TSValueQueryParam tsValueQueryParam = new TSValueQueryParam(); + BeanCopyUtils.copy(trendAnalyseDto,tsValueQueryParam); + Map>> resultMap = dataService.queryTimeSeriesValues(tsValueQueryParam); + mapsList.add(resultMap); + } + //获取Excel的列 + LinkedHashMap map = getTrendColumnName(param); + List> dataList = new ArrayList<>(); + //图表数据集 + DefaultCategoryDataset dataset = new DefaultCategoryDataset(); + // 遍历数据,填充Excel和图表数据集 + setTrendAnalyseExcelValue(mapsList, dataList, map, dataset); + ExcelWriter writer = ExcelUtil.getWriter(RandomUtil.randomInt(100, 1000) + "statistics" + ".xlsx"); + //设置Excel样式 + setExcelStyle(writer, map, dataList); + // 使用JFreeChart生成折线图 + createChart(dataList, writer, dataset, "趋势分析"); + //下载Excel + downloadExcel(response, writer,"趋势分析"); + } + + /** + * 功率曲线Excel导出 + * + * @param param 查询条件 + */ + @Override + public void powerCurveExport(TrendAnalyseDto param, HttpServletRequest request, HttpServletResponse response) { + //获取理论数据 + List curveItemEntitieList = theoreticalPowerCurveService. + queryCurveItemByParent(param.getMadeinfactory(), param.getModel()); + //根据条件获取历史数据 + TSValueQueryParam tsValueQueryParam = new TSValueQueryParam(); + BeanCopyUtils.copy(param,tsValueQueryParam); + Map>> resultMap = dataService.queryTimeSeriesValues(tsValueQueryParam); + List> dataList = new ArrayList<>(); + //填充功率曲线,Excel的数据集 + setPowerCurveExcelValue(resultMap, dataList, curveItemEntitieList); + //获取功率曲线的列 + LinkedHashMap map = getPowerCurveColumnName(); + //获取图表数据集 + DefaultCategoryDataset dataset = getDefaultCategoryDataset(dataList); + ExcelWriter writer = ExcelUtil.getWriter(RandomUtil.randomInt(100, 1000) + "statistics" + ".xlsx"); + //设置Excel样式 + setExcelStyle(writer, map, dataList); + //使用JFreeChart生成图表 + createChart(dataList, writer, dataset, "功率曲线分析"); + //下载Excel + downloadExcel(response, writer,"功率曲线分析"); + } + + + /** + * 趋势对比Excel导出 + * @param param 查询条件 + * @return TD数据库数据 + */ + @Override + public void trendContrastExport(TrendContrastDto param, HttpServletRequest request, HttpServletResponse response) { + //根据条件获取历史数据 + TSValueQueryParam tsValueQueryParam = new TSValueQueryParam(); + BeanCopyUtils.copy(param,tsValueQueryParam); + Map>> maps = dataService.queryTimeSeriesValues(tsValueQueryParam); + //自定义别名 别名的key和实体类中的名称要对应上! + LinkedHashMap map = gettrendContrastColumnName(param); + List> dataList = new ArrayList<>(); + //图表数据集 + DefaultCategoryDataset dataset = new DefaultCategoryDataset(); + // 遍历数据,将数据添加到dataList中 + setTrendContrastExcelValue(maps, dataList, map, dataset); + ExcelWriter writer = ExcelUtil.getWriter(RandomUtil.randomInt(100, 1000) + "statistics" + ".xlsx"); + //设置Excel样式 + setExcelStyle(writer, map, dataList); + // 使用JFreeChart生成折线图 + createChart(dataList, writer, dataset, "趋势对比"); + //下载Excel + downloadExcel(response, writer,"趋势对比"); + } + + /** + * 获取图表数据集 + * + * @param dataList 数据集 + * @return 图表数据集 + */ + private DefaultCategoryDataset getDefaultCategoryDataset(List> dataList) { + DefaultCategoryDataset dataset = new DefaultCategoryDataset(); + for (Map mapData : dataList) { + Object time = mapData.get("time"); + Object speed = mapData.get("iWindSpeed"); + Object power = mapData.get("iGenPower"); + Object theorySpeed = mapData.get("theoryIWindSpeed"); + Object theoryPower = mapData.get("theoryIGenPower"); + if (speed != null && power != null) { + dataset.addValue((Float) speed, "风速实际值", (String) time); + dataset.addValue((Float) power, "功率实际值", (String) time); + } + if (theorySpeed != null && theoryPower != null) { + dataset.addValue((Float) theorySpeed, "风速理论值", (String) time); + dataset.addValue((Float) theoryPower, "功率理论值", (String) time); + } + } + return dataset; + } + + + /** + * 设置Excel样式 + * + * @param writer ExcelWriter + * @param map 表格的列 + * @param dataList Excel数据集 + */ + private void setExcelStyle(ExcelWriter writer, LinkedHashMap map, List> dataList) { + //自定义别名 别名的key和实体类中的名称要对应上! + writer.setHeaderAlias(map); + //水平居中对齐,垂直中间对齐 + writer.getStyleSet().setAlign(HorizontalAlignment.CENTER, VerticalAlignment.CENTER); + //所有单元格宽25个字符 + writer.setColumnWidth(-1, 25); + // 一次性写出内容,使用默认样式,强制输出标题 + writer.write(dataList, true); + } + + /** + * 趋势分析-遍历数据,填充Excel和图表数据集 + * + * @param mapsList 测点历史数据 + * @param dataList Excel数据集 + * @param map 表格的列 + * @param dataset 图表数据集 + */ + private void setTrendAnalyseExcelValue(List>>> mapsList, + List> dataList, + LinkedHashMap map, DefaultCategoryDataset dataset) { + for (int i = 0; i < mapsList.size(); i++) { + List timesListstr = new ArrayList<>(); + List valuesList = new ArrayList<>(); + int num = i + 1; + String pointName = null; + Map>> stringMapMap = mapsList.get(i); + for (Map.Entry>> stringMapEntry : stringMapMap.entrySet()) { + for (Map.Entry> mapEntry : stringMapEntry.getValue().entrySet()) { + pointName = mapEntry.getKey(); + for (Map.Entry stringObjectEntry : mapEntry.getValue().entrySet()) { + String key1 = stringObjectEntry.getKey(); + if (key1.equals("times")) { + List times = (List) stringObjectEntry.getValue(); + List liststr = times.stream() + .map(timestamp -> new Date(timestamp)) + .map(date -> sdf.format(date)) + .toList(); + timesListstr.addAll(liststr); + } + if (key1.equals("values")) { + List values = (List) stringObjectEntry.getValue(); + valuesList.addAll(values); + } + } + } + } + String pointNameKey = pointName + num; + String timeKey = "time" + num; + //添加图表的数据集 + for (int j = 0; j < timesListstr.size(); j++) { + dataset.addValue(valuesList.get(j), map.get(pointNameKey), timesListstr.get(j)); + } + if (i == 0) { + for (int j = 0; j < timesListstr.size(); j++) { + Map dataMap = new HashMap<>(); + dataMap.put(timeKey, timesListstr.get(j)); + dataMap.put(pointNameKey, valuesList.get(j)); + dataList.add(dataMap); + } + } else { + for (int j = 0; j < timesListstr.size(); j++) { + Map stringObjectMap = dataList.get(j); + stringObjectMap.put(timeKey, timesListstr.get(j)); + stringObjectMap.put(pointNameKey, valuesList.get(j)); + } + } + } + } + + /** + * 趋势对比填充Excel数据集 + * + * @param dataList Excel数据集 + * @param map 表格的列 + * @param dataset Excel数据集 + */ + private void setTrendContrastExcelValue(Map>> maps, + List> dataList, + LinkedHashMap map, DefaultCategoryDataset dataset) { + for (Map.Entry>> stringMapEntry : maps.entrySet()) { + int flagNum = 0; + for (Map.Entry> mapEntry : stringMapEntry.getValue().entrySet()) { + List timesListstr = new ArrayList<>(); + List valuesList = new ArrayList<>(); + String key = mapEntry.getKey(); + for (Map.Entry stringObjectEntry : mapEntry.getValue().entrySet()) { + String key1 = stringObjectEntry.getKey(); + if (key1.equals("times")) { + List times = (List) stringObjectEntry.getValue(); + List liststr = times.stream() + .map(timestamp -> new Date(timestamp)) + .map(date -> sdf.format(date)) + .toList(); + timesListstr.addAll(liststr); + } + if (key1.equals("values")) { + List values = (List) stringObjectEntry.getValue(); + valuesList.addAll(values); + } + } + if (flagNum == 0) { + for (int j = 0; j < timesListstr.size(); j++) { + Map dataMap = new HashMap<>(); + dataMap.put("time", timesListstr.get(j)); + dataMap.put(key, valuesList.get(j)); + dataList.add(dataMap); + } + } else { + for (int j = 0; j < timesListstr.size(); j++) { + Map stringObjectMap = dataList.get(j); + stringObjectMap.put(key, valuesList.get(j)); + } + } + flagNum++; + //生成图表的数据集 + for (int j = 0; j < timesListstr.size(); j++) { + dataset.addValue(valuesList.get(j), map.get(key), timesListstr.get(j)); + } + } + } + + } + + /** + * 趋势-获取表格的列 + * * + * + * @param param 查询条件 + * @return 表格的列 + */ + private LinkedHashMap getTrendColumnName(List param) { + LinkedHashMap map = new LinkedHashMap<>(); + for (int i = 0; i < param.size(); i++) { + TrendAnalyseDto trendAnalyseDto = param.get(i); + String timeName = trendAnalyseDto.getTimeName(); + int num = i + 1; + map.put("time" + num, "时间" + num); + for (SnapshotValueQueryParam device : trendAnalyseDto.getDevices()) { + //获取属性名称 + for (String attribute : device.getAttributes()) { + map.put(attribute + num, timeName); + } + } + } + return map; + } + + /** + * 趋势对比-获取表格的列 + * + * @param param 查询条件 + * @return 表格的列 + */ + private LinkedHashMap gettrendContrastColumnName(TrendContrastDto param) { + LinkedHashMap map = new LinkedHashMap<>(); + map.put("time", "时间"); + List strList = new ArrayList<>(); + for (SnapshotValueQueryParam device : param.getDevices()) { + strList.addAll(device.getAttributes()); + } + QueryWrapper queryWrapper = new QueryWrapper<>(); + queryWrapper.in("attribute_code", strList); + List sysIotModelFields = sysIotModelFieldMapper.selectVoList(queryWrapper); + for (SysIotModelField sysIotModelField : sysIotModelFields) { + map.put(sysIotModelField.getAttributeCode(), sysIotModelField.getAttributeName()); + } + return map; + } + + /** + * 获取功率曲线的列 + * + * @return + */ + private LinkedHashMap getPowerCurveColumnName() { + LinkedHashMap map = new LinkedHashMap<>(); + map.put("time", "时间"); + map.put("iWindSpeed", "风速"); + map.put("iGenPower", "功率"); + map.put("theoryIWindSpeed", "理论风速"); + map.put("theoryIGenPower", "理论功率"); + return map; + } + + + /** + * 功率曲线-遍历数据,填充Excel数据集 + * + * @param maps 测点历史数据 + * @param dataList Excel数据集 + * @param curveItemEntitieList 理论数据 + */ + private void setPowerCurveExcelValue(Map>> maps, + List> dataList, List curveItemEntitieList) { + for (Map.Entry>> stringMapEntry : maps.entrySet()) { + int flagNum = 0; + for (Map.Entry> mapEntry : stringMapEntry.getValue().entrySet()) { + List timesListstr = new ArrayList<>(); + List valuesList = new ArrayList<>(); + String key = mapEntry.getKey(); + for (Map.Entry stringObjectEntry : mapEntry.getValue().entrySet()) { + String key1 = stringObjectEntry.getKey(); + if (key1.equals("times")) { + List times = (List) stringObjectEntry.getValue(); + List liststr = times.stream() + .map(timestamp -> new Date(timestamp)) + .map(date -> sdf.format(date)) + .toList(); + timesListstr.addAll(liststr); + } + if (key1.equals("values")) { + List values = (List) stringObjectEntry.getValue(); + valuesList.addAll(values); + } + } + if (flagNum == 0) { + for (int j = 0; j < timesListstr.size(); j++) { + Map dataMap = new HashMap<>(); + dataMap.put("time", timesListstr.get(j)); + dataMap.put(key, valuesList.get(j)); + dataList.add(dataMap); + } + } else { + for (int j = 0; j < timesListstr.size(); j++) { + Map stringObjectMap = dataList.get(j); + stringObjectMap.put(key, valuesList.get(j)); + } + } + flagNum++; + } + } + //转换为map + List> listMap = curveItemEntitieList.stream() + .map(entity -> { + Map map = new HashMap<>(); + map.put("theoryIWindSpeed", entity.getPower()); + map.put("theoryIGenPower", entity.getSpeed()); + return map; + }) + .collect(Collectors.toList()); + if (listMap.size() > dataList.size()) { + for (int i = 0; i < listMap.size(); i++) { + Map stringObjectMap1 = listMap.get(i); + if (dataList.size() > i) { + Map stringObjectMap = dataList.get(i); + stringObjectMap.put("theoryIWindSpeed", stringObjectMap1.get("theoryIWindSpeed")); + stringObjectMap.put("theoryIGenPower", stringObjectMap1.get("theoryIGenPower")); + listMap.remove(i); + } + } + dataList.addAll(listMap); + } else { + for (int i = 0; i < dataList.size(); i++) { + Map dataMap = dataList.get(i); + if (listMap.size() > i) { + Map stringObjectMap1 = listMap.get(i); + dataMap.put("theoryIWindSpeed", stringObjectMap1.get("theoryIWindSpeed")); + dataMap.put("theoryIGenPower", stringObjectMap1.get("theoryIGenPower")); + } + } + } + } + + + /** + * 下载Excel + * + * @param response 响应对象 + * @param writer Excel对象 + */ + private void downloadExcel(HttpServletResponse response, ExcelWriter writer,String title) { + response.setCharacterEncoding(StandardCharsets.UTF_8.name()); + response.setContentType("application/vnd.ms-excel"); + // 清除缓存 + response.setHeader("Cache-Control", "no-cache, no-store, must-revalidate"); + response.setHeader("Pragma", "no-cache"); + response.setDateHeader("Expires", 0); + try { + // 设置请求头属性 + response.setHeader("Content-Disposition", "attachment;filename=" + URLEncoder.encode(title+".xls", StandardCharsets.UTF_8)); + ServletOutputStream out = response.getOutputStream(); + // 写出到文件 + writer.flush(out, true); + // 关闭writer,释放内存 + writer.close(); + // 此处记得关闭输出Servlet流 + IoUtil.close(out); + } catch (Exception e) { + throw new ServiceException("文件下载失败==" + e); + } + } + + /** + * 使用JFreeChart生成折线图 + * + * @param data excel数据集 + * @param writer excel对象 + * @param dataset 图表数据集 + * @param titleStr 标题 + */ + private void createChart(List> data, ExcelWriter writer, + DefaultCategoryDataset dataset, String titleStr) { + // 获取Sheet对象 + XSSFSheet xssfSheet = (XSSFSheet) writer.getSheet(); + Workbook workbook = writer.getWorkbook(); + JFreeChart chart = ChartFactory.createLineChart( + titleStr, // 图表标题 + "", // 横轴标签 + "", // 纵轴标签 + dataset, // 数据集 + PlotOrientation.VERTICAL, // 图表方向 + true, // 是否显示图例 + true, // 是否使用工具提示 + false // 是否生成URL链接 + ); + // 设置图表标题的字体 + TextTitle title = chart.getTitle(); + title.setFont(new java.awt.Font("SimSun", java.awt.Font.BOLD, 16)); + // 获取图表的绘图区域 + CategoryPlot plot = chart.getCategoryPlot(); + // 设置横轴标签 + CategoryAxis domainAxis = plot.getDomainAxis(); + domainAxis.setLabelFont(new java.awt.Font("SimSun", java.awt.Font.PLAIN, 12)); + domainAxis.setMaximumCategoryLabelLines(1); // 可以控制标签行数 + domainAxis.setCategoryMargin(3); // 控制类别之间的间距 + domainAxis.setCategoryLabelPositions(CategoryLabelPositions.UP_45); // 旋转横轴标签为45度,90度为:UP_90 + domainAxis.setLabelFont(new Font("SansSerif", Font.PLAIN, 7));//调整字体大小 +// if (data.size() > 50) { +// domainAxis.setVisible(false); // 隐藏横坐标 +// } + // 设置图例的字体 + LegendTitle legend = chart.getLegend(); + if (legend != null) { + legend.setItemFont(new java.awt.Font("SimSun", java.awt.Font.PLAIN, 12)); + } + // 设置绘图区域的背景颜色 + plot.setBackgroundPaint(Color.WHITE); + // 设置绘图区域的边框 + plot.setOutlinePaint(Color.LIGHT_GRAY); + plot.setOutlineVisible(true); + // 设置网格线的颜色 + plot.setDomainGridlinePaint(Color.LIGHT_GRAY); + plot.setRangeGridlinePaint(Color.LIGHT_GRAY); + // 设置线条的宽度 + CategoryItemRenderer renderer1 = plot.getRenderer(); + renderer1.setSeriesStroke(0, new BasicStroke(2.0f)); // 设置线条宽度 + // 将图表保存为 PNG 文件 + String chartFilePath = "lineChart.png"; + // 调整图表尺寸 + int width = 750; + int height = 400; + try { + ChartUtils.saveChartAsPNG(new File(chartFilePath), chart, width, height); + byte[] bytes = java.nio.file.Files.readAllBytes(java.nio.file.Paths.get(chartFilePath)); + int pictureIdx = workbook.addPicture(bytes, Workbook.PICTURE_TYPE_PNG); + // 计算数据的最后一行,创建图表插入位置的锚点 + XSSFClientAnchor anchor = new XSSFClientAnchor(0, 1, 0, 1, 0, data.size() + 2, 15, data.size() + 30); + XSSFDrawing drawing = xssfSheet.createDrawingPatriarch(); + drawing.createPicture(anchor, pictureIdx); + } catch (IOException e) { + throw new ServiceException("图表保存失败==" + e); + } + + } + + +} + diff --git a/das/src/main/resources/application.yml b/das/src/main/resources/application.yml index 5dba9c52..3988874e 100644 --- a/das/src/main/resources/application.yml +++ b/das/src/main/resources/application.yml @@ -98,4 +98,10 @@ logging: tdengine: password: taosdata url: jdbc:TAOS-RS://192.168.109.160:6041/das - username: root \ No newline at end of file + username: root + +minio: + url: http://192.168.109.187:9000 + bucket: das + accessKey: das + secretKey: zaq12WSX \ No newline at end of file diff --git a/ui/dasadmin/src/api/backend/statAnalysis/request.ts b/ui/dasadmin/src/api/backend/statAnalysis/request.ts index a3b1f123..c615dfd7 100644 --- a/ui/dasadmin/src/api/backend/statAnalysis/request.ts +++ b/ui/dasadmin/src/api/backend/statAnalysis/request.ts @@ -36,49 +36,85 @@ export const runAirBlowerReq = ( }) } -export const getReportTemplateListReq = (data: { category: '单机报表' | '多机报表', pageNum: number, pageSize: number }) => { - return createAxios { + return createAxios< + never, + Promise<{ + code: number msg: string - }, - success: boolean - }>>({ + data: { + total: number + rows: { id: string; category: '单机报表' | '多机报表'; template: string }[] + code: number + msg: string + } + success: boolean + }> + >({ url: '/api/page/report/template/getList', method: 'post', - data + data, }) } -export const addReportTemplateListReq = (data: { category: '单机报表' | '多机报表', template: string }) => { - return createAxios>({ +export const addReportTemplateListReq = (data: { category: '单机报表' | '多机报表'; template: string }) => { + return createAxios< + never, + Promise<{ + code: number + msg: string + data: { + id: string + category: '单机报表' | '多机报表' + template: string + }[] + success: boolean + }> + >({ url: '/api/page/report/template/add', method: 'post', - data + data, }) } export const delReportTemplateListReq = (data: { id: string }) => { - return createAxios>({ + return createAxios< + never, + Promise<{ + code: number + msg: string + success: boolean + }> + >({ url: '/api/page/report/template/del', method: 'post', - data + data, + }) +} + +export function powerCurveExport(params: object = {}) { + return createAxios({ + url: '/api/page/statistical/powerCurveExport', + method: 'POST', + data: params, + responseType: 'blob', + }) +} + +export function trendContrastExport(params: object = {}) { + return createAxios({ + url: '/api/page/statistical/trendContrastExport', + method: 'POST', + data: params, + responseType: 'blob', + }) +} + +export function trendAnalyseExport(params: object = {}) { + return createAxios({ + url: '/api/page/statistical/trendAnalyseExport', + method: 'POST', + data: params, + responseType: 'blob', }) } diff --git a/ui/dasadmin/src/layouts/backend/components/aside.vue b/ui/dasadmin/src/layouts/backend/components/aside.vue index 0297ec81..96ad7965 100644 --- a/ui/dasadmin/src/layouts/backend/components/aside.vue +++ b/ui/dasadmin/src/layouts/backend/components/aside.vue @@ -18,7 +18,7 @@ diff --git a/ui/dasadmin/src/layouts/backend/components/logo.vue b/ui/dasadmin/src/layouts/backend/components/logo.vue index 6f64a0e6..c13b01a0 100644 --- a/ui/dasadmin/src/layouts/backend/components/logo.vue +++ b/ui/dasadmin/src/layouts/backend/components/logo.vue @@ -23,9 +23,24 @@ import { closeShade } from '/@/utils/pageShade' import { Session } from '/@/utils/storage' import { BEFORE_RESIZE_LAYOUT } from '/@/stores/constant/cacheKey' import { setNavTabsWidth } from '/@/utils/layout' +import { ref, onMounted, onUnmounted } from 'vue' const config = useConfig() // const siteConfig = useSiteConfig() +const headerHeight = ref(config.headerHeight()) + +onMounted(() => { + // 监听窗口大小变化 + window.addEventListener('resize', resizeHandler) +}) + +onUnmounted(() => { + // 组件卸载前移除监听器 + window.removeEventListener('resize', resizeHandler) +}) +const resizeHandler = () => { + headerHeight.value = config.headerHeight() +} const onMenuCollapse = function () { if (config.layout.shrink && !config.layout.menuCollapse) { @@ -49,7 +64,7 @@ const onMenuCollapse = function () { diff --git a/ui/dasadmin/src/views/backend/home/windMatrix.vue b/ui/dasadmin/src/views/backend/home/windMatrix.vue index 872ec670..e5c5c52f 100644 --- a/ui/dasadmin/src/views/backend/home/windMatrix.vue +++ b/ui/dasadmin/src/views/backend/home/windMatrix.vue @@ -68,7 +68,7 @@