This commit is contained in:
高云鹏 2024-08-12 17:09:04 +08:00
commit 85d2ffa9c2
41 changed files with 1364 additions and 705 deletions

13
.vscode/settings.json vendored Normal file
View File

@ -0,0 +1,13 @@
{
"files.associations": {
"xstring": "cpp",
"unordered_map": "cpp",
"locale": "cpp",
"xlocale": "cpp",
"xlocmes": "cpp",
"xlocmon": "cpp",
"xlocnum": "cpp",
"xloctime": "cpp",
"xhash": "cpp"
}
}

View File

@ -4,7 +4,7 @@ set (VERSION 1.0.1)
# set (CMAKE_CXX_STANDARD 11)
option (USE_MQTT "use mqtt protocol" ON)
option (USE_WEBSOCKET "use websocket" ON)
option (USE_WEBSOCKET "use websocket" OFF)
option (USE_SQLITE3 "use sqlite3" ON)
if (USE_SQLITE3)
@ -148,7 +148,7 @@ if (USE_WEBSOCKET)
set (APP_LIBS ${APP_LIBS} ssl crypto)
add_definitions(-DUSE_WEBSOCKET)
add_definitions(-DNOPOLL_OS_UNIX=1)
#add_definitions(-DSHOW_DEBUG_LOG)
add_definitions(-DSHOW_DEBUG_LOG)
add_definitions(-DNOPOLL_HAVE_VASPRINTF=1)
add_definitions(-DNOPOLL_HAVE_TLSv10_ENABLED=1)
add_definitions(-DNOPOLL_HAVE_TLSv11_ENABLED=1)
@ -158,50 +158,50 @@ endif ()
if (USE_MQTT)
set(APP_SRCS ${APP_SRCS}
third_party/mqtt/actions.c
third_party/mqtt/callbacks.c
third_party/mqtt/connect.c
third_party/mqtt/handle_auth.c
third_party/mqtt/handle_connack.c
third_party/mqtt/handle_disconnect.c
third_party/mqtt/handle_ping.c
third_party/mqtt/handle_pubackcomp.c
third_party/mqtt/handle_publish.c
third_party/mqtt/handle_pubrec.c
third_party/mqtt/handle_pubrel.c
third_party/mqtt/handle_suback.c
third_party/mqtt/handle_unsuback.c
third_party/mqtt/helpers.c
third_party/mqtt/logging_mosq.c
third_party/mqtt/loop.c
third_party/mqtt/memory_mosq.c
third_party/mqtt/messages_mosq.c
third_party/mqtt/misc_mosq.c
third_party/mqtt/mosquitto.c
third_party/mqtt/net_mosq_ocsp.c
third_party/mqtt/net_mosq.c
third_party/mqtt/options.c
third_party/mqtt/packet_datatypes.c
third_party/mqtt/packet_mosq.c
third_party/mqtt/property_mosq.c
third_party/mqtt/read_handle.c
third_party/mqtt/send_connect.c
third_party/mqtt/send_disconnect.c
third_party/mqtt/send_mosq.c
third_party/mqtt/send_publish.c
third_party/mqtt/send_subscribe.c
third_party/mqtt/send_unsubscribe.c
third_party/mqtt/send_mosq.c
third_party/mqtt/socks_mosq.c
third_party/mqtt/srv_mosq.c
third_party/mqtt/strings_mosq.c
third_party/mqtt/thread_mosq.c
third_party/mqtt/time_mosq.c
third_party/mqtt/tls_mosq.c
third_party/mqtt/utf8_mosq.c
third_party/mqtt/util_mosq.c
third_party/mqtt/util_topic.c
third_party/mqtt/will_mosq.c)
third_party/mqtt/actions.c
third_party/mqtt/callbacks.c
third_party/mqtt/connect.c
third_party/mqtt/handle_auth.c
third_party/mqtt/handle_connack.c
third_party/mqtt/handle_disconnect.c
third_party/mqtt/handle_ping.c
third_party/mqtt/handle_pubackcomp.c
third_party/mqtt/handle_publish.c
third_party/mqtt/handle_pubrec.c
third_party/mqtt/handle_pubrel.c
third_party/mqtt/handle_suback.c
third_party/mqtt/handle_unsuback.c
third_party/mqtt/helpers.c
third_party/mqtt/logging_mosq.c
third_party/mqtt/loop.c
third_party/mqtt/memory_mosq.c
third_party/mqtt/messages_mosq.c
third_party/mqtt/misc_mosq.c
third_party/mqtt/mosquitto.c
third_party/mqtt/net_mosq_ocsp.c
third_party/mqtt/net_mosq.c
third_party/mqtt/options.c
third_party/mqtt/packet_datatypes.c
third_party/mqtt/packet_mosq.c
third_party/mqtt/property_mosq.c
third_party/mqtt/read_handle.c
third_party/mqtt/send_connect.c
third_party/mqtt/send_disconnect.c
third_party/mqtt/send_mosq.c
third_party/mqtt/send_publish.c
third_party/mqtt/send_subscribe.c
third_party/mqtt/send_unsubscribe.c
third_party/mqtt/send_mosq.c
third_party/mqtt/socks_mosq.c
third_party/mqtt/srv_mosq.c
third_party/mqtt/strings_mosq.c
third_party/mqtt/thread_mosq.c
third_party/mqtt/time_mosq.c
third_party/mqtt/tls_mosq.c
third_party/mqtt/utf8_mosq.c
third_party/mqtt/util_mosq.c
third_party/mqtt/util_topic.c
third_party/mqtt/will_mosq.c)
# set (APP_SRCS ${APP_SRCS} hwmqtt/hwmqtt.cpp)
add_definitions(-DUSE_MQTT)
add_definitions(-DWITH_SOCKS)
@ -228,6 +228,7 @@ if (UNIX)
set (APP_LIBS ${APP_LIBS} pthread)
set (APP_LIBS ${APP_LIBS} dl)
set (APP_LIBS ${APP_LIBS} rt)
#set (APP_LIBS ${APP_LIBS} websockets)
endif()
add_executable (application ${APP_SRCS})

File diff suppressed because it is too large Load Diff

View File

@ -9,16 +9,13 @@
static void* main_run_process(void* param)
{
CProcess* proc = (CProcess *)param;
DWORD old_ticks = 0;
while (TRUE)
{
while (TRUE) {
if (!proc->m_bRunFlag) break;
proc->Run();
if (old_ticks != system32.ticks)
{
if (old_ticks != system32.ticks) {
old_ticks = system32.ticks;
proc->OnTimer(); //every 10ms
}

View File

@ -2255,44 +2255,49 @@ void* idle_process(void* param)
yk();
yt();
//操作保存系统操作Log
if ((system32.timers % 3600) == 0)
{ //过整点保存
if (system32.log_enabled)
{
if ((system32.timers % 3600) == 0) { //过整点保存
if (system32.log_enabled) {
yxbw.DumpYXBW();
soe.DumpSOE();
yklog.DumpYKLog();
ytlog.DumpYTLog();
}
yxbw_save = yxbw.GetSavePos();
if (yxbw_load != yxbw_save)
{ //有变位信息
if (yxbw_load != yxbw_save) { //有变位信息
WriteDatabaseCFG();
yxbw_load = yxbw_save;
}
}
for (int i = 0; i < PROCESSES_NUM; i++)
{
for (int i = 0; i < PROCESSES_NUM; i++) {
config.processes[i].softdog++;
if (config.processes[i].softdog > PROCESS_WATCHDOG_TIME)
{
if (config.processes[i].softdog > PROCESS_WATCHDOG_TIME) {
config.processes[i].softdog = PROCESS_WATCHDOG_TIME;
}
else
{
} else {
}
}
for (int i = 0; i < UNIT_NUM; i++)
{
for (int i = 0; i < UNIT_NUM; i++) {
config.units[i].softdog++;
if (config.units[i].softdog > UNIT_WATCHDOG_TIME)
{
if (config.units[i].softdog > UNIT_WATCHDOG_TIME) {
config.units[i].softdog = UNIT_WATCHDOG_TIME;
config.units[i].value = SPI_ON;
}
else
{
} else {
config.units[i].value = SPI_OFF;
if ((config.units[i].state & 0x80) != 0x80) {
int j = 0;
for (j = 0; j < config.units[i].yxcount; j++) {
if ((config.units[i].yxs[j].qds & 0x80) == 0x80) break;
}
if (j < config.units[i].yxcount) {
continue;
}
for (j = 0; j < config.units[i].yccount; j++) {
if ((config.units[i].ycs[j].qds & 0x80) == 0x80) break;
}
if (j < config.units[i].yccount) {
continue;
}
config.units[i].state |= 0x80;
}
}
}
}

View File

@ -278,38 +278,33 @@ public:
if (point < 0 || point >= pUnit->yxcount) return;
udb = pUnit->yxs[point].order;
if (udb < 0 || udb >= DATABASE_YX_NUM)
{ //遥信点号不在数据库定义范围内,只刷新单元数据
if (pUnit->yxs[point].value != value)
{ //update value
if (pUnit->yxs[point].invert) {
value = !value;
}
if (udb < 0 || udb >= DATABASE_YX_NUM) { //遥信点号不在数据库定义范围内,只刷新单元数据
if (pUnit->yxs[point].value != value) { //update value
pUnit->yxs[point].value = value;
pUnit->yxs[point].update_time = system32.timers;
pUnit->yxs[point].yxbw = TRUE;
if (bAddYXBW)
{
if (bAddYXBW) {
yxbw.PushYXBW(system32.now, udb, value, qds, uid, point, YXBWT_AUTO);
}
}
}
else if (database.yxs[udb].value != value)
{ //update value
} else if (database.yxs[udb].value != value) { //update value
pUnit->yxs[point].value = value;
pUnit->yxs[point].yxbw = TRUE;
pUnit->yxs[point].update_time = system32.timers;
database.yxs[udb].value = value;
database.yxs[udb].bw_time = system32.timers; //设置刷新时间
database.yxs[udb].update_time = system32.timers; //设置刷新时间
database.yxs[udb].bw_time = system32.timers; //设置刷新时间
database.yxs[udb].update_time = system32.timers; //设置刷新时间
database.yxs[udb].op_unit = uid;
database.yxs[udb].qds = qds;
if (bAddYXBW)
{
if (bAddYXBW) {
yxbw.PushYXBW(system32.now, udb, value, qds, uid, point, YXBWT_AUTO);
}
}
else
{
if (pUnit->yxs[point].value != value)
{
} else {
if (pUnit->yxs[point].value != value) {
pUnit->yxs[point].value = value;
pUnit->yxs[point].update_time = system32.timers;
//若数据库中的遥信位置是正确的则不额外产生变位信息
@ -389,23 +384,17 @@ public:
if (point < 0 || point >= pUnit->yccount) return;
udb = pUnit->ycs[point].order;
if (udb < 0 || udb >= DATABASE_YC_NUM)
{ //遥测点号不在数据库定义范围内,只刷新本单元数据
if (pUnit->ycs[point].value != value)
{ //update value
if (udb < 0 || udb >= DATABASE_YC_NUM) { //遥测点号不在数据库定义范围内,只刷新本单元数据
if (pUnit->ycs[point].value != value) { //update value
pUnit->ycs[point].value = value;
pUnit->ycs[point].update_time = system32.timers;
pUnit->ycs[point].ycbw = TRUE;
}
}
else if (database.ycs[udb].value != value)
{ //update value
} else if (database.ycs[udb].value != value) { //update value
pUnit->ycs[point].value = value;
pUnit->ycs[point].update_time = system32.timers;
if (pUnit->ycs[point].change_pos >= 0 && bAddYCBW)
{
if (abs(pUnit->ycs[point].value - database.ycs[udb].value) >= pUnit->ycs[point].change_pos)
{ //40码值变化量认为是遥测变位
if (pUnit->ycs[point].change_pos >= 0 && bAddYCBW) {
if (abs(pUnit->ycs[point].value - database.ycs[udb].value) >= pUnit->ycs[point].change_pos) { //40码值变化量认为是遥测变位
pUnit->ycs[point].ycbw = TRUE;
ycbw.PushYCBW(system32.now, udb, value, qds, uid, point, YCBWT_AUTO);
}
@ -414,11 +403,8 @@ public:
database.ycs[udb].op_unit = uid;
database.ycs[udb].update_time = system32.timers; //设置刷新时间
database.ycs[udb].qds = qds;
}
else
{
if (pUnit->ycs[point].value != value)
{
} else {
if (pUnit->ycs[point].value != value) {
pUnit->ycs[point].value = value;
pUnit->ycs[point].update_time = system32.timers;
pUnit->ycs[point].ycbw = TRUE;
@ -439,23 +425,17 @@ public:
udb = pUnit->ycs[point].order;
nvalue = (long)(pUnit->ycs[point].factor * value);
if (udb < 0 || udb >= DATABASE_YC_NUM)
{ //遥测点号不在数据库定义范围内,只刷新本单元数据
if (pUnit->ycs[point].value != nvalue)
{ //update value
if (udb < 0 || udb >= DATABASE_YC_NUM) { //遥测点号不在数据库定义范围内,只刷新本单元数据
if (pUnit->ycs[point].value != nvalue) { //update value
pUnit->ycs[point].value = nvalue;
pUnit->ycs[point].update_time = system32.timers;
pUnit->ycs[point].ycbw = TRUE;
}
}
else if (database.ycs[udb].value != nvalue)
{ //update value
} else if (database.ycs[udb].value != nvalue) { //update value
pUnit->ycs[point].value = nvalue;
pUnit->ycs[point].update_time = system32.timers;
if (pUnit->ycs[point].change_pos >= 0 && bAddYCBW)
{
if (abs(pUnit->ycs[point].value - database.ycs[udb].value) >= pUnit->ycs[point].change_pos)
{ //40码值变化量认为是遥测变位
if (pUnit->ycs[point].change_pos >= 0 && bAddYCBW) {
if (abs(pUnit->ycs[point].value - database.ycs[udb].value) >= pUnit->ycs[point].change_pos) { //40码值变化量认为是遥测变位
pUnit->ycs[point].ycbw = TRUE;
ycbw.PushYCBW(system32.now, udb, nvalue, qds, uid, point, YCBWT_AUTO);
}
@ -464,11 +444,8 @@ public:
database.ycs[udb].op_unit = uid;
database.ycs[udb].update_time = system32.timers; //设置刷新时间
database.ycs[udb].qds = qds;
}
else
{
if (pUnit->ycs[point].value != nvalue)
{
} else {
if (pUnit->ycs[point].value != nvalue) {
pUnit->ycs[point].value = nvalue;
pUnit->ycs[point].update_time = system32.timers;
pUnit->ycs[point].ycbw = TRUE;
@ -501,7 +478,7 @@ public:
pYC->update_time = database.ycs[udb].update_time;
pYC->qds = database.ycs[udb].qds;
}
if (pYC->factor > 1 && pUnit->type == 0x00)
if (pYC->factor > 1 && (pUnit->type & 0x0f) == 0x00)
{ //系数有效,且为转发单元
value /= pYC->factor;
}
@ -680,7 +657,8 @@ public:
database.yms[udb].value = value;
database.yms[udb].update_time = system32.timers; //设置刷新时间
database.yms[udb].op_unit = uid;
};
}
inline DWORD GetUnitYM(int uid, int order) const
{
int udb;
@ -695,7 +673,8 @@ public:
pUnit->yms[order].value = database.yms[udb].value;
pUnit->yms[order].update_time = database.yms[udb].update_time;
return database.yms[udb].value;
};
}
void SetUnitYMQDS(int uid, int point, BYTE qds);
BYTE GetUnitYMQDS(int uid, int point) const;
inline float GetUnitYMReal(int uid, int order) const

View File

@ -188,30 +188,14 @@ typedef int SOCKET;
#define PROTOCOL_HOST_MODBUS_TCP 16 //MODBUS tcp主
#define PROTOCOL_SUB_MODBUS_TCP 17 //MODBUS RTU over tcp从
#define PROTOCOL_HOST_MODBUS_RTU_TCP 18 //MODBUS RTU over tcp主
//#define PROTOCOL_HOST_MODBUS_RTU_NCY 19 //NCY-6100系列微机保护装置MODBUS RTU主
//#define PROTOCOL_HOST_MODBUS_RTU_LIYEZG 20 //浙江立业电器MODBUS RTU主
#define PROTOCOL_RTU_STATE 21 //主控状态
#define PROTOCOL_LOCAL_DEBUG 22 //本地调试
//#define PROTOCOL_SUB_XT9712 23 //XT9712从
//#define PROTOCOL_HISDATA 24 //历史数据协议
#define PROTOCOL_BF_FTP 25 //倍福FTP数据协议
//#define PROTOCOL_HOST_NSA 27 //NSA主
#define PROTOCOL_HOST_DLT645V2007 30 //dlt645v2007协议
#define PROTOCOL_HOST_DLT645V2007_HR 31 //中电华瑞dlt645v2007协议
#define PROTOCOL_HOST_DLT645V2007_OVERTCP 32 //中电华瑞dlt645v2007 over tcp协议
#define PROTOCOL_HOST_DLT645V2007 32 //dlt645v2007协议
#define PROTOCOL_CALC 37 //计算
//#define PROTOCOL_DLT_1867 40 //需求响应协议dlt1867-2008
#define PROTOCOL_SUB_GDW104 47 //国网104从
//#define PROTOCOL_AGC 50 //功率自动控制app
//#define PROTOCOL_AVC 51 //电压自动控制app
#define PROTOCOL_HW_MQTT 72 //华为物联平台ROMA
//#define PROTOCOL_ISS_MQTT 73 //软通动力mqtt
//#define PROTOCOL_LM_MQTT 74 //罗米mqtt
//#define PROTOCOL_GRPC_PUBLISH 76 //gRPC上传发布
//#define PROTOCOL_GRPC_SUBSCRIBE 77 //gRPC下载订阅
#define PROTOCOL_OPCUA 78 //opcua协议
//#define PROTOCOL_HOST_MODBUS_RTU_RDS100AFT 80 //珠海瑞捷电气股份有限公司RD系列保护装置MODBUS RTU主
//#define PROTOCOL_HOST_MODBUS_RTU_APF 81 //江苏沃海电气有限公司APFSVGMODBUS RTU主
#define ADDR_TYPE_NORMAL 0
@ -512,6 +496,9 @@ typedef struct
DWORD ticks;
DWORD timers;
DWORD auto_reset_interval;
char projectName[64]; //项目名称
char version[64]; //配置文件版本信息
} struSystem;
typedef struct

View File

@ -289,17 +289,13 @@ NOPOLL_SOCKET __nopoll_conn_sock_connect_opts_internal (noPollCtx * ctx,
nopoll_conn_set_sock_block (session, nopoll_false);
/* do a tcp connect */
if (connect (session, res->ai_addr, res->ai_addrlen) < 0) {
if(errno != NOPOLL_EINPROGRESS && errno != NOPOLL_EWOULDBLOCK && errno != NOPOLL_ENOTCONN) {
nopoll_log (ctx, NOPOLL_LEVEL_WARNING, "unable to connect to remote host %s:%s errno=%d",
host, port, errno);
shutdown (session, SHUT_RDWR);
nopoll_close_socket (session);
if (connect (session, res->ai_addr, res->ai_addrlen) < 0) {
if(errno != NOPOLL_EINPROGRESS && errno != NOPOLL_EWOULDBLOCK && errno != NOPOLL_ENOTCONN) {
nopoll_log (ctx, NOPOLL_LEVEL_WARNING, "unable to connect to remote host %s:%s errno=%d", host, port, errno);
shutdown (session, SHUT_RDWR);
nopoll_close_socket (session);
/* relase address info */
freeaddrinfo (res);
return -1;
} /* end if */
} /* end if */
@ -385,6 +381,7 @@ char * __nopoll_conn_get_client_init (noPollConn * conn, noPollConnOpts * opts)
conn->handshake->expected_accept = nopoll_strdup (key);
/* send initial handshake */
#if 0
return nopoll_strdup_printf ("GET %s HTTP/1.1"
"\r\nHost: %s"
"\r\nUpgrade: websocket"
@ -413,6 +410,36 @@ char * __nopoll_conn_get_client_init (noPollConn * conn, noPollConnOpts * opts)
conn->protocols ? conn->protocols : "",
/* extra arbitrary headers */
(opts && opts->extra_headers) ? opts->extra_headers : "");
#else
return nopoll_strdup_printf ("GET %s HTTP/1.1"
"\r\nHost: %s:%s"
"\r\nUpgrade: websocket"
"\r\nConnection: Upgrade"
"\r\nSec-WebSocket-Key: %s"
"\r\nSec-WebSocket-Version: %d"
"%s%s"
"%s%s" /* Cookie */
"%s%s" /* protocol part */
"%s" /* extra arbitrary headers */
"\r\n\r\n",
conn->get_url,
conn->host_name, conn->port,
/* sec-websocket-key */
key,
/* sec-websocket-version */
conn->ctx->protocol_version,
/* Origin (support not sending Origin: header in case it is not defined) */
(conn->origin != NULL && (opts == NULL || opts->add_origin_header)) ? "\r\nOrigin: " : "",
(conn->origin != NULL && (opts == NULL || opts->add_origin_header)) ? conn->origin : "",
/* Cookie */
(opts && opts->cookie) ? "\r\nCookie: " : "",
(opts && opts->cookie) ? opts->cookie : "",
/* protocol part */
conn->protocols ? "\r\nSec-WebSocket-Protocol: " : "",
conn->protocols ? conn->protocols : "",
/* extra arbitrary headers */
(opts && opts->extra_headers) ? opts->extra_headers : "");
#endif
}
@ -882,6 +909,7 @@ noPollConn * __nopoll_conn_new_common (noPollCtx * ctx,
/* get client init payload */
content = __nopoll_conn_get_client_init (conn, options);
if (content == NULL) {
nopoll_log (ctx, NOPOLL_LEVEL_CRITICAL, "Failed to build client init message, unable to connect");
nopoll_conn_shutdown (conn);
@ -1041,15 +1069,19 @@ noPollConn * __nopoll_conn_new_common (noPollCtx * ctx,
/* call to send content */
remaining_timeout = ctx->conn_connect_std_timeout;
while (remaining_timeout > 0) {
if (size != conn->send (conn, content, size)) {
int sent_size = conn->send (conn, content, size);
//fprintf(stderr, "here sent size is: %d.\n", sent_size);
if (size != sent_size) {
/* for some reason, under FreeBSD, a ENOTCONN is reported when they should be returning EINPROGRESS and/or EWOULDBLOCK */
if (errno == NOPOLL_EWOULDBLOCK || errno == NOPOLL_EINPROGRESS || errno == NOPOLL_ENOTCONN) {
/* nopoll_log (ctx, NOPOLL_LEVEL_CRITICAL, "Connection in progress (errno=%d), session: %d", errno, session); */
//fprintf(stderr, "Connection in progress (errno=%d,%s), session: %d\n", errno, strerror(errno), session);
nopoll_sleep (10000);
remaining_timeout -= 10000;
continue;
} /* end if */
//fprintf(stderr, "Failed to send websocket init message, error code was: %d,%s (2), closing session\n", errno, strerror(errno));
nopoll_log (ctx, NOPOLL_LEVEL_CRITICAL, "Failed to send websocket init message, error code was: %d (2), closing session", errno);
nopoll_conn_shutdown (conn);
} /* end if */
@ -1057,6 +1089,7 @@ noPollConn * __nopoll_conn_new_common (noPollCtx * ctx,
break;
}
//fprintf (stderr, "Web socket initial client handshake sent");
nopoll_log (ctx, NOPOLL_LEVEL_DEBUG, "Web socket initial client handshake sent");
/* release content */

View File

@ -28,6 +28,7 @@
<annotations.version>4.8.6</annotations.version>
<undertow.version>2.3.14.Final</undertow.version>
<apache.poi>5.3.0</apache.poi>
<taosdata.verson>3.2.10</taosdata.verson>
</properties>
<dependencies>
@ -92,6 +93,11 @@
<version>${hutool.version}</version>
</dependency>
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>${taosdata.verson}</version>
</dependency>
<!-- 提供Redis连接池 -->
<dependency>
@ -154,7 +160,7 @@
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.3.4</version>
<version>3.4.4</version>
</dependency>

View File

@ -1,5 +1,9 @@
package com.das;
import com.das.modules.node.service.DataService;
import com.das.modules.node.service.TDEngineService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
@ -10,10 +14,19 @@ import org.springframework.scheduling.annotation.EnableScheduling;
*/
@EnableScheduling
@SpringBootApplication
public class DasApplication {
public class DasApplication implements CommandLineRunner {
@Autowired
TDEngineService tDEngineService;
@Autowired
DataService dataService;
public static void main(String[] args) {
SpringApplication.run(DasApplication.class, args);
}
@Override
public void run(String... args) {
tDEngineService.init();
dataService.createTdStable();
}
}

View File

@ -0,0 +1,33 @@
package com.das.common.constant;
/**
* @author chenhaojie
*
*/
public interface BaseIotModelType {
/**
* 遥测量
*/
int TYPE_PSR_ANALOG = 138;
/**
* 累积量
*/
int TYPE_PSR_ACCUMULATOR = 139;
/**
* 离散量
*/
int TYPE_PSR_DISCRETE = 140;
/**
* 遥控量
*/
int TYPE_PSR_CONTROL = 147;
/**
* 遥调量
*/
int TYPE_PSR_SETPOINT = 146;
}

View File

@ -1,18 +0,0 @@
package com.das.common.constant;
/**
* @author chenhaojie
*
*/
public interface DataStatus {
/**
* 正常状态
*/
String NORMAL = "0";
/**
* 停用状态
*/
String DISABLE = "1";
}

View File

@ -226,12 +226,12 @@ public class SysIotModelController {
/** 物模型导入 */
@PostMapping("/import")
public R<Void> importSysIotModel(String iotModelId, @RequestParam("file") MultipartFile file) throws IOException {
public R<Void> importSysIotModel(String id, @RequestParam("file") MultipartFile file) throws IOException {
if (StringUtils.isEmpty(iotModelId)) {
if (StringUtils.isEmpty(id)) {
throw new ServiceException("请选择需要导入的物模型属性信息");
}
sysIotModelService.importSysIotModel(iotModelId, file);
sysIotModelService.importSysIotModel(id, file);
return R.success();
}

View File

@ -0,0 +1,24 @@
package com.das.modules.equipment.domain.vo;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.databind.ser.std.ToStringSerializer;
import lombok.Data;
/**
* 物模型前端回显
*
* @author guchengwei
*/
@Data
public class IotModelFieldVo {
@JsonSerialize(using = ToStringSerializer.class)
private Long id;
private Integer objectType;
private String iotModelCode;
private String iotModelName;
}

View File

@ -25,5 +25,11 @@ public interface SysEquipmentMapper extends BaseMapperPlus<SysEquipment, SysEqui
void updateIotAddr(Long id, String iotAddr);
// 获取设备的属性信息
List<String> getEquipmentAttributeInfo(Long equipmentId);
// 获取设备的动作信息
List<String> getEquipmentServiceInfo(Long equipmentId);
}

View File

@ -4,6 +4,7 @@ package com.das.modules.equipment.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.das.modules.equipment.domain.excel.SysIotModelFieldExcel;
import com.das.modules.equipment.domain.excel.SysIotModelServiceExcel;
import com.das.modules.equipment.domain.vo.IotModelFieldVo;
import com.das.modules.equipment.domain.vo.SysIotModelVo;
import com.das.modules.equipment.entity.SysIotModel;
import org.apache.ibatis.annotations.Mapper;
@ -25,4 +26,10 @@ public interface SysIotModelMapper extends BaseMapper<SysIotModel> {
String getIotModelServiceCode(Integer objectType);
List<IotModelFieldVo> getAllIotModel();
String getIotModel(Long id);
List<String> getAllIotModelField(Long id);
}

View File

@ -1,28 +1,25 @@
package com.das.modules.node.command;
import cn.hutool.core.collection.ListUtil;
import com.das.common.utils.AdminRedisTemplate;
import com.das.modules.node.constant.NodeConstant;
import com.das.modules.node.domain.bo.TerminalMessage;
import com.fasterxml.jackson.databind.JsonNode;
import com.das.modules.node.service.DataService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Map;
public class AnalogDataCommand implements BaseCommand{
public static final String YC_KEY_NAME = "RDB:YC:VALUES";
@Service(value = NodeConstant.ANALOG_DATA)
@Slf4j
public class AnalogDataCommand implements BaseCommand {
@Autowired
AdminRedisTemplate adminRedisTemplate;
DataService dataService;
@Override
public void doCommand(TerminalMessage data) {
JsonNode jsonNode = data.getData();
Map<String, Object> values = Map.of(
"deviceId", jsonNode.get("deviceId").asLong(),
"dataTime", data.getTime(),
"values", ListUtil.toList(jsonNode.get("value").asText())
);
adminRedisTemplate.set(YC_KEY_NAME, values);
try {
dataService.handleData(data);
// 存入td库
} catch (Exception e) {
log.error("解析数据异常", e);
}
}
}

View File

@ -1,10 +0,0 @@
package com.das.modules.node.command;
import com.das.modules.node.domain.bo.TerminalMessage;
public class HistoryStateDataCommand implements BaseCommand{
@Override
public void doCommand(TerminalMessage data) {
// 更新td数据库
}
}

View File

@ -1,10 +1,30 @@
package com.das.modules.node.command;
import cn.hutool.core.collection.ListUtil;
import com.das.common.utils.AdminRedisTemplate;
import com.das.modules.node.constant.NodeConstant;
import com.das.modules.node.domain.bo.TerminalMessage;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service(value = NodeConstant.INIT_DEVICE_DATA)
@Slf4j
public class InitDeviceDataCommand implements BaseCommand{
@Autowired
AdminRedisTemplate adminRedisTemplate;
@Override
public void doCommand(TerminalMessage data) {
log.info("收到设备初始化数据");
// 存入redis
JsonNode dataInfo = data.getData();
if (!dataInfo.isEmpty()) {
String deviceId = dataInfo.get("deviceId").asText();
// 存入redis
adminRedisTemplate.set(deviceId, ListUtil.toList(dataInfo.get("values").asText()));
}
}
}

View File

@ -1,27 +1,25 @@
package com.das.modules.node.command;
import cn.hutool.core.collection.ListUtil;
import com.das.common.utils.AdminRedisTemplate;
import com.das.modules.node.constant.NodeConstant;
import com.das.modules.node.domain.bo.TerminalMessage;
import com.fasterxml.jackson.databind.JsonNode;
import com.das.modules.node.service.DataService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Map;
public class StateDataCommand implements BaseCommand{
public static final String YM_KEY_NAME = "RDB:YM:VALUES";
@Service(value = NodeConstant.STATE_DATA)
@Slf4j
public class StateDataCommand implements BaseCommand {
@Autowired
AdminRedisTemplate adminRedisTemplate;
DataService dataService;
@Override
public void doCommand(TerminalMessage data) {
JsonNode jsonNode = data.getData();
Map<String, Object> values = Map.of(
"deviceId", jsonNode.get("deviceId").asLong(),
"dataTime", data.getTime(),
"values", ListUtil.toList(jsonNode.get("value").asText())
);
adminRedisTemplate.set(YM_KEY_NAME, values);
try {
dataService.handleData(data);
} catch (Exception e) {
log.error("解析数据异常", e);
}
}
}

View File

@ -9,4 +9,12 @@ public interface NodeConstant {
String LAST_PONG_TIME = "LastPongTime";
String HEARTBEAT = "heartbeat";
String DEVICE_CONTROL = "deviceControl";
String INIT_DEVICE_DATA = "initDeviceData";
String ANALOG_DATA = "analogData";
String STATE_DATA = "stateData";
}

View File

@ -8,7 +8,9 @@ import com.das.common.result.R;
import com.das.common.utils.PageDataInfo;
import com.das.modules.node.domain.dto.*;
import com.das.modules.node.domain.vo.*;
import com.das.modules.node.service.DataService;
import com.das.modules.node.service.SysNodeService;
import com.fasterxml.jackson.databind.JsonNode;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.springframework.beans.factory.annotation.Autowired;
@ -34,6 +36,10 @@ public class SysNodeController {
@Autowired
private SysNodeService sysNodeService;
@Autowired
private DataService dataService;
/** 获取节点列表 */
@PostMapping("/list")
@ -84,8 +90,9 @@ public class SysNodeController {
/** 配置下发 */
@PostMapping("/configUpdate")
public R<Void> configUpdate() {
return R.success();
public R<?> configUpdate() {
JsonNode configUpdateData = dataService.getConfigUpdateInfo(Long.valueOf(1));
return R.success(configUpdateData);
}
@ -158,10 +165,10 @@ public class SysNodeController {
public R<?> getMappingList(@RequestBody SysImptabmappingDto sysImptabmappingDto) {
//判断是否有权限
// boolean hasPermission = StpUtil.hasPermission(SysAuthorityIds.SYS_AUTHORITY_ID_DEVICE_MGR.toString());
// if(!hasPermission){
// return R.fail("没有节点管理权限");
// }
boolean hasPermission = StpUtil.hasPermission(SysAuthorityIds.SYS_AUTHORITY_ID_DEVICE_MGR.toString());
if(!hasPermission){
return R.fail("没有节点管理权限");
}
if (sysImptabmappingDto.getLinkId() == null) {
throw new ServiceException("参数缺失");
}
@ -229,5 +236,11 @@ public class SysNodeController {
}
return R.success("导入失败");
}
/** 遥控遥调 */
@PostMapping("/link/deviceControl")
public void deviceControl(@RequestBody DeviceControlDto device) {
sysNodeService.deviceControl(device);
}
}

View File

@ -0,0 +1,23 @@
package com.das.modules.node.domain.bo;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.databind.ser.std.ToStringSerializer;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class RTData {
@JsonSerialize(using = ToStringSerializer.class)
private Long deviceId;
private Long dataTime;
private Map<String, Object> values;
}

View File

@ -12,5 +12,7 @@ public class BindEquipmentInfoDto {
@JsonSerialize(using = ToStringSerializer.class)
private Long equipmentId;
private Integer porder;
private String iotAddr;
}

View File

@ -0,0 +1,20 @@
package com.das.modules.node.domain.dto;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.databind.ser.std.ToStringSerializer;
import lombok.Data;
@Data
public class DeviceControlDto {
@JsonSerialize(using = ToStringSerializer.class)
private Long nodeId;
@JsonSerialize(using = ToStringSerializer.class)
private Long deviceId;
private String serviceName;
private Integer opValue;
}

View File

@ -21,4 +21,6 @@ public class ImptabmappingDto implements Serializable {
private Integer type;
private String params;
private String name;
}

View File

@ -15,7 +15,7 @@ public class EquipmentVo {
private String addr;
private List<IotModelVo> attrs;
private List<NewIotModelVo> attrs;
private List<IotModelVo> services;
private List<NewIotModelVo> services;
}

View File

@ -14,9 +14,15 @@ import lombok.Data;
@Data
public class IotModelVo {
private String name;
private Integer attributeType;
private String type;
private Integer serviceType;
private String iotAddr;
private String equipmentAttribute;
private String equipmentService;
private Object params;
}

View File

@ -0,0 +1,13 @@
package com.das.modules.node.domain.vo;
import lombok.Data;
@Data
public class NewIotModelVo {
private String name;
private String type;
private Object params;
}

View File

@ -30,7 +30,7 @@ public class SysCommunicationLinkVo {
/**
* 协议参数
*/
private Object params;
private String params;
/**
* 所属系统节点

View File

@ -18,9 +18,9 @@ import java.util.List;
@Mapper
public interface SysImptabmappingMapper extends BaseMapperPlus<SysImptabmapping, SysImptabmapping> {
List<ImptabmappingVo> getMappingList(Long linkId, String code);
List<ImptabmappingVo> getMappingList(Long linkId, Integer type);
List<ImptabmappingVo> getMappingControlList(Long linkId, String code);
List<ImptabmappingVo> getMappingControlList(Long linkId, Integer type);
List<SysImptabmappingVo> getBindDevice(Long linkId);
@ -34,9 +34,9 @@ public interface SysImptabmappingMapper extends BaseMapperPlus<SysImptabmapping,
String getIotAddrByEquipmentId(Long equipmentId);
String getAttributeCode(Long equipmentId, Integer type);
String getAttributeCode(Long equipmentId, Integer type, String name);
String getServiceCode(Long equipmentId, Integer type);
String getServiceCode(Long equipmentId, Integer type, String name);
List<SysImptabmappingVo> getMappingInfoList(Long linkId);
}

View File

@ -8,4 +8,7 @@ public interface DataService {
void pushMessage(TerminalMessage msg);
JsonNode getConfigUpdateInfo(Long nodeId);
void createTdStable();
void handleData(TerminalMessage data);
}

View File

@ -42,4 +42,6 @@ public interface SysNodeService {
void exportMappingList(SysImptabmappingDto sysImptabmappingDto,HttpServletRequest request, HttpServletResponse response);
boolean importMappingList(String linkId, MultipartFile file);
void deviceControl(DeviceControlDto device);
}

View File

@ -0,0 +1,170 @@
package com.das.modules.node.service;
import cn.hutool.core.collection.ListUtil;
import com.das.modules.equipment.domain.vo.IotModelFieldVo;
import com.das.modules.equipment.mapper.SysIotModelMapper;
import com.das.modules.node.domain.bo.RTData;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.stereotype.Service;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
@Service
@EnableAsync
public class TDEngineService {
private HikariDataSource hikariDataSource;
@Value("${tdengine.url}")
private String url;
@Value("${tdengine.username}")
private String username;
@Value("${tdengine.password}")
private String password;
@Value("${tdengine.batch-size:10000}")
private int batchSize;
public void init() {
if (hikariDataSource == null) {
HikariConfig config = new HikariConfig();
config.setJdbcUrl(url);
config.setUsername(username);
config.setPassword(password);
config.setConnectionTestQuery("select server_status()");
config.setMinimumIdle(10); //minimum number of idle connection
config.setMaximumPoolSize(20); //maximum number of connection in the pool
config.setMaxLifetime(0); // maximum life time for each connection
// config.setIdleTimeout(0); // max idle time for recycle idle connection
log.info("=======>TDEngineUrl:" + config.getJdbcUrl());
hikariDataSource = new HikariDataSource(config);
}
}
// 遍历所有的物模型存入内存中
public void initIotModel(List<IotModelFieldVo> allIotModel, ConcurrentHashMap<String,Map<String, Object>> iotFieldMap) {
// 创建物模型超级表
StringBuilder sb = new StringBuilder(1024*1024);
try (Connection conn = hikariDataSource.getConnection();
Statement pstmt = conn.createStatement()) {
ListUtil.page(allIotModel, batchSize, (list)->{
sb.setLength(0);
for (IotModelFieldVo info : list) {
sb.append("CREATE STABLE IF NOT EXISTS ");
sb.append(info.getIotModelCode());
sb.append(" (`updatetime` TIMESTAMP");
Map<String, Object> map = iotFieldMap.get(info.getIotModelCode());
// 使用增强的 for 循环遍历键
for (String key : map.keySet()) {
sb.append(", ");
sb.append(key);
sb.append(" float");
}
sb.append(") TAGS (`deviceid` BIGINT);");
}
try {
pstmt.executeUpdate(sb.toString());
} catch (SQLException ex) {
log.error("save yx error", ex);
}
});
} catch (SQLException ex) {
log.error(ex.getMessage());
}
}
@Async
public void updateYCValues(List<RTData> values, String iotModelCode) {
StringBuilder sb = new StringBuilder(1024*1024);
try (Connection conn = hikariDataSource.getConnection();
Statement pstmt = conn.createStatement()) {
ListUtil.page(values, batchSize, (list)->{
sb.setLength(0);
sb.append("insert into ");
for (RTData dv : list) {
sb.append("d");
sb.append(dv.getDeviceId());
sb.append(" using " );
sb.append(iotModelCode);
sb.append(" tags (");
sb.append(dv.getDeviceId());
sb.append(") values (");
sb.append(dv.getDataTime());
dv.getValues().forEach((key, value) ->
sb.append(",").append(value)
);
sb.append(")");
}
try {
pstmt.executeUpdate(sb.toString());
} catch (SQLException ex) {
log.error("save yc error", ex);
}
});
} catch (SQLException ex) {
log.error(ex.getMessage());
}
}
@Async
public void updateStataValues(List<RTData> values, String iotModelCode) {
StringBuilder sb = new StringBuilder(1024*1024);
try (Connection conn = hikariDataSource.getConnection();
Statement pstmt = conn.createStatement()) {
ListUtil.page(values, batchSize, (list)->{
sb.setLength(0);
sb.append("insert into ");
for (RTData dv : list) {
sb.append("d");
sb.append(dv.getDeviceId());
sb.append(" using " );
sb.append(iotModelCode);
sb.append(" tags (");
sb.append(dv.getDeviceId());
sb.append(") values (");
sb.append(dv.getDataTime());
dv.getValues().forEach((key, value) ->
sb.append(",").append(value)
);
sb.append(")");
}
try {
pstmt.executeUpdate(sb.toString());
} catch (SQLException ex) {
log.error("save yc error", ex);
}
});
} catch (SQLException ex) {
log.error(ex.getMessage());
}
}
@PreDestroy
public void free() {
if (hikariDataSource != null) {
log.info("释放TDEngine资源");
hikariDataSource.close();
}
}
}

View File

@ -1,26 +1,40 @@
package com.das.modules.node.service.impl;
import cn.hutool.core.collection.ListUtil;
import com.das.common.constant.BaseIotModelType;
import com.das.common.utils.AdminRedisTemplate;
import com.das.modules.equipment.domain.vo.IotModelFieldVo;
import com.das.modules.equipment.mapper.SysIotModelMapper;
import com.das.modules.node.disruptor.MessageEventFactory;
import com.das.modules.node.disruptor.TerminalMessageEventHandler;
import com.das.modules.node.domain.bo.RTData;
import com.das.modules.node.domain.bo.TerminalMessage;
import com.das.modules.node.domain.vo.*;
import com.das.modules.node.handler.NodeMessageHandler;
import com.das.modules.node.mapper.SysCommunicationLinkMapper;
import com.das.modules.node.mapper.SysImptabmappingMapper;
import com.das.modules.node.service.DataService;
import com.das.modules.node.service.TDEngineService;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
@ -40,6 +54,22 @@ public class DataServiceImpl implements DataService {
@Resource
SysImptabmappingMapper sysImptabmappingMapper;
@Resource
private NodeMessageHandler nodeMessageHandler;
@Autowired
AdminRedisTemplate adminRedisTemplate;
@Autowired
SysIotModelMapper sysIotModelMapper;
@Autowired
TDEngineService tdEngineService;
private ConcurrentHashMap<String, String> iotModelMap = new ConcurrentHashMap<>(10000);
public ConcurrentHashMap<String, Map<String, Object>> iotFieldMap = new ConcurrentHashMap<>(10000);
@PostConstruct
public void init() {
//初始化高性能队列
@ -52,31 +82,28 @@ public class DataServiceImpl implements DataService {
}
@PreDestroy
public void destroy() {
if (ringBuffer != null){
if (ringBuffer != null) {
ringBuffer = null;
}
if (disruptor != null){
if (disruptor != null) {
disruptor.shutdown();
}
}
@Override
public void pushMessage(TerminalMessage msg) {
if (ringBuffer == null){
if (ringBuffer == null) {
return;
}
long seq = ringBuffer.next();
try{
try {
TerminalMessage terminalMessage = ringBuffer.get(seq);
terminalMessage.from(msg);
}
catch (Exception e){
log.error("发送消息失败",e);
}
finally {
} catch (Exception e) {
log.error("发送消息失败", e);
} finally {
ringBuffer.publish(seq);
}
@ -88,54 +115,81 @@ public class DataServiceImpl implements DataService {
List<LinkVo> links = new ArrayList<>();
List<EquipmentVo> equipments = new ArrayList<>();
List<Long> equipmentList = new ArrayList<>();
// 获取所有的链路信息
List<SysCommunicationLinkVo> sysCommunicationLinkVoList = sysCommunicationLinkMapper.querySysCommunicationLink(nodeId);
for (SysCommunicationLinkVo sysCommunicationLinkVo : sysCommunicationLinkVoList) {
LinkVo linkVo = new LinkVo();
linkVo.setLinkId(sysCommunicationLinkVo.getId());
linkVo.setLinkName(sysCommunicationLinkVo.getLinkName());
linkVo.setParams(sysCommunicationLinkVo.getParams());
linkVo.setProtocol(sysCommunicationLinkVo.getProtocol());
List<String> stringList = new ArrayList<>();
// 获取关联的设备Id
equipmentList = sysImptabmappingMapper.getEquipmentId(sysCommunicationLinkVo.getId());
for (Long equipmentId : equipmentList) {
stringList.add(String.valueOf(equipmentId));
}
String[] stringArray = stringList.toArray(new String[0]);
linkVo.setDevices(stringArray);
links.add(linkVo);
}
for (Long equipmentId : equipmentList) {
List<IotModelVo> newIotModelFieldList = new ArrayList<>();
List<IotModelVo> newIotModelServiceList = new ArrayList<>();
// 获取设备IOT地址
String iotAddr = sysImptabmappingMapper.getIotAddrByEquipmentId(equipmentId);
// 根据设备Id获取对应的物模型属性和动作
List<IotModelVo> iotModelFieldList = sysImptabmappingMapper.getIotModelFieldByEquipmentId(equipmentId);
if (!CollectionUtils.isEmpty(iotModelFieldList)) {
for (IotModelVo info : iotModelFieldList) {
if(info.getParams() == null){
// info.setParams();
try {
// 获取所有的链路信息
List<SysCommunicationLinkVo> sysCommunicationLinkVoList = sysCommunicationLinkMapper.querySysCommunicationLink(nodeId);
for (SysCommunicationLinkVo sysCommunicationLinkVo : sysCommunicationLinkVoList) {
ObjectMapper objectMapper = new ObjectMapper();
JsonNode jsonNode = objectMapper.readTree(sysCommunicationLinkVo.getParams());
LinkVo linkVo = new LinkVo();
linkVo.setLinkId(sysCommunicationLinkVo.getId());
linkVo.setLinkName(sysCommunicationLinkVo.getLinkName());
linkVo.setParams(jsonNode);
linkVo.setProtocol(sysCommunicationLinkVo.getProtocol());
List<String> stringList = new ArrayList<>();
// 获取关联的设备Id
equipmentList = sysImptabmappingMapper.getEquipmentId(sysCommunicationLinkVo.getId());
for (Long equipmentId : equipmentList) {
stringList.add(String.valueOf(equipmentId));
}
String[] stringArray = stringList.toArray(new String[0]);
linkVo.setDevices(stringArray);
links.add(linkVo);
for (Long equipmentId : equipmentList) {
HashMap map = new HashMap<>();
ObjectMapper equipObjectMapper = new ObjectMapper();
ObjectNode equipJsonNode = equipObjectMapper.convertValue(map, ObjectNode.class);
List<NewIotModelVo> newIotModelFieldList = new ArrayList<>();
List<NewIotModelVo> newIotModelServiceList = new ArrayList<>();
// 获取设备IOT地址
String iotAddr = sysImptabmappingMapper.getIotAddrByEquipmentId(equipmentId);
// 根据设备Id获取对应的物模型属性和动作
List<IotModelVo> iotModelFieldList = sysImptabmappingMapper.getIotModelFieldByEquipmentId(equipmentId);
if (!CollectionUtils.isEmpty(iotModelFieldList)) {
for (IotModelVo info : iotModelFieldList) {
if(info.getServiceType() == null) {
NewIotModelVo newIotModelVo = new NewIotModelVo();
newIotModelVo.setName(info.getEquipmentAttribute());
if (info.getParams() == null) {
newIotModelVo.setParams(equipJsonNode);
} else {
newIotModelVo.setParams(equipObjectMapper.readTree(info.getParams().toString()));
}
if (info.getAttributeType() == BaseIotModelType.TYPE_PSR_ANALOG) {
newIotModelVo.setType("yc");
} else if (info.getAttributeType() == BaseIotModelType.TYPE_PSR_ACCUMULATOR) {
newIotModelVo.setType("ym");
} else if (info.getAttributeType() == BaseIotModelType.TYPE_PSR_DISCRETE) {
newIotModelVo.setType("yx");
}
newIotModelFieldList.add(newIotModelVo);
} else if (info.getAttributeType() == null) {
NewIotModelVo newIotModelVo = new NewIotModelVo();
newIotModelVo.setName(info.getEquipmentService());
if (info.getParams() == null) {
newIotModelVo.setParams(equipJsonNode);
} else {
newIotModelVo.setParams(equipObjectMapper.readTree(info.getParams().toString()));
}
if (info.getServiceType() == BaseIotModelType.TYPE_PSR_SETPOINT) {
newIotModelVo.setType("yt");
} else if (info.getServiceType() == BaseIotModelType.TYPE_PSR_CONTROL) {
newIotModelVo.setType("yk");
}
newIotModelServiceList.add(newIotModelVo);
}
}
}
newIotModelFieldList.add(info);
EquipmentVo equipment = new EquipmentVo();
equipment.setAddr(iotModelFieldList.get(0).getIotAddr());
equipment.setId(equipmentId);
equipment.setAttrs(newIotModelFieldList);
equipment.setServices(newIotModelServiceList);
equipments.add(equipment);
}
}
List<IotModelVo> iotModelServiceList = sysImptabmappingMapper.getIotModelServiceByEquipmentId(equipmentId);
if (!CollectionUtils.isEmpty(iotModelServiceList)) {
for (IotModelVo info : iotModelServiceList) {
if(info.getParams() == null){
// info.setParams();
}
newIotModelServiceList.add(info);
}
}
EquipmentVo equipment = new EquipmentVo();
equipment.setAddr(iotAddr);
equipment.setId(equipmentId);
equipment.setAttrs(newIotModelFieldList);
equipment.setServices(newIotModelServiceList);
equipments.add(equipment);
} catch (Exception e) {
log.error("获取设备配置信息失败", e);
}
configUpdateVo.setCreateTime(System.currentTimeMillis());
configUpdateVo.setNodeId(String.valueOf(nodeId));
@ -146,4 +200,47 @@ public class DataServiceImpl implements DataService {
JsonNode jsonNode = mapper.valueToTree(configUpdateVo);
return jsonNode;
}
@Override
public void createTdStable() {
List<IotModelFieldVo> allIotModel = sysIotModelMapper.getAllIotModel();
for (IotModelFieldVo item : allIotModel) {
String key = String.valueOf(item.getId());
iotModelMap.put(key, item.getIotModelCode());
List<String> modelFieldList = sysIotModelMapper.getAllIotModelField(item.getId());
Map<String, Object> map = new HashMap<>();
for (String field : modelFieldList) {
map.put(field, null);
}
iotFieldMap.put(item.getIotModelCode(), map);
}
tdEngineService.initIotModel(allIotModel, iotFieldMap);
}
@Override
public void handleData(TerminalMessage data) {
JsonNode jsonNode = data.getData();
String deviceId = jsonNode.get("deviceId").asText();
Long dataTime = data.getTime();
adminRedisTemplate.set(deviceId, ListUtil.toList(jsonNode.get("values").asText()));
// 存入td库
// 根据设备ID获取对应的物模型属性
String iotModelCode = sysIotModelMapper.getIotModel(jsonNode.get("deviceId").asLong());
JsonNode valueJsonNode = jsonNode.get("values");
Map<String, Object> resultMap = iotFieldMap.get(iotModelCode);
// 使用增强的 for 循环遍历键
for (String key : resultMap.keySet()) {
float value = (float) valueJsonNode.get(key).asDouble();
resultMap.put(key, value);
}
List<RTData> list = new ArrayList<>();
RTData rtData = RTData.builder()
.dataTime(dataTime)
.deviceId(Long.valueOf(deviceId))
.values(resultMap)
.build();
list.add(rtData);
tdEngineService.updateYCValues(list, iotModelCode);
}
}

View File

@ -13,6 +13,9 @@ import com.das.modules.auth.entity.SysOrg;
import com.das.modules.auth.mapper.SysOrgMapper;
import com.das.modules.equipment.mapper.SysEquipmentMapper;
import com.das.modules.equipment.mapper.SysIotModelMapper;
import com.das.modules.node.constant.NodeConstant;
import com.das.modules.node.disruptor.TerminalMessageEventHandler;
import com.das.modules.node.domain.bo.TerminalMessage;
import com.das.modules.node.domain.dto.*;
import com.das.modules.node.domain.vo.ImptabmappingVo;
import com.das.modules.node.domain.vo.SysCommunicationLinkVo;
@ -26,6 +29,7 @@ import com.das.modules.node.mapper.SysImptabmappingMapper;
import com.das.modules.node.mapper.SysNodeMapper;
import com.das.modules.node.service.SysNodeService;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import lombok.extern.slf4j.Slf4j;
@ -44,6 +48,7 @@ import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
@Transactional(rollbackFor = Exception.class)
@ -68,7 +73,7 @@ public class SysNodeServiceImpl implements SysNodeService {
private SysEquipmentMapper sysEquipmentMapper;
@Autowired
private SysIotModelMapper sysIotModelMapper;
TerminalMessageEventHandler terminalMessageEventHandler;
@Override
public List<SysNodeVo> querySysNodeList() {
@ -184,13 +189,10 @@ public class SysNodeServiceImpl implements SysNodeService {
Integer type = sysImptabmappingDto.getType();
List<ImptabmappingVo> list = new ArrayList<>();
// 根据类型获取物模型属性或者动作编码
String code = "";
if (type == 138 || type == 139 || type == 140) {
code = sysIotModelMapper.getIotModelFieldCode(type);
list = sysImptabmappingMapper.getMappingList(sysImptabmappingDto.getLinkId(), code);
list = sysImptabmappingMapper.getMappingList(sysImptabmappingDto.getLinkId(), type);
} else if (type == 146 || type == 147) {
code = sysIotModelMapper.getIotModelServiceCode(type);
list = sysImptabmappingMapper.getMappingControlList(sysImptabmappingDto.getLinkId(), code);
list = sysImptabmappingMapper.getMappingControlList(sysImptabmappingDto.getLinkId(), type);
}
return list;
@ -233,22 +235,20 @@ public class SysNodeServiceImpl implements SysNodeService {
if (imp.getType() == 138 || imp.getType() == 139 || imp.getType() == 140) {
// 获取属性编码
String code = sysImptabmappingMapper.getAttributeCode(imp.getEquipmentId(), imp.getType());
String code = sysImptabmappingMapper.getAttributeCode(imp.getEquipmentId(), imp.getType(), imp.getName());
sysImptabmapping.setEquipmentAttribute(code);
} else if (imp.getType() == 146 || imp.getType() == 147) {
// 获取动作编码
String code = sysImptabmappingMapper.getServiceCode(imp.getEquipmentId(), imp.getType());
String code = sysImptabmappingMapper.getServiceCode(imp.getEquipmentId(), imp.getType(), imp.getName());
sysImptabmapping.setEquipmentService(code);
}
sysImptabmapping.setUpdatedTime(new Date());
sysImptabmapping.setUpdatedBy(sysUserVo.getAccount());
sysImptabmapping.setLinkId(imp.getLinkId());
sysImptabmapping.setEquipmentId(imp.getEquipmentId());
sysImptabmapping.setParams(String.valueOf(imp.getParams()));
sysImptabmapping.setParams(imp.getParams());
list.add(sysImptabmapping);
}
if (!CollectionUtils.isEmpty(list)) {
sysImptabmappingMapper.updateBatchById(list);
sysImptabmappingMapper.insertOrUpdateBatch(list);
}
}
}
@ -282,7 +282,7 @@ public class SysNodeServiceImpl implements SysNodeService {
public boolean importMappingList(String linkId, MultipartFile file) {
boolean flag = false;
try {
// SysUserVo sysUserVo = (SysUserVo) StpUtil.getTokenSession().get(SessionUtil.SESSION_USER_KEY);
SysUserVo sysUserVo = (SysUserVo) StpUtil.getTokenSession().get(SessionUtil.SESSION_USER_KEY);
List<SysImptabmapping> addList = new ArrayList<>();
String content = new BufferedReader(new InputStreamReader(file.getInputStream(), StandardCharsets.UTF_8))
.lines()
@ -302,10 +302,8 @@ public class SysNodeServiceImpl implements SysNodeService {
newInfo.setRevision(1);
newInfo.setCreatedTime(new Date());
newInfo.setUpdatedTime(new Date());
// newInfo.setCreatedBy(sysUserVo.getAccount());
// newInfo.setUpdatedBy(sysUserVo.getAccount());
newInfo.setCreatedBy("ceshi");
newInfo.setUpdatedBy("ceshi");
newInfo.setCreatedBy(sysUserVo.getAccount());
newInfo.setUpdatedBy(sysUserVo.getAccount());
addList.add(newInfo);
}
if (!CollectionUtils.isEmpty(addList)) {
@ -320,23 +318,72 @@ public class SysNodeServiceImpl implements SysNodeService {
return flag;
}
@Override
public void deviceControl(DeviceControlDto device) {
try {
HashMap map = new HashMap<>();
ObjectMapper objectMapper = new ObjectMapper();
String cmd = NodeConstant.DEVICE_CONTROL;
Long time = System.currentTimeMillis();
map.put("deviceId", device.getDeviceId());
map.put("serviceName", device.getServiceName());
map.put("opValue", device.getOpValue());
// HashMap 转换为 JsonNode
ObjectNode jsonNode = objectMapper.convertValue(map, ObjectNode.class);
TerminalMessage configUpdate = TerminalMessage.builder()
.cmd(cmd)
.cmdId(String.valueOf(device.getDeviceId()))
.time(time)
.data(jsonNode)
.build();
terminalMessageEventHandler.sendTerminalMessageWithResult(device.getNodeId(), configUpdate);
} catch (Exception e) {
log.error("设备控制失败 ", e);
}
}
// 绑定设备的测点信息不是绑定设备到映射表
private void addSysImptabmapping(List<BindEquipmentInfoDto> equipmentId, Long linkId, SysUserVo sysUserVo, List<SysImptabmapping> addList) {
int index = 0;
// 获取设备的测点信息
for (BindEquipmentInfoDto info : equipmentId) {
index++;
SysImptabmapping sysImptabmapping = new SysImptabmapping();
sysImptabmapping.setEquipmentId(info.getEquipmentId());
sysImptabmapping.setLinkId(linkId);
sysImptabmapping.setId(SequenceUtils.generateId());
sysImptabmapping.setCreatedTime(new Date());
sysImptabmapping.setUpdatedTime(new Date());
sysImptabmapping.setCreatedBy(sysUserVo.getAccount());
sysImptabmapping.setUpdatedBy(sysUserVo.getAccount());
sysImptabmapping.setRevision(1);
sysImptabmapping.setPorder(index);
addList.add(sysImptabmapping);
List<String> fieldList = sysEquipmentMapper.getEquipmentAttributeInfo(info.getEquipmentId());
List<String> serviceList = sysEquipmentMapper.getEquipmentServiceInfo(info.getEquipmentId());
if (!CollectionUtils.isEmpty(fieldList)) {
for (String field : fieldList) {
SysImptabmapping sysImptabmapping = getSysImptabmapping(linkId, sysUserVo, info);
sysImptabmapping.setEquipmentAttribute(field);
addList.add(sysImptabmapping);
}
}
if (!CollectionUtils.isEmpty(serviceList)) {
for (String service : serviceList) {
SysImptabmapping sysImptabmapping = getSysImptabmapping(linkId, sysUserVo, info);
sysImptabmapping.setEquipmentService(service);
addList.add(sysImptabmapping);
}
}
// 更新设备表里面的设备地址
sysEquipmentMapper.updateIotAddr(info.getEquipmentId(), info.getIotAddr());
}
}
private static SysImptabmapping getSysImptabmapping(Long linkId, SysUserVo sysUserVo, BindEquipmentInfoDto info) {
SysImptabmapping sysImptabmapping = new SysImptabmapping();
sysImptabmapping.setEquipmentId(info.getEquipmentId());
sysImptabmapping.setLinkId(linkId);
sysImptabmapping.setId(SequenceUtils.generateId());
sysImptabmapping.setCreatedTime(new Date());
sysImptabmapping.setUpdatedTime(new Date());
sysImptabmapping.setCreatedBy(sysUserVo.getAccount());
sysImptabmapping.setUpdatedBy(sysUserVo.getAccount());
sysImptabmapping.setRevision(1);
sysImptabmapping.setPorder(info.getPorder());
return sysImptabmapping;
}
}

View File

@ -92,4 +92,9 @@ das:
logging:
level:
com:
das: DEBUG
das: DEBUG
tdengine:
password: taosdata
url: jdbc:TAOS-RS://192.168.109.160:6041/das
username: root

View File

@ -30,7 +30,7 @@
</select>
<select id="querySysCommunicationLink" resultType="com.das.modules.node.domain.vo.SysCommunicationLinkVo">
select sc.id,sc.link_name,sc.protocol,to_json(sc.params::json) AS params from sys_communicationlink sc where sc.node_id = #{nodeId}
select sc.id,sc.link_name,sc.protocol,sc.params from sys_communicationlink sc where sc.node_id = #{nodeId}
</select>
</mapper>

View File

@ -116,4 +116,16 @@
</update>
<select id="getEquipmentAttributeInfo" resultType="java.lang.String">
select simf.attribute_code from sys_iot_model_field simf
left join sys_equipment se on simf.iot_model_id = se.iot_model_id
where se.id = #{id}
</select>
<select id="getEquipmentServiceInfo" resultType="java.lang.String">
select sims.service_code from sys_iot_model_service sims
left join sys_equipment se on sims.iot_model_id = se.iot_model_id
where se.id = #{id}
</select>
</mapper>

View File

@ -25,22 +25,22 @@
<select id="getMappingList" resultType="com.das.modules.node.domain.vo.ImptabmappingVo">
select si.id, se."name" as equipmentName, simf.attribute_name as name,si.params,se.id as equipmentId from sys_imptabmapping si
left join sys_equipment se on si.equipment_id = se.id
left join sys_iot_model_field simf on se.iot_model_id = simf.iot_model_id
where si.link_id = #{linkId} and si.equipment_attribute = #{code}
left join sys_iot_model_field simf on si.equipment_attribute = simf.attribute_code
where si.link_id = #{linkId} and simf.attribute_type = #{type}
order by si.porder
</select>
<select id="getMappingControlList" resultType="com.das.modules.node.domain.vo.ImptabmappingVo">
select si.id, se."name" as equipmentName, sims.service_name as name,si.params,se.id as equipmentId from sys_imptabmapping si
left join sys_equipment se on si.equipment_id = se.id
left join sys_iot_model_service sims on se.iot_model_id = sims.iot_model_id
where si.link_id = #{linkId} and si.equipment_service = #{code}
left join sys_iot_model_service sims on si.equipment_service = sims.service_code
where si.link_id = #{linkId} and sims.service_type = #{type}
order by si.porder
</select>
<select id="getBindDevice" resultMap="SysImptabmappingMap">
select se.id as equipment_id,se."name" as equipmentName, si.porder, se.iot_addr from sys_imptabmapping si
left join sys_equipment se on se.id = si.equipment_id
select distinct se.id as equipment_id,se."name" as equipmentName, si.porder, se.iot_addr from sys_equipment se
left join sys_imptabmapping si on se.id = si.equipment_id
where si.link_id = #{linkId} order by si.porder
</select>
@ -57,15 +57,16 @@
</select>
<select id="getIotModelFieldByEquipmentId" resultType="com.das.modules.node.domain.vo.IotModelVo">
select simf.attribute_name as name,simf.attribute_type as type,to_json(si.params::json) as params from sys_imptabmapping si
select simf.attribute_type as attributeType,sims.service_type as serviceType, se.iot_addr as iotAddr,si.equipment_attribute as equipmentAttribute,si.equipment_service as equipmentService, si.params from sys_imptabmapping si
left join sys_equipment se on si.equipment_id = se.id
left join sys_iot_model_field simf on se.iot_model_id = simf.iot_model_id
left join sys_iot_model_field simf on si.equipment_attribute = simf.attribute_code
left join sys_iot_model_service sims on si.equipment_service = sims.service_code
where si.equipment_id = #{equipmentId}
</select>
<select id="getIotModelServiceByEquipmentId" resultType="com.das.modules.node.domain.vo.IotModelVo">
select sims.service_name as name ,sims.service_type as type ,to_json(si.params::json) AS params from sys_imptabmapping si
select sims.service_code as name ,sims.service_type as type ,si.params from sys_imptabmapping si
left join sys_equipment se on si.equipment_id = se.id
left join sys_iot_model_service sims on se.iot_model_id = sims.iot_model_id
where si.equipment_id = #{equipmentId}
@ -78,13 +79,13 @@
<select id="getAttributeCode" resultType="java.lang.String">
select distinct simf.attribute_code from sys_iot_model_field simf
left join sys_equipment se on simf.iot_model_id = se.iot_model_id
where se.id = #{equipmentId} and simf.attribute_type = #{type}
where se.id = #{equipmentId} and simf.attribute_type = #{type} and simf.attribute_name = #{name}
</select>
<select id="getServiceCode" resultType="java.lang.String">
select distinct sims.service_code from sys_iot_model_service sims
left join sys_equipment se on sims.iot_model_id = se.iot_model_id
where se.id = #{equipmentId} and sims.service_type = #{type}
where se.id = #{equipmentId} and sims.service_type = #{type} and sims.service_name = #{name}
</select>
<select id="getMappingInfoList" resultMap="SysImptabmappingMap">

View File

@ -54,5 +54,20 @@
</select>
<select id="getAllIotModel" resultType="com.das.modules.equipment.domain.vo.IotModelFieldVo">
select sim.* from sys_iot_model sim
</select>
<select id="getIotModel" resultType="java.lang.String">
select sim.iot_model_code from sys_iot_model sim left join sys_equipment se on sim.id = se.iot_model_id
where se.id = #{id}
</select>
<select id="getAllIotModelField" resultType="java.lang.String">
select simf.attribute_code from sys_iot_model_field simf where simf.iot_model_id = #{id} order by simf.porder
</select>
</mapper>