diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 00000000..fea53c67 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,13 @@ +{ + "files.associations": { + "xstring": "cpp", + "unordered_map": "cpp", + "locale": "cpp", + "xlocale": "cpp", + "xlocmes": "cpp", + "xlocmon": "cpp", + "xlocnum": "cpp", + "xloctime": "cpp", + "xhash": "cpp" + } +} \ No newline at end of file diff --git a/das-dn/CMakeLists.txt b/das-dn/CMakeLists.txt index aa4991cf..a481d1ea 100644 --- a/das-dn/CMakeLists.txt +++ b/das-dn/CMakeLists.txt @@ -4,7 +4,7 @@ set (VERSION 1.0.1) # set (CMAKE_CXX_STANDARD 11) option (USE_MQTT "use mqtt protocol" ON) -option (USE_WEBSOCKET "use websocket" ON) +option (USE_WEBSOCKET "use websocket" OFF) option (USE_SQLITE3 "use sqlite3" ON) if (USE_SQLITE3) @@ -148,7 +148,7 @@ if (USE_WEBSOCKET) set (APP_LIBS ${APP_LIBS} ssl crypto) add_definitions(-DUSE_WEBSOCKET) add_definitions(-DNOPOLL_OS_UNIX=1) - #add_definitions(-DSHOW_DEBUG_LOG) + add_definitions(-DSHOW_DEBUG_LOG) add_definitions(-DNOPOLL_HAVE_VASPRINTF=1) add_definitions(-DNOPOLL_HAVE_TLSv10_ENABLED=1) add_definitions(-DNOPOLL_HAVE_TLSv11_ENABLED=1) @@ -158,50 +158,50 @@ endif () if (USE_MQTT) set(APP_SRCS ${APP_SRCS} - third_party/mqtt/actions.c - third_party/mqtt/callbacks.c - third_party/mqtt/connect.c - third_party/mqtt/handle_auth.c - third_party/mqtt/handle_connack.c - third_party/mqtt/handle_disconnect.c - third_party/mqtt/handle_ping.c - third_party/mqtt/handle_pubackcomp.c - third_party/mqtt/handle_publish.c - third_party/mqtt/handle_pubrec.c - third_party/mqtt/handle_pubrel.c - third_party/mqtt/handle_suback.c - third_party/mqtt/handle_unsuback.c - third_party/mqtt/helpers.c - third_party/mqtt/logging_mosq.c - third_party/mqtt/loop.c - third_party/mqtt/memory_mosq.c - third_party/mqtt/messages_mosq.c - third_party/mqtt/misc_mosq.c - third_party/mqtt/mosquitto.c - third_party/mqtt/net_mosq_ocsp.c - third_party/mqtt/net_mosq.c - third_party/mqtt/options.c - third_party/mqtt/packet_datatypes.c - third_party/mqtt/packet_mosq.c - third_party/mqtt/property_mosq.c - third_party/mqtt/read_handle.c - third_party/mqtt/send_connect.c - third_party/mqtt/send_disconnect.c - third_party/mqtt/send_mosq.c - third_party/mqtt/send_publish.c - third_party/mqtt/send_subscribe.c - third_party/mqtt/send_unsubscribe.c - third_party/mqtt/send_mosq.c - third_party/mqtt/socks_mosq.c - third_party/mqtt/srv_mosq.c - third_party/mqtt/strings_mosq.c - third_party/mqtt/thread_mosq.c - third_party/mqtt/time_mosq.c - third_party/mqtt/tls_mosq.c - third_party/mqtt/utf8_mosq.c - third_party/mqtt/util_mosq.c - third_party/mqtt/util_topic.c - third_party/mqtt/will_mosq.c) + third_party/mqtt/actions.c + third_party/mqtt/callbacks.c + third_party/mqtt/connect.c + third_party/mqtt/handle_auth.c + third_party/mqtt/handle_connack.c + third_party/mqtt/handle_disconnect.c + third_party/mqtt/handle_ping.c + third_party/mqtt/handle_pubackcomp.c + third_party/mqtt/handle_publish.c + third_party/mqtt/handle_pubrec.c + third_party/mqtt/handle_pubrel.c + third_party/mqtt/handle_suback.c + third_party/mqtt/handle_unsuback.c + third_party/mqtt/helpers.c + third_party/mqtt/logging_mosq.c + third_party/mqtt/loop.c + third_party/mqtt/memory_mosq.c + third_party/mqtt/messages_mosq.c + third_party/mqtt/misc_mosq.c + third_party/mqtt/mosquitto.c + third_party/mqtt/net_mosq_ocsp.c + third_party/mqtt/net_mosq.c + third_party/mqtt/options.c + third_party/mqtt/packet_datatypes.c + third_party/mqtt/packet_mosq.c + third_party/mqtt/property_mosq.c + third_party/mqtt/read_handle.c + third_party/mqtt/send_connect.c + third_party/mqtt/send_disconnect.c + third_party/mqtt/send_mosq.c + third_party/mqtt/send_publish.c + third_party/mqtt/send_subscribe.c + third_party/mqtt/send_unsubscribe.c + third_party/mqtt/send_mosq.c + third_party/mqtt/socks_mosq.c + third_party/mqtt/srv_mosq.c + third_party/mqtt/strings_mosq.c + third_party/mqtt/thread_mosq.c + third_party/mqtt/time_mosq.c + third_party/mqtt/tls_mosq.c + third_party/mqtt/utf8_mosq.c + third_party/mqtt/util_mosq.c + third_party/mqtt/util_topic.c + third_party/mqtt/will_mosq.c) # set (APP_SRCS ${APP_SRCS} hwmqtt/hwmqtt.cpp) add_definitions(-DUSE_MQTT) add_definitions(-DWITH_SOCKS) @@ -228,6 +228,7 @@ if (UNIX) set (APP_LIBS ${APP_LIBS} pthread) set (APP_LIBS ${APP_LIBS} dl) set (APP_LIBS ${APP_LIBS} rt) + #set (APP_LIBS ${APP_LIBS} websockets) endif() add_executable (application ${APP_SRCS}) diff --git a/das-dn/cmg/main.cpp b/das-dn/cmg/main.cpp index 2b7c6e5a..d532f0be 100644 --- a/das-dn/cmg/main.cpp +++ b/das-dn/cmg/main.cpp @@ -21,10 +21,24 @@ #include #include #include +#if 1 #include #include + +#include "KompexSQLitePrerequisites.h" +#include "KompexSQLiteDatabase.h" +#include "KompexSQLiteStatement.h" +#include "KompexSQLiteException.h" +#include "KompexSQLiteStreamRedirection.h" +#include "KompexSQLiteBlob.h" +using namespace Kompex; +#else +#include +#endif #include +#if 0 #include +#endif #include #include #include @@ -35,7 +49,7 @@ #include "ytlog.h" #include "public.h" -#define MAX_MSG_COUNT 64 +#define MAX_MSG_COUNT 4096 bool g_dataAcquisitionReload = false; std::string g_traceId; @@ -47,8 +61,14 @@ LONG m_ycbwload = 0; typedef std::vector stringvector; stringvector m_gLinkIDs; //链路名和Id -typedef std::unordered_map name2intmap; -name2intmap unitname2uid_map; +//typedef std::unordered_map name2intmap; + +typedef struct { + short uid; + std::string address; +} struct_uidaddress; +typedef std::unordered_map name2unitmap; +//name2intmap unitname2uid_map; #define CMD_CONTROL_OPERATION 0 #define CMD_CONTROL_SETTING 1 @@ -69,11 +89,58 @@ typedef struct { } struct_attr; typedef std::vector attrvector; +SQLiteDatabase m_database; +#if 0 struct { Json::Int64 m_iLinkIrn; time_t m_iStartTime; } struMonLinkLog; +#endif +#if 1 +#define WORKER_ID_BITS 5 +#define DATA_CENTER_ID_BITS 5 +#define SEQUENCE_BITS 12 +#define MAX_WORKER_ID ((1 << WORKER_ID_BITS) - 1) +#define MAX_DATA_CENTER_ID ((1 << DATA_CENTER_ID_BITS) - 1) +#define SEQUENCE_MASK ((1 << SEQUENCE_BITS) - 1) +#define EPOCH 1640995200000 // 2022-01-01 00:00:00 + +typedef struct { + uint64_t worker_id; + uint64_t data_center_id; + uint64_t sequence; + uint64_t last_timestamp; +} Snowflake; + +uint64_t current_time() { + struct timeval tv; + gettimeofday(&tv, NULL); + return (tv.tv_sec * 1000) + (tv.tv_usec / 1000); +} + +uint64_t snowflake_next_id(Snowflake *sf) { + uint64_t timestamp = current_time(); + if (timestamp < sf->last_timestamp) { + return 0; + } + if (sf->last_timestamp == timestamp) { + sf->sequence = (sf->sequence + 1) & SEQUENCE_MASK; + if (sf->sequence == 0) { + while (current_time() <= sf->last_timestamp); + } + } else { + sf->sequence = 0; // reset sequence + } + sf->last_timestamp = timestamp; + return ((timestamp - EPOCH) << (WORKER_ID_BITS + DATA_CENTER_ID_BITS + SEQUENCE_BITS)) | + (sf->data_center_id << (WORKER_ID_BITS + SEQUENCE_BITS)) | + (sf->worker_id << SEQUENCE_BITS) | + sf->sequence; +} + +Snowflake g_sf = {1, 1, 0, 0}; +#endif std::vector split(const std::string &s, char delimiter) { std::vector tokens; @@ -136,6 +203,7 @@ bool configInitializeMemory(void) memset(config_database.yts, 0, sizeof(struYT) * DATABASE_YT_NUM); strcpy(config_system32.yk_pass, "12345"); + snprintf(config_system32.version, sizeof(config_system32.version), "%s", "0"); config_system32.yk_keep = 30; config_system32.interval_time = 2; config_system32.log_enabled = FALSE; @@ -326,7 +394,7 @@ bool configWriteDatabaseCFG(void) return false; } - +#if 1 BYTE *websocket_msg_join(noPollMsg **msg, int msg_count, int *buffer_size) { BYTE* buffer; @@ -384,6 +452,9 @@ int websocket_write(noPollConn* conn, const char * buffer, int buffer_len) return -1; #endif } +#else + +#endif bool publish_sensor_data(noPollConn* conn, const std::string traceId, const char* command, const Json::Value payload) { @@ -394,10 +465,14 @@ bool publish_sensor_data(noPollConn* conn, const std::string traceId, const char Json::Value jsonRoot; jsonRoot["cmd"] = command; - char str[128]; + char str[128]; +#if 0 uuid_t uuid; uuid_generate_time(uuid); - uuid_unparse_upper(uuid, str); + uuid_unparse_upper(uuid, str); +#else + snprintf(str, sizeof(str), "%lld", snowflake_next_id(&g_sf)); +#endif if (traceId == "") { jsonRoot["cmdId"] = str; } else { @@ -410,38 +485,17 @@ bool publish_sensor_data(noPollConn* conn, const std::string traceId, const char jsonRoot["data"] = payload; std::string outputConfig = Json::writeString(builder, jsonRoot); - vLog(LOG_DEBUG, "send cmd: %s, payload: %d\n", command, outputConfig.length()); + vLog(LOG_DEBUG, "send cmd: %s, payload: %d\n%s\n", command, outputConfig.length(), outputConfig.c_str()); int rc = websocket_write(conn, outputConfig.c_str(), outputConfig.length()); if (rc != 0) { - vLog(LOG_DEBUG, "websocket write is error<%d>。\n", rc); + vLog(LOG_DEBUG, "websocket write is error<%d>,insert into database.\n", rc); + //插入数据库 return false; } return true; } -#if 1 -static int GetProcessIDByIRN(QLONG IRN) -{ - int i; - for (i = 0; i < PROCESSES_NUM; i++) - { - if (config.processes[i].irn == IRN) - { - return i; - } - } - - return -1; -} - -static QLONG GetUnitIRNByUid(int uid) -{ - if (uid < 0 || uid >= UNIT_NUM) return -1; - return config.units[uid].irn; -} -#endif - static int GetUnitYXCount(int uid) { if (uid < 0 || uid >= UNIT_NUM) return 0; @@ -474,14 +528,11 @@ static float GetUnitYCReal(int uid, int order) if (order < 0 || order >= pUnit->yccount) return 0; pYC = &pUnit->ycs[order]; udb = pYC->order; - if (udb < 0 || udb >= DATABASE_YC_NUM) - { + if (udb < 0 || udb >= DATABASE_YC_NUM) { value = pYC->value; coef = 1.0f; base = 0.0f; - } - else - { + } else { value = database.ycs[udb].value; coef = pYC->coef; base = pYC->base; @@ -506,13 +557,10 @@ static float GetUnitYCRealFromValue(int uid, int order, long value) if (order < 0 || order >= pUnit->yccount) return 0; pYC = &pUnit->ycs[order]; udb = pYC->order; - if (udb < 0 || udb >= DATABASE_YC_NUM) - { + if (udb < 0 || udb >= DATABASE_YC_NUM) { coef = 1.0f; base = 0.0f; - } - else - { + } else { coef = pYC->coef; base = pYC->base; } @@ -533,14 +581,11 @@ static float GetUnitYMReal(int uid, int order) if (order < 0 || order >= pUnit->ymcount) return 0; pYM = &pUnit->yms[order]; udb = pYM->order; - if (udb < 0 || udb >= DATABASE_YM_NUM) - { + if (udb < 0 || udb >= DATABASE_YM_NUM) { value = pYM->value; coef = 1.0f; base = 0.0f; - } - else - { + } else { value = (long)database.yms[udb].value; pYM->update_time = database.yms[udb].update_time; coef = pYM->coef; @@ -563,12 +608,9 @@ static BYTE GetUnitYX(int uid, int point) if (point < 0 || point >= pUnit->yxcount) return 0; pYX = &pUnit->yxs[point]; udb = pYX->order; - if (udb < 0 || udb >= DATABASE_YX_NUM) - { + if (udb < 0 || udb >= DATABASE_YX_NUM) { value = pYX->value; - } - else - { + } else { value = database.yxs[udb].value; pYX->value = value; pYX->update_time = database.yxs[udb].update_time; @@ -576,64 +618,8 @@ static BYTE GetUnitYX(int uid, int point) return value; } -static QLONG GetUnitYXIRNByPoint(int uid, int point) -{ - struUnit* pUnit; - if (uid < 0 || uid >= UNIT_NUM) return -1; - - pUnit = &config.units[uid]; - if (pUnit->yxcount <= 0) return -1; - if (point < 0 || point >= pUnit->yxcount) return -1; - return pUnit->yxs[point].irn; -} - -static QLONG GetUnitYCIRNByPoint(int uid, int point) -{ - struUnit* pUnit; - if (uid < 0 || uid >= UNIT_NUM) return -1; - - pUnit = &config.units[uid]; - if (pUnit->yccount <= 0) return -1; - if (point < 0 || point >= pUnit->yccount) return -1; - return pUnit->ycs[point].irn; -} - -static QLONG GetUnitYMIRNByPoint(int uid, int point) -{ - struUnit* pUnit; - if (uid < 0 || uid >= UNIT_NUM) return -1; - - pUnit = &config.units[uid]; - if (pUnit->ymcount <= 0) return -1; - if (point < 0 || point >= pUnit->ymcount) return -1; - return pUnit->yms[point].irn; -} - -static QLONG GetUnitYKIRNByPoint(int uid, int point) -{ - struUnit* pUnit; - if (uid < 0 || uid >= UNIT_NUM) return -1; - - pUnit = &config.units[uid]; - if (pUnit->ykcount <= 0) return -1; - if (point < 0 || point >= pUnit->ykcount) return -1; - return pUnit->yks[point].irn; -} - -static QLONG GetUnitYTIRNByPoint(int uid, int point) -{ - struUnit* pUnit; - if (uid < 0 || uid >= UNIT_NUM) return -1; - - pUnit = &config.units[uid]; - if (pUnit->ytcount <= 0) return -1; - if (point < 0 || point >= pUnit->ytcount) return -1; - return pUnit->yts[point].irn; -} - int GetUnitYXBW(int& uid, BOOLEAN& value, BYTE& qds, int& type, unionCP56Time& st) { - int i; int order; int point; @@ -647,12 +633,10 @@ int GetUnitYXBW(int& uid, BOOLEAN& value, BYTE& qds, int& type, unionCP56Time& s int GetUnitSOE(int& uid, BOOLEAN& value, BYTE& qds, unionCP56Time& st) { - int i; int order; int point; - while (soe.GetSOE(m_soeload, st, order, value, qds, uid, point)) - { + while (soe.GetSOE(m_soeload, st, order, value, qds, uid, point)) { m_soeload++; return point; } @@ -662,12 +646,10 @@ int GetUnitSOE(int& uid, BOOLEAN& value, BYTE& qds, unionCP56Time& st) int GetUnitYCBW(int& uid, LONG& value, BYTE& qds, int& type, unionCP56Time& st) { - int i; int order; int point; - while (ycbw.GetYCBW(m_ycbwload, st, order, value, qds, uid, point, type)) - { + while (ycbw.GetYCBW(m_ycbwload, st, order, value, qds, uid, point, type)) { m_ycbwload++; return point; } @@ -700,14 +682,12 @@ BOOLEAN GetUnitYK(int uid, int& order, BYTE& value, BYTE& act, BYTE& result) pYK->result = YKR_OPER; yklog.PushYKLog(system32.now, udb, value, YKT_SELRET, YKS_PROC, uid); return TRUE; - } - else if (result == YKR_FAIL) { + } else if (result == YKR_FAIL) { pYK->state = YKS_IDLE; pYK->result = YKR_IDLE; pYK->op_unit = -1; return TRUE; - } - else if (result == YKR_IDLE) { + } else if (result == YKR_IDLE) { pYK->state = YKS_IDLE; pYK->op_unit = -1; } @@ -836,14 +816,12 @@ BOOLEAN GetUnitYT(int uid, int& order, DWORD& value, BYTE& act, BYTE& result) pYT->result = YTR_OPER; ytlog.PushYTLog(system32.now, udb, value, YTT_SELRET, YTS_PROC, uid); return TRUE; - } - else if (result == YTR_FAIL) { + } else if (result == YTR_FAIL) { pYT->state = YTS_IDLE; pYT->result = YTR_IDLE; pYT->op_unit = -1; return TRUE; - } - else if (result == YTR_IDLE) { + } else if (result == YTR_IDLE) { pYT->state = YTS_IDLE; pYT->op_unit = -1; } @@ -963,36 +941,26 @@ int MakeYKFrame(noPollConn* conn, int uid) //发送确认 Json::Value jsonRoot; - jsonRoot["irn"] = (Json::Int64)GetUnitYKIRNByPoint(uid, order); - jsonRoot["operType"] = CMD_CONTROL_OPERATION; - jsonRoot["operValue"] = value; - if (YKS_SELED == action && YKR_FAIL == result) - { + jsonRoot["deviceId"] = static_units[uid].deviceId; + jsonRoot["serviceName"] = (const char *)config.units[uid].yks[order].name; + jsonRoot["opValue"] = value; + if (YKS_SELED == action && YKR_FAIL == result) { action = YKS_ABRED; - } - else if (YKS_SELREQ == action && YKR_OVER == result) - { + } else if (YKS_SELREQ == action && YKR_OVER == result) { action = YKS_ABRED; - } - else if (YKS_SELING == action && YKR_OVER == result) - { + } else if (YKS_SELING == action && YKR_OVER == result) { action = YKS_ABRED; } - if (YKS_SELED == action) - { + if (YKS_SELED == action) { SetUnitYK(uid, order, value, YKS_EXEREQ, YKR_IDLE); vLog(LOG_WARN, "Unit(%d) yk(%d) %s state is YKS_EXEREQ result is YKR_IDLE.\n", uid, order, (value ? "CLOSE" : "TRIP")); return 1; - } - else if (YKS_ABRED == action) - { + } else if (YKS_ABRED == action) { SetUnitYK(uid, order, value, YKS_ABRREQ, YKR_IDLE); vLog(LOG_WARN, "Unit(%d) yk(%d) %s state is YKS_ABRREQ result is YKR_IDLE.\n", uid, order, (value ? "CLOSE" : "TRIP")); jsonRoot["result"] = false; - } - else - { + } else { jsonRoot["result"] = true; } @@ -1018,35 +986,25 @@ int MakeYTFrame(noPollConn* conn, int uid) //发送确认 Json::Value jsonRoot; - jsonRoot["irn"] = (Json::Int64)GetUnitYTIRNByPoint(uid, order); - jsonRoot["operType"] = CMD_CONTROL_SETTING; - jsonRoot["operValue"] = value; - if (YTS_SELED == action && YTR_FAIL == result) - { + jsonRoot["deviceId"] = static_units[uid].deviceId; + jsonRoot["serviceName"] = (const char *)config.units[uid].yts[order].name; + jsonRoot["opValue"] = value; + if (YTS_SELED == action && YTR_FAIL == result) { + action = YTS_ABRED; + } else if (YTS_SELREQ == action && YTR_OVER == result) { + action = YTS_ABRED; + } else if (YTS_SELING == action && YTR_OVER == result) { action = YTS_ABRED; } - else if (YTS_SELREQ == action && YTR_OVER == result) - { - action = YTS_ABRED; - } - else if (YTS_SELING == action && YTR_OVER == result) - { - action = YTS_ABRED; - } - if (YTS_SELED == action) - { + if (YTS_SELED == action) { SetUnitYT(uid, order, value, YTS_EXEREQ, YTR_IDLE); vLog(LOG_DEBUG, "Unit(%d) set point(%d) %d state is YTS_EXEREQ result is YTR_IDLE.\n", uid, order, value); return 1; - } - else if (YTS_ABRED == action) - { + } else if (YTS_ABRED == action) { SetUnitYT(uid, order, value, YTS_ABRREQ, YTR_IDLE); vLog(LOG_DEBUG, "Unit(%d) set point(%d) %d state is YTS_ABRREQ result is YTR_IDLE.\n", uid, order, value); jsonRoot["result"] = false; - } - else - { + } else { jsonRoot["result"] = true; } @@ -1090,8 +1048,7 @@ bool OnReceivedDeviceCommand(const Json::Value jsonRoot) SetUnitYK(uid, point, (operValue & 0x03), YKS_SELREQ, YKR_IDLE); vLog(LOG_WARN, "Unit(%d) yk(%d) %s state is YKS_SELREQ result is YKR_IDLE.\n", uid, point, ((operValue & 0x03) ? "CLOSE" : "TRIP")); } else if (operType == CMD_CONTROL_SETTING) { //遥调 - union main - { + union { float fval; DWORD dval; } operValue; @@ -1111,35 +1068,9 @@ bool OnReceivedDeviceCommand(const Json::Value jsonRoot) return TRUE; } -bool OnReceivedLogCommand(const Json::Value jsonRoot) +static BOOLEAN processUartParam(const Json::Value jsonRoot, int ord) { -#if 0 - if (jsonRoot["linkIrn"].isNull()) return FALSE; - QLONG linkIrn = -1; - if (jsonRoot["linkIrn"].isInt64()) linkIrn = jsonRoot["linkIrn"].asInt64(); - struMonLinkLog.m_iStartTime = system32.timers; - struMonLinkLog.m_iLinkIrn = linkIrn; - channelBuffer.enabled = TRUE; - channelBuffer.mon_port = GetProcessIDByIRN(linkIrn);; - vLog(LOG_DEBUG, "receive action LINK_LOG_REQUEST, irn is:%lld, pid is: %d\n", struMonLinkLog.m_iLinkIrn, channelBuffer.mon_port); -#endif - return true; -} - - -static BOOLEAN processUartParam(const char *jsonStr, int ord) -{ - Json::Value jsonRoot; - const int rawJsonLength = strlen(jsonStr); - std::string err; - - Json::CharReaderBuilder builder; - builder["emitUTF8"] = true; - Json::CharReader* reader(builder.newCharReader()); - if (!reader->parse(jsonStr, jsonStr + rawJsonLength, &jsonRoot, &err)) { - vLog(LOG_ERROR, "串口参数:Json 格式错误!\n"); - return FALSE; - } + if (ord < 0 || ord >= HARDWARE_PORTS_NUM) return FALSE; config_config.hardware.ports[ord].state = TRUE; snprintf(config_config.hardware.ports[ord].name, sizeof(config_config.hardware.ports[ord].name), "ttyS%d", ord); @@ -1182,20 +1113,38 @@ static BOOLEAN processUartParam(const char *jsonStr, int ord) return TRUE; } - -static BOOLEAN processHostIEC104ProcessParam(const char *jsonStr, int pid) +static BOOLEAN processNetworkParam(const Json::Value jsonRoot, int pid) { - Json::Value jsonRoot; - const int rawJsonLength = strlen(jsonStr); - std::string err; + if (pid < 0 || pid >= PROCESSES_NUM) return FALSE; - Json::CharReaderBuilder builder; - builder["emitUTF8"] = true; - Json::CharReader* reader(builder.newCharReader()); - if (!reader->parse(jsonStr, jsonStr + rawJsonLength, &jsonRoot, &err)) { - vLog(LOG_ERROR, "HostIEC104参数:Json 格式错误!\n"); - return FALSE; + config_config.processes[pid].option.network.ignored_source = TRUE; + config_config.processes[pid].option.network.socket_type = SOCK_STREAM; + config_config.processes[pid].option.network.bind_addr = INADDR_ANY; + config_config.processes[pid].option.network.bind_port = 0; + config_config.processes[pid].option.network.target_addr = INADDR_ANY; + config_config.processes[pid].option.network.target_port = 502; + + if (jsonRoot["host"].isInt()) { + config_config.processes[pid].option.network.target_addr = jsonRoot["host"].asInt(); + } else if (jsonRoot["host"].isString()) { + if (inet_pton(AF_INET, jsonRoot["host"].asCString(), &config_config.processes[pid].option.network.target_addr) == 1) { + vLog(LOG_DEBUG, "IPv4 地址转换成功,网络字节序为: %u.\n", config_config.processes[pid].option.network.target_addr); + } else { + vLog(LOG_ERROR, "inet_pton error(%d,%s).\n", errno, strerror(errno)); + } } + if (jsonRoot["port"].isInt()) { + config_config.processes[pid].option.network.target_port = jsonRoot["port"].asInt(); + } else if (jsonRoot["port"].isString()) { + config_config.processes[pid].option.network.target_port = atoi(jsonRoot["port"].asCString()); + } +} + +static BOOLEAN processHostIEC104ProcessParam(const Json::Value jsonRoot, int pid) +{ + if (pid < 0 || pid >= PROCESSES_NUM) return FALSE; + + config_config.processes[pid].option.iec104.net.ignored_source = TRUE; config_config.processes[pid].option.iec104.net.socket_type = SOCK_STREAM; config_config.processes[pid].option.iec104.net.target_addr = INADDR_ANY; config_config.processes[pid].option.iec104.net.target_port = 2404; @@ -1249,7 +1198,13 @@ static BOOLEAN processHostIEC104ProcessParam(const char *jsonStr, int pid) static bool dealConfigFile(const Json::Value jsonRoot) { + bool result = false; do { + FILE* pf = fopen("aaa.json", "w+"); + if (pf) { + fwrite(jsonRoot.toStyledString().c_str(), jsonRoot.toStyledString().length(), 1, pf); + fclose(pf); + } if (!configInitializeMemory()) { vLog(LOG_ERROR, "Fail initialize memory!\n"); break; @@ -1258,8 +1213,15 @@ static bool dealConfigFile(const Json::Value jsonRoot) vLog(LOG_ERROR, "配置文件格式错误,缺少节点ID的配置信息,请检查。\n"); break; } - - name2intmap deviceIDs; + if (jsonRoot["version"].isNull()) { + vLog(LOG_WARN, "配置文件格式错误,缺少版本信息,请检查。\n"); + } + if (jsonRoot["version"].isInt()) { + snprintf(config_system32.version, sizeof(config_system32.version), "%d", jsonRoot["version"].asInt()); + } else if (jsonRoot["version"].isString()) { + snprintf(config_system32.version, sizeof(config_system32.version), "%d", jsonRoot["version"].asCString()); + } + name2unitmap deviceIDs; deviceIDs.clear(); //更新节点配置 config_nodes.m_node[0].m_netnode_no = 0; @@ -1292,15 +1254,28 @@ static bool dealConfigFile(const Json::Value jsonRoot) const Json::Value equipment = equipments[i]; int uid = i; - snprintf(config_static_units[uid].name, sizeof(config_static_units[uid].name), "%s", "device"); - snprintf(config_static_units[uid].model, sizeof(config_static_units[uid].model), "%s", "model"); + snprintf(config_static_units[uid].name, sizeof(config_static_units[uid].name), "device_%d", uid); + snprintf(config_static_units[uid].model, sizeof(config_static_units[uid].model), "model_%d", uid); snprintf(config_static_units[uid].manufacturerId, sizeof(config_static_units[uid].manufacturerId), "%s", "iss"); + + std::string id = ""; + std::string address = ""; if (equipment["id"].isString()) { - std::string id = equipment["id"].asString(); - deviceIDs.insert(name2intmap::value_type(id, uid)); + id = equipment["id"].asString(); snprintf(config_static_units[uid].deviceId, sizeof(config_static_units[uid].deviceId), "%s", id.c_str()); } else { - snprintf(config_static_units[uid].deviceId, sizeof(config_static_units[uid].deviceId), "%s", "iss"); + snprintf(config_static_units[uid].deviceId, sizeof(config_static_units[uid].deviceId), "iss_%d", uid); + } + if (equipment["addr"].isString()) { + address = equipment["addr"].asString(); + } + struct_uidaddress uidAddress; + uidAddress.uid = uid; + uidAddress.address = address; + if (deviceIDs.find(id) == deviceIDs.end()) { + deviceIDs.insert(name2unitmap::value_type(id, uidAddress)); + } else { + vLog(LOG_DEBUG, "found two devices have the same id.\n"); } attrvector yxs, ycs, yms, yks, yts; @@ -1335,7 +1310,9 @@ static bool dealConfigFile(const Json::Value jsonRoot) else if (type == "yt") yts.push_back(name_param); } } - + config_config.units[uid].value = SPI_ON; + config_config.units[uid].softdog = UNIT_WATCHDOG_TIME; + config_config.units[uid].state = TRUE; config_config.units[uid].type = MASTER_UNIT; config_config.units[uid].yxcount = yxs.size(); @@ -1351,6 +1328,8 @@ static bool dealConfigFile(const Json::Value jsonRoot) for (int k = 0; k < config_config.units[uid].yccount; k++) { snprintf(config_config.units[uid].ycs[k].name, sizeof(config_config.units[uid].ycs[k].name), "%s", ycs[k].name.c_str()); config_config.units[uid].ycs[k].order = ycorder++; + config_config.units[uid].ycs[k].value = 0; + config_config.units[uid].ycs[k].qds = 0x80; //默认为无效 config_config.units[uid].ycs[k].factor = 1; config_config.units[uid].ycs[k].change_pos = 2; config_config.units[uid].ycs[k].coef = 1.0f; @@ -1394,6 +1373,7 @@ static bool dealConfigFile(const Json::Value jsonRoot) for (int k = 0; k < config_config.units[uid].ymcount; k++) { snprintf(config_config.units[uid].yms[k].name, sizeof(config_config.units[uid].yms[k].name), "%s", yms[k].name.c_str()); config_config.units[uid].yms[k].order = ymorder++; + config_config.units[uid].yms[k].value = 0; config_config.units[uid].yms[k].coef = 1.0f; config_config.units[uid].yms[k].base = 0; if (config_config.units[uid].type == MASTER_UNIT) { @@ -1445,6 +1425,8 @@ static bool dealConfigFile(const Json::Value jsonRoot) for (int k = 0; k < config_config.units[uid].yxcount; k++) { snprintf(config_config.units[uid].yxs[k].name, sizeof(config_config.units[uid].yxs[k].name), "%s", yxs[k].name.c_str()); config_config.units[uid].yxs[k].order = yxorder++; + config_config.units[uid].yxs[k].value = 0; + config_config.units[uid].yxs[k].qds = 0x80; //默认为无效 config_config.units[uid].yxs[k].invert = 0; config_database.yxs[config_config.units[uid].yxs[k].order].auto_reset = 0; @@ -1469,9 +1451,8 @@ static bool dealConfigFile(const Json::Value jsonRoot) //更新链路配置文件 const Json::Value links = jsonRoot["links"]; int count = links.size(); - int uid = 0; int uartId = 0; - //my_units_map.clear(); + m_gLinkIDs.clear(); for (int i = 0; i < count; i++) { const Json::Value link = links[i]; @@ -1486,25 +1467,29 @@ static bool dealConfigFile(const Json::Value jsonRoot) snprintf(config_config.processes[i].name, sizeof(config_config.processes[i].name), "%s", link["linkName"].asCString()); } if (link["linkId"].isString()) { - m_gLinkIDs.push_back(link["linkId"].asString()); + config_config.processes[i].irn = strtoll(link["linkId"].asCString(), NULL, 10); } if (link["protocol"].isInt()) { config_config.processes[i].proto = link["protocol"].asInt(); } + BYTE addrType = ADDR_TYPE_NORMAL; //根据协议设定单元地址类型 //处理链路参数,根据协议参数的不同来处理 const Json::Value params = link["params"]; if (!params.isNull()) { switch (config_config.processes[i].proto) { case PROTOCOL_HOST_MODBUS_RTU: - processUartParam(params.asCString(), uartId); + processUartParam(params, uartId); config_config.processes[i].order = uartId++; break; case PROTOCOL_HOST_MODBUS_TCP: case PROTOCOL_HOST_MODBUS_RTU_TCP: + addrType = ADDR_TYPE_IPV4; + processNetworkParam(params, i); break; case PROTOCOL_HOST_IEC104: - processHostIEC104ProcessParam(params.asCString(), i); + addrType = ADDR_TYPE_IPV4_FACNO; + processHostIEC104ProcessParam(params, i); break; } } @@ -1515,7 +1500,37 @@ static bool dealConfigFile(const Json::Value jsonRoot) for (int j = 0; j < size; j++) { std::string id = devices[j].asCString(); if (deviceIDs.find(id) != deviceIDs.end()) { - config_config.processes[i].units[j] = deviceIDs[id]; + int uid = deviceIDs[id].uid; + std::string address = deviceIDs[id].address; + config_config.processes[i].units[j] = uid; + //根据协议修改地址 + if (address != "") { + if (addrType == ADDR_TYPE_HEX) { + StringToHex(address.c_str(), config_config.units[uid].addr); + } else if (addrType == ADDR_TYPE_IPV4) { + DWORD addr; + inet_pton(AF_INET, address.c_str(), (struct in_addr*)&addr); + memcpy(config_config.units[uid].addr, &addr, sizeof(addr)); + } else if (addrType == ADDR_TYPE_IPV4_FACNO) { + std::vector tokens = split(address, ':'); + if (tokens.size() >= 2) { + DWORD addr; + inet_pton(AF_INET, tokens.at(0).c_str(), (struct in_addr*)&addr); + memcpy(config_config.units[uid].addr, &addr, sizeof(addr)); + addr = (DWORD)atoi(tokens.at(1).c_str()); + memcpy(&config_config.units[uid].addr[4], &addr, sizeof(addr)); + } else { + DWORD addr; + inet_pton(AF_INET, tokens.at(0).c_str(), (struct in_addr*)&addr); + memcpy(config_config.units[uid].addr, &addr, sizeof(addr)); + addr = 1; + memcpy(&config_config.units[uid].addr[4], &addr, sizeof(addr)); + } + } else { + DWORD addr = (DWORD)atoi(address.c_str()); + memcpy(config_config.units[uid].addr, &addr, sizeof(addr)); + } + } } } } @@ -1530,21 +1545,20 @@ static bool dealConfigFile(const Json::Value jsonRoot) configWriteDatabaseCFG(); //保存静态文件 configWriteStaticUnitCFG(); //units.sta - } while (0); - //查询设备配置 - //vLog(LOG_DEBUG, "mosq is: %ld, App name is: %s, and messageId is: %s\n", response->mosq, response->app, response->messageId); - - pthread_exit(0); - return NULL; + result = true; + } while (0); + + //重启application + if (result) { + g_dataAcquisitionReload = true; + } else { + vLog(LOG_WARN, "配置文件读取失败,请检查后下发!\n"); + } } bool OnReceivedSystemAction(noPollConn* conn, const std::string cmdId, const std::string cmd, const Json::Value data) { - int rc; - int code = 0; - std::string reason; - do { if (cmd == "configUpdate") { //配置更新 g_traceId = cmdId; @@ -1552,8 +1566,6 @@ bool OnReceivedSystemAction(noPollConn* conn, const std::string cmdId, const std } else if (cmd == "deviceControl") { g_traceId = cmdId; //response命令的traceId OnReceivedDeviceCommand(data); - } else if (cmd == "LINK_LOG_REQUEST") { - OnReceivedLogCommand(data); } else { vLog(LOG_DEBUG, "command: %s is not supported.\n", cmd.c_str()); } @@ -1573,6 +1585,8 @@ void on_message(noPollConn* conn, const char *msg, const int size) Json::CharReaderBuilder builder; Json::CharReader* reader(builder.newCharReader()); + vLog(LOG_DEBUG, "Received: %s.\n", msg); + do { if (!reader->parse(msg, msg + size, &jsonRoot, &err)) { vLog(LOG_ERROR, "reader->parse(msg, msg + size, &jsonRoot, &err) error<%d,%s>。\n", errno, err.c_str()); @@ -1618,6 +1632,9 @@ void stop(int signo) if (zlog_inited) zlog_fini(); + m_database.SaveDatabaseFromMemoryToFile("./ry-das-dn.db"); + m_database.Close(); + signo = 0; pthread_mutex_unlock(&mutex); exit(signo); @@ -1630,13 +1647,15 @@ void heart_beat(noPollConn* conn, int status) payload["ttl"] = 30000; payload["status"] = status; - if (status == 1) - { + if (status == 1) { Json::Value jsonItem; Json::Value jsonValue; - for (int i = 0; i < m_gLinkIDs.size(); i++) { + //for (int i = 0; i < static_cast(m_gLinkIDs.size()); i++) { + for (int i = 0; i < PROCESSES_NUM; i++) { if (config.processes[i].state == TRUE) { - jsonValue["linkId"] = m_gLinkIDs.at(i); + char linkId[32]; + snprintf(linkId, sizeof(linkId), "%lld", config.processes[i].irn); + jsonValue["linkId"] = linkId; jsonValue["online"] = (config.processes[i].softdog >= PROCESS_WATCHDOG_TIME) ? false : true; jsonItem.append(jsonValue); } @@ -1649,6 +1668,7 @@ void heart_beat(noPollConn* conn, int status) publish_sensor_data(conn, "", "heartbeat", payload); } +#if 0 DWORD OnReadChannelRingBuff(char *rbuff, DWORD len) { DWORD l; @@ -1702,124 +1722,115 @@ BOOLEAN publishMonLinkLog(noPollConn* conn) publish_sensor_data(conn, "", "LINK_LOG", jsonRoot); } +#endif bool publishinitDeviceData(noPollConn* conn, int uid) { if (uid < 0 || uid >= UNIT_NUM) return false; + if ((config.units[uid].state & 0x80) != 0x80) { + return false; + } + if ((config.units[uid].state & 0x40) == 0x40) { //该设备已经发送过初始化 + return false; + } + Json::Value root; Json::Value values; int i; int count = GetUnitYCCount(uid); if (count) { - //Json::Value jsonValue; for (i = 0; i < count; i++) { values[(const char *)config.units[uid].ycs[i].name] = GetUnitYCReal(uid, i); - //values.append(jsonValue); } } count = GetUnitYMCount(uid); if (count) { - //Json::Value jsonValue; for (i = 0; i < count; i++) { values[(const char *)config.units[uid].yms[i].name] = GetUnitYMReal(uid, i); - //values.append(jsonValue); } } count = GetUnitYXCount(uid); if (count) { - //Json::Value jsonValue; for (i = 0; i < count; i++) { values[(const char *)config.units[uid].yxs[i].name] = GetUnitYX(uid, i); - //values.append(jsonValue); } } if (values.size()) { root["deviceId"] = static_units[uid].deviceId; root["values"] = values; + + config.units[uid].state |= 0x40; return publish_sensor_data(conn, "", "initDeviceData", root); } return false; } -bool publishRealData(noPollConn* conn, int uid) +bool publishAnalogData(noPollConn* conn, int uid) { -#if 0 - int i; - int uid; - int count = 0; - - Json::Value jsonRoot; - QLONG dataTime = system32.timers; - dataTime *= 1000; - - Json::Value yxs; - Json::Value ycs; - Json::Value yms; - for (int k = 0; k < m_total_units.size(); k++) - { - uid = m_total_units.at(k); - if (config.units[uid].value == SPI_ON) continue; - count = GetUnitYCCount(uid); - if (count) - { - Json::Value jsonValue; - - for (i = 0; i < count; i++) - { - float value = GetUnitYCReal(uid, i); - jsonValue["irn"] = (Json::Int64)GetUnitYCIRNByPoint(uid, i); - jsonValue["dataValue"] = value; - jsonValue["dataTime"] = (Json::Int64)dataTime; - ycs.append(jsonValue); - } - } - - count = GetUnitYMCount(uid); - if (count) - { - Json::Value jsonItem; - Json::Value jsonValue; - - for (i = 0; i < count; i++) - { - float value = GetUnitYMReal(uid, i); - jsonValue["irn"] = (Json::Int64)GetUnitYMIRNByPoint(uid, i); - jsonValue["dataValue"] = value; - jsonValue["dataTime"] = (Json::Int64)dataTime; - yms.append(jsonValue); - } - } - - count = GetUnitYXCount(uid); - if (count) - { - Json::Value jsonItem; - Json::Value jsonValue; - - for (i = 0; i < count; i++) - { - BYTE value = GetUnitYX(uid, i); - jsonValue["irn"] = (Json::Int64)GetUnitYXIRNByPoint(uid, i); - jsonValue["dataValue"] = value; - jsonValue["dataTime"] = (Json::Int64)dataTime; - yxs.append(jsonValue); - } + if (uid < 0 || uid >= UNIT_NUM) return false; + Json::Value root; + Json::Value values; + int count = GetUnitYCCount(uid); + if (count) { + for (int i = 0; i < count; i++) { + values[(const char *)config.units[uid].ycs[i].name] = GetUnitYCReal(uid, i); } } -#if 0 - jsonRoot["pushTime"] = (Json::Int64)dataTime; -#endif - if (!yxs.isNull()) jsonRoot["yxs"] = yxs; - if (!ycs.isNull()) jsonRoot["ycs"] = ycs; - if (!yms.isNull()) jsonRoot["yms"] = yms; - if (jsonRoot.isNull()) return true; - return publish_sensor_data(conn, "", "REAL_DATA", jsonRoot); -#endif + if (values.size()) { + root["deviceId"] = static_units[uid].deviceId; + root["values"] = values; + return publish_sensor_data(conn, "", "analogData", root); + } return false; } +bool publishStateData(noPollConn* conn, int uid) +{ + if (uid < 0 || uid >= UNIT_NUM) return false; + Json::Value root; + Json::Value values; + int count = GetUnitYXCount(uid); + if (count) { + for (int i = 0; i < count; i++) { + values[(const char *)config.units[uid].yxs[i].name] = GetUnitYX(uid, i); + } + } + if (values.size()) { + root["deviceId"] = static_units[uid].deviceId; + root["values"] = values; + return publish_sensor_data(conn, "", "stateData", root); + } + return false; +} + +bool publishHistoryAnalogData(noPollConn* conn, int uid) +{ + if (uid < 0 || uid >= UNIT_NUM) return false; + Json::Value root; + Json::Value values; + if (values.size()) { + root["deviceId"] = static_units[uid].deviceId; + root["values"] = values; + return publish_sensor_data(conn, "", "historyAnalogData", root); + } + return false; +} + +bool publishHistoryStateData(noPollConn* conn, int uid) +{ + if (uid < 0 || uid >= UNIT_NUM) return false; + Json::Value root; + Json::Value values; + if (values.size()) { + root["deviceId"] = static_units[uid].deviceId; + root["values"] = values; + return publish_sensor_data(conn, "", "historyStateData", root); + } + return false; +} +#if 0 bool publishSOEData(noPollConn* conn) { int i, uid; @@ -1930,7 +1941,7 @@ bool publishYCBWData(noPollConn* conn) return publish_sensor_data(conn, "", "analogData", jsonRoot); } - +#endif void freeMem(void) { if (database.yxs) { @@ -1986,6 +1997,57 @@ void freeMem(void) } } +#if 0 +static int client_callback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) { + switch (reason) { + case LWS_CALLBACK_CLIENT_ESTABLISHED: + printf("Client connected to server\n"); + + // 发送消息到服务器 + char *payload = "Hello, world.\n"; + lws_write(wsi, payload, strlen(payload), LWS_WRITE_TEXT); + break; + + case LWS_CALLBACK_CLIENT_RECEIVE: + printf("Received from server: %s\n", (char *)in); + break; + + case LWS_CALLBACK_CLOSED: + printf("Connection closed\n"); + lws_cancel_service(lws_get_context(wsi)); + break; + + default: + break; + } + return 0; +} + +static const struct lws_protocols protocols[] = { + { + "http-only", // 这不是WebSocket协议,但客户端可能需要处理HTTP重定向等 + NULL, + 0, + 0, NULL, 0 + }, + { + "lws-minimal-client", // WebSocket客户端协议名 + client_callback, + sizeof(struct per_session_data__), /* per_session_data should be sufficiently big */ + 0, NULL, 0 + }, + { NULL, NULL, 0, 0, NULL, 0 } /* Sentinel */ +}; +#endif + +void releaseAllUnits(void) +{ + for (int i = 0; i < UNIT_NUM; i++) { + if ((config.units[i].state & 0x01) != 0x01) continue; + config.units[i].state &= 0xBF; //0x40取反 + } +} + int main(int argc, char** argv) { int c; @@ -2079,7 +2141,39 @@ int main(int argc, char** argv) #if 1 noPollCtx *ctx = nopoll_ctx_new(); + nopoll_log_enable(ctx, nopoll_false); + nopoll_log_color_enable(ctx, nopoll_false); noPollConn *conn; + DWORD last_connect_sec = 0; +#else + struct lws_context_creation_info info; + struct lws_client_connect_info ccinfo; + struct lws_context *context; + + memset(&info, 0, sizeof info); + info.port = CONTEXT_PORT_NO_LISTEN; /* we do not run any server */ + info.protocols = protocols; + info.gid = -1; + info.uid = -1; + info.options = 0; + + context = lws_create_context(&info); + if (!context) { + fprintf(stderr, "libwebsocket init failed\n"); + return -1; + } + char url[512]; + snprintf(url, sizeof(url), "/node/%s/%s/", nodeId, version); + memset(&ccinfo, 0, sizeof ccinfo); + ccinfo.context = context; + ccinfo.address = host; + ccinfo.port = port; + ccinfo.path = url; + ccinfo.host = host; + ccinfo.origin = "origin"; + ccinfo.protocol = protocols[1].name; + ccinfo.ietf_version_or_minus_one = -1; /* auto */ + lws_client_connect_via_info(&ccinfo); #endif do { int status = 2; //0 - 离线, 1 - 在线, 2 - 未配置 @@ -2101,11 +2195,23 @@ int main(int argc, char** argv) vLog(LOG_DEBUG, "nodes.m_node[0].m_machine_name is: %s\n", nodes.m_node[0].m_machine_name); } } else { - //snprintf(nodeId, sizeof(nodeId), "%s", szHostCode); - vLog(LOG_DEBUG, "configed node id is: %s.\n", nodeId); + vLog(LOG_DEBUG, "configed node id is: %s.\n", nodes.m_node[0].m_machine_name); + snprintf(nodeId, sizeof(nodeId), "%s", nodes.m_node[0].m_machine_name); + if (system32.version[0] != '\0') { + snprintf(version, sizeof(version), "%s", system32.version); + } else { + snprintf(version, sizeof(version), "%s", "1"); + } if (enable_auto_platform) { //增加协议和单元配置的数量统计 -// m_total_processes.clear(); - //m_total_units.clear(); + //m_gLinkIDs.clear(); + unitname2service_map.clear(); +#if 0 + for (int i = 0; i < PROCESSES_NUM; i++) { + char id[128]; + snprintf(id, sizefof(id), "%lld", config.processes[i].irn); + m_gLinkIDs.push_back(id); + } +#endif for (int i = 0; i < UNIT_NUM; i++) { if (config.units[i].state != TRUE) continue; std::string unit_id = static_units[i].deviceId; @@ -2150,12 +2256,35 @@ int main(int argc, char** argv) #if 1 //创建WS链接 if (ctx != NULL) { - nopoll_conn_connect_timeout(ctx, 200); + nopoll_conn_connect_timeout(ctx, 50000); char url[512]; char cPort[64]; - snprintf(url, sizeof(url), "/node/%s/%s/", nodeId, version); + snprintf(url, sizeof(url), "/node/%s/%s", nodeId, version); snprintf(cPort, sizeof(cPort), "%d", port); + vLog(LOG_DEBUG, "%d here to connect:%s:%s.\n", __LINE__, host, cPort); + releaseAllUnits(); conn = nopoll_conn_new(ctx, host, cPort, NULL, url, NULL, NULL); + + last_connect_sec = system32.timers; + } + //创建历史数据库 + m_database.Open("./ry-das-dn.db", SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, 0); + m_database.MoveDatabaseToMemory(); + //创建数据库表 + try { + SQLiteStatement *pStmt = new SQLiteStatement(&m_database); + pStmt->SqlStatement("CREATE TABLE IF NOT EXISTS HIS_RECORD_TABLE (" + "ID INTEGER PRIMARY KEY AUTOINCREMENT," + "MSG TEXT NOT NULL," + "DATETIME INTEGER NOT NULL," + "STATUS INTEGER NOT NULL);"); + //比较表空间 + delete pStmt; + } catch (SQLiteException &exception) { + vLog(LOG_ERROR, "\nException Occured"); + exception.Show(); + vLog(LOG_ERROR, "SQLite result code: %d,%s\n", exception.GetSqliteResultCode(), exception.GetErrorDescription().c_str()); + return FALSE; } #endif } @@ -2187,81 +2316,75 @@ int main(int argc, char** argv) } if (enable_auto_platform) { #if 1 + #if 0 nopoll_bool isOk = nopoll_conn_is_ok(conn); if (nopoll_conn_is_ok(conn) != nopoll_true) { nopoll_conn_close(conn); char url[512]; char cPort[64]; - snprintf(url, sizeof(url), "/node/%s/%s/", nodeId, version); + snprintf(url, sizeof(url), "/node/%s/%s", nodeId, version); snprintf(cPort, sizeof(cPort), "%d", port); + + vLog(LOG_DEBUG, "tcp connect error url is:%s.\n", url); + vLog(LOG_DEBUG, "%d here to connect:%s:%s.\n", __LINE__, host, cPort); + releaseAllUnits(); conn = nopoll_conn_new(ctx, host, cPort, NULL, url, NULL, NULL); - continue; + //continue; } else { if (!nopoll_conn_wait_until_connection_ready(conn, 5)) { - // some error handling nopoll_conn_close(conn); char url[512]; char cPort[64]; - snprintf(url, sizeof(url), "/node/%s/%s/", nodeId, version); + snprintf(url, sizeof(url), "/node/%s/%s", nodeId, version); snprintf(cPort, sizeof(cPort), "%d", port); + vLog(LOG_DEBUG, "websocket connect not ready url is:%s.\n", url); + vLog(LOG_DEBUG, "%d here to connect:%s:%s.\n", __LINE__, host, cPort); + releaseAllUnits(); conn = nopoll_conn_new(ctx, host, cPort, NULL, url, NULL, NULL); - continue; + //continue; } } -#if 0 - int retries = 0; - char recv_buffer[4096]; - int bytes_read = nopoll_conn_read(conn, (char *)recv_buffer, sizeof(recv_buffer), nopoll_true, 1000); - if (bytes_read < 0) { - vLog(LOG_ERROR, "expected to find bytes from the connection but found: %d\n", bytes_read); - /* close the connection */ - if (conn) { - nopoll_conn_close(conn); - continue; - } - } else if (bytes_read > 0) { - recv_buffer[bytes_read] = '\0'; - vLog(LOG_DEBUG, "\nrecv: %s \nlength = %d\n", recv_buffer, bytes_read); - } /* end if */ -#else + #else + nopoll_bool isOk = nopoll_conn_is_ready(conn); + #endif #if 1 - int msg_count = 0; - noPollMsg *msg[40]; - while ((msg[msg_count] = nopoll_conn_get_msg(conn)) != NULL) { - vLog(LOG_DEBUG, "recv length = %d, %d\n", nopoll_msg_get_payload_size(msg[msg_count]), nopoll_msg_is_final(msg[msg_count])); - if (nopoll_msg_is_final(msg[msg_count])) { - msg_count++; - break; - } else { - msg_count++; + if (isOk) { + int msg_count = 0; + noPollMsg *msg[40]; + while ((msg[msg_count] = nopoll_conn_get_msg(conn)) != NULL) { + vLog(LOG_DEBUG, "recv length = %d, %d\n", nopoll_msg_get_payload_size(msg[msg_count]), nopoll_msg_is_final(msg[msg_count])); + if (nopoll_msg_is_final(msg[msg_count])) { + msg_count++; + break; + } else { + msg_count++; + } + } + if (msg_count > 0) { + int buffer_len; + BYTE *buffer = websocket_msg_join(msg, msg_count, &buffer_len); + if (buffer) { + on_message(conn, (const char *)buffer, buffer_len); + delete [] buffer; + buffer = NULL; + } + } + for (int i = 0; i < msg_count; i++) { + nopoll_msg_unref(msg[i]); + } + } else { + if (last_connect_sec > 0 && system32.timers > (last_connect_sec + 30)) { + nopoll_conn_connect_timeout(ctx, 50000); + char url[512]; + char cPort[64]; + snprintf(url, sizeof(url), "/node/%s/%s", nodeId, version); + snprintf(cPort, sizeof(cPort), "%d", port); + vLog(LOG_DEBUG, "%d here to connect:%s:%s.\n", __LINE__, host, cPort); + releaseAllUnits(); + conn = nopoll_conn_new(ctx, host, cPort, NULL, url, NULL, NULL); + last_connect_sec = system32.timers; } } - if (msg_count > 0) { - int buffer_len; - BYTE *buffer = websocket_msg_join(msg, msg_count, &buffer_len); - if (buffer) { - on_message(conn, (const char *)buffer, buffer_len); - delete [] buffer; - buffer = NULL; - } - } - for (int i = 0; i < msg_count; i++) { - nopoll_msg_unref(msg[i]); - } -#else - int bytes_read; - char recv_buffer[2048]; - do { - bytes_read = nopoll_conn_read(conn, (char *)recv_buffer, sizeof(recv_buffer), nopoll_false, 0); - if (bytes_read > 0) { - recv_buffer[bytes_read] = '\0'; - vLog(LOG_DEBUG, "\nrecv: %s \nlength = %d\n", recv_buffer, bytes_read); - } - if (nopoll_conn_read_pending(conn) > 0) { //接收没有结束 - vLog(LOG_DEBUG, "nopoll_conn_read_pending: %d.\n", nopoll_conn_read_pending(conn)); - } - } while (nopoll_conn_read_pending(conn) > 0); -#endif #endif BOOLEAN sec_changed = FALSE; if (last_sec != (time_t)system32.timers) { @@ -2269,6 +2392,7 @@ int main(int argc, char** argv) sec_changed = TRUE; } if (sec_changed) { +#if 0 if (struMonLinkLog.m_iStartTime > 0) { if (struMonLinkLog.m_iStartTime > 0 && system32.timers <= (struMonLinkLog.m_iStartTime + 30)) { //默认为30s,调试设置为2s //发送报文 @@ -2280,8 +2404,7 @@ int main(int argc, char** argv) struMonLinkLog.m_iStartTime = 0; } } - publishYXBWData(conn); - publishSOEData(conn); +#endif if ((last_sec % 20) == 0) { heart_beat(conn, status); } @@ -2291,12 +2414,15 @@ int main(int argc, char** argv) MakeYKFrame(conn, i); MakeYTFrame(conn, i); if (sec_changed) { + publishinitDeviceData(conn, i); if ((last_sec % 60) == 0) { //更新数据 - //publishRealData(conn, i); - publishinitDeviceData(conn, i); + publishAnalogData(conn, i); + publishStateData(conn, i); } } } +#else + lws_service(context, 500); #endif } } @@ -2314,7 +2440,7 @@ int main(int argc, char** argv) destroy_thread(); freeMem(); vLog(LOG_DEBUG, "App: dataAcquisition start reload.\n"); - } while(0); + } while(1); pthread_mutex_destroy(&mutex); @@ -2322,6 +2448,9 @@ int main(int argc, char** argv) #if 1 nopoll_conn_close(conn); nopoll_ctx_unref(ctx); + + m_database.SaveDatabaseFromMemoryToFile("./ry-das-dn.db"); + m_database.Close(); #endif vLog(LOG_DEBUG, "system stop okay.\n"); return EXIT_SUCCESS; diff --git a/das-dn/comm/process.cpp b/das-dn/comm/process.cpp index fa33719f..fe850a62 100644 --- a/das-dn/comm/process.cpp +++ b/das-dn/comm/process.cpp @@ -9,16 +9,13 @@ static void* main_run_process(void* param) { CProcess* proc = (CProcess *)param; - DWORD old_ticks = 0; - while (TRUE) - { + while (TRUE) { if (!proc->m_bRunFlag) break; proc->Run(); - if (old_ticks != system32.ticks) - { + if (old_ticks != system32.ticks) { old_ticks = system32.ticks; proc->OnTimer(); //every 10ms } diff --git a/das-dn/comm/public.cpp b/das-dn/comm/public.cpp index d5f78555..9d8bdd8a 100644 --- a/das-dn/comm/public.cpp +++ b/das-dn/comm/public.cpp @@ -2255,44 +2255,49 @@ void* idle_process(void* param) yk(); yt(); //操作,保存系统操作Log - if ((system32.timers % 3600) == 0) - { //过整点保存 - if (system32.log_enabled) - { + if ((system32.timers % 3600) == 0) { //过整点保存 + if (system32.log_enabled) { yxbw.DumpYXBW(); soe.DumpSOE(); yklog.DumpYKLog(); ytlog.DumpYTLog(); } yxbw_save = yxbw.GetSavePos(); - if (yxbw_load != yxbw_save) - { //有变位信息 + if (yxbw_load != yxbw_save) { //有变位信息 WriteDatabaseCFG(); yxbw_load = yxbw_save; } } - for (int i = 0; i < PROCESSES_NUM; i++) - { + for (int i = 0; i < PROCESSES_NUM; i++) { config.processes[i].softdog++; - if (config.processes[i].softdog > PROCESS_WATCHDOG_TIME) - { + if (config.processes[i].softdog > PROCESS_WATCHDOG_TIME) { config.processes[i].softdog = PROCESS_WATCHDOG_TIME; - } - else - { + } else { } } - for (int i = 0; i < UNIT_NUM; i++) - { + for (int i = 0; i < UNIT_NUM; i++) { config.units[i].softdog++; - if (config.units[i].softdog > UNIT_WATCHDOG_TIME) - { + if (config.units[i].softdog > UNIT_WATCHDOG_TIME) { config.units[i].softdog = UNIT_WATCHDOG_TIME; config.units[i].value = SPI_ON; - } - else - { + } else { config.units[i].value = SPI_OFF; + if ((config.units[i].state & 0x80) != 0x80) { + int j = 0; + for (j = 0; j < config.units[i].yxcount; j++) { + if ((config.units[i].yxs[j].qds & 0x80) == 0x80) break; + } + if (j < config.units[i].yxcount) { + continue; + } + for (j = 0; j < config.units[i].yccount; j++) { + if ((config.units[i].ycs[j].qds & 0x80) == 0x80) break; + } + if (j < config.units[i].yccount) { + continue; + } + config.units[i].state |= 0x80; + } } } } diff --git a/das-dn/inc/process.h b/das-dn/inc/process.h index b73fd0c3..3ab7ae62 100644 --- a/das-dn/inc/process.h +++ b/das-dn/inc/process.h @@ -278,38 +278,33 @@ public: if (point < 0 || point >= pUnit->yxcount) return; udb = pUnit->yxs[point].order; - if (udb < 0 || udb >= DATABASE_YX_NUM) - { //遥信点号不在数据库定义范围内,只刷新单元数据 - if (pUnit->yxs[point].value != value) - { //update value + if (pUnit->yxs[point].invert) { + value = !value; + } + + if (udb < 0 || udb >= DATABASE_YX_NUM) { //遥信点号不在数据库定义范围内,只刷新单元数据 + if (pUnit->yxs[point].value != value) { //update value pUnit->yxs[point].value = value; pUnit->yxs[point].update_time = system32.timers; pUnit->yxs[point].yxbw = TRUE; - if (bAddYXBW) - { + if (bAddYXBW) { yxbw.PushYXBW(system32.now, udb, value, qds, uid, point, YXBWT_AUTO); } } - } - else if (database.yxs[udb].value != value) - { //update value + } else if (database.yxs[udb].value != value) { //update value pUnit->yxs[point].value = value; pUnit->yxs[point].yxbw = TRUE; pUnit->yxs[point].update_time = system32.timers; database.yxs[udb].value = value; - database.yxs[udb].bw_time = system32.timers; //设置刷新时间 - database.yxs[udb].update_time = system32.timers; //设置刷新时间 + database.yxs[udb].bw_time = system32.timers; //设置刷新时间 + database.yxs[udb].update_time = system32.timers; //设置刷新时间 database.yxs[udb].op_unit = uid; database.yxs[udb].qds = qds; - if (bAddYXBW) - { + if (bAddYXBW) { yxbw.PushYXBW(system32.now, udb, value, qds, uid, point, YXBWT_AUTO); } - } - else - { - if (pUnit->yxs[point].value != value) - { + } else { + if (pUnit->yxs[point].value != value) { pUnit->yxs[point].value = value; pUnit->yxs[point].update_time = system32.timers; //若数据库中的遥信位置是正确的则不额外产生变位信息 @@ -389,23 +384,17 @@ public: if (point < 0 || point >= pUnit->yccount) return; udb = pUnit->ycs[point].order; - if (udb < 0 || udb >= DATABASE_YC_NUM) - { //遥测点号不在数据库定义范围内,只刷新本单元数据 - if (pUnit->ycs[point].value != value) - { //update value + if (udb < 0 || udb >= DATABASE_YC_NUM) { //遥测点号不在数据库定义范围内,只刷新本单元数据 + if (pUnit->ycs[point].value != value) { //update value pUnit->ycs[point].value = value; pUnit->ycs[point].update_time = system32.timers; pUnit->ycs[point].ycbw = TRUE; } - } - else if (database.ycs[udb].value != value) - { //update value + } else if (database.ycs[udb].value != value) { //update value pUnit->ycs[point].value = value; pUnit->ycs[point].update_time = system32.timers; - if (pUnit->ycs[point].change_pos >= 0 && bAddYCBW) - { - if (abs(pUnit->ycs[point].value - database.ycs[udb].value) >= pUnit->ycs[point].change_pos) - { //40码值变化量认为是遥测变位 + if (pUnit->ycs[point].change_pos >= 0 && bAddYCBW) { + if (abs(pUnit->ycs[point].value - database.ycs[udb].value) >= pUnit->ycs[point].change_pos) { //40码值变化量认为是遥测变位 pUnit->ycs[point].ycbw = TRUE; ycbw.PushYCBW(system32.now, udb, value, qds, uid, point, YCBWT_AUTO); } @@ -414,11 +403,8 @@ public: database.ycs[udb].op_unit = uid; database.ycs[udb].update_time = system32.timers; //设置刷新时间 database.ycs[udb].qds = qds; - } - else - { - if (pUnit->ycs[point].value != value) - { + } else { + if (pUnit->ycs[point].value != value) { pUnit->ycs[point].value = value; pUnit->ycs[point].update_time = system32.timers; pUnit->ycs[point].ycbw = TRUE; @@ -439,23 +425,17 @@ public: udb = pUnit->ycs[point].order; nvalue = (long)(pUnit->ycs[point].factor * value); - if (udb < 0 || udb >= DATABASE_YC_NUM) - { //遥测点号不在数据库定义范围内,只刷新本单元数据 - if (pUnit->ycs[point].value != nvalue) - { //update value + if (udb < 0 || udb >= DATABASE_YC_NUM) { //遥测点号不在数据库定义范围内,只刷新本单元数据 + if (pUnit->ycs[point].value != nvalue) { //update value pUnit->ycs[point].value = nvalue; pUnit->ycs[point].update_time = system32.timers; pUnit->ycs[point].ycbw = TRUE; } - } - else if (database.ycs[udb].value != nvalue) - { //update value + } else if (database.ycs[udb].value != nvalue) { //update value pUnit->ycs[point].value = nvalue; pUnit->ycs[point].update_time = system32.timers; - if (pUnit->ycs[point].change_pos >= 0 && bAddYCBW) - { - if (abs(pUnit->ycs[point].value - database.ycs[udb].value) >= pUnit->ycs[point].change_pos) - { //40码值变化量认为是遥测变位 + if (pUnit->ycs[point].change_pos >= 0 && bAddYCBW) { + if (abs(pUnit->ycs[point].value - database.ycs[udb].value) >= pUnit->ycs[point].change_pos) { //40码值变化量认为是遥测变位 pUnit->ycs[point].ycbw = TRUE; ycbw.PushYCBW(system32.now, udb, nvalue, qds, uid, point, YCBWT_AUTO); } @@ -464,11 +444,8 @@ public: database.ycs[udb].op_unit = uid; database.ycs[udb].update_time = system32.timers; //设置刷新时间 database.ycs[udb].qds = qds; - } - else - { - if (pUnit->ycs[point].value != nvalue) - { + } else { + if (pUnit->ycs[point].value != nvalue) { pUnit->ycs[point].value = nvalue; pUnit->ycs[point].update_time = system32.timers; pUnit->ycs[point].ycbw = TRUE; @@ -501,7 +478,7 @@ public: pYC->update_time = database.ycs[udb].update_time; pYC->qds = database.ycs[udb].qds; } - if (pYC->factor > 1 && pUnit->type == 0x00) + if (pYC->factor > 1 && (pUnit->type & 0x0f) == 0x00) { //系数有效,且为转发单元 value /= pYC->factor; } @@ -680,7 +657,8 @@ public: database.yms[udb].value = value; database.yms[udb].update_time = system32.timers; //设置刷新时间 database.yms[udb].op_unit = uid; - }; + } + inline DWORD GetUnitYM(int uid, int order) const { int udb; @@ -695,7 +673,8 @@ public: pUnit->yms[order].value = database.yms[udb].value; pUnit->yms[order].update_time = database.yms[udb].update_time; return database.yms[udb].value; - }; + } + void SetUnitYMQDS(int uid, int point, BYTE qds); BYTE GetUnitYMQDS(int uid, int point) const; inline float GetUnitYMReal(int uid, int order) const diff --git a/das-dn/inc/public.h b/das-dn/inc/public.h index 25fcf22f..5679bf50 100644 --- a/das-dn/inc/public.h +++ b/das-dn/inc/public.h @@ -188,30 +188,14 @@ typedef int SOCKET; #define PROTOCOL_HOST_MODBUS_TCP 16 //MODBUS tcp主 #define PROTOCOL_SUB_MODBUS_TCP 17 //MODBUS RTU over tcp从 #define PROTOCOL_HOST_MODBUS_RTU_TCP 18 //MODBUS RTU over tcp主 -//#define PROTOCOL_HOST_MODBUS_RTU_NCY 19 //NCY-6100系列微机保护装置MODBUS RTU主 -//#define PROTOCOL_HOST_MODBUS_RTU_LIYEZG 20 //浙江立业电器MODBUS RTU主 #define PROTOCOL_RTU_STATE 21 //主控状态 #define PROTOCOL_LOCAL_DEBUG 22 //本地调试 -//#define PROTOCOL_SUB_XT9712 23 //XT9712从 -//#define PROTOCOL_HISDATA 24 //历史数据协议 #define PROTOCOL_BF_FTP 25 //倍福FTP数据协议 -//#define PROTOCOL_HOST_NSA 27 //NSA主 -#define PROTOCOL_HOST_DLT645V2007 30 //dlt645v2007协议 -#define PROTOCOL_HOST_DLT645V2007_HR 31 //中电华瑞dlt645v2007协议 -#define PROTOCOL_HOST_DLT645V2007_OVERTCP 32 //中电华瑞dlt645v2007 over tcp协议 +#define PROTOCOL_HOST_DLT645V2007 32 //dlt645v2007协议 #define PROTOCOL_CALC 37 //计算 -//#define PROTOCOL_DLT_1867 40 //需求响应协议dlt1867-2008 #define PROTOCOL_SUB_GDW104 47 //国网104从 -//#define PROTOCOL_AGC 50 //功率自动控制app -//#define PROTOCOL_AVC 51 //电压自动控制app #define PROTOCOL_HW_MQTT 72 //华为物联平台ROMA -//#define PROTOCOL_ISS_MQTT 73 //软通动力mqtt -//#define PROTOCOL_LM_MQTT 74 //罗米mqtt -//#define PROTOCOL_GRPC_PUBLISH 76 //gRPC上传发布 -//#define PROTOCOL_GRPC_SUBSCRIBE 77 //gRPC下载订阅 #define PROTOCOL_OPCUA 78 //opcua协议 -//#define PROTOCOL_HOST_MODBUS_RTU_RDS100AFT 80 //珠海瑞捷电气股份有限公司RD系列保护装置MODBUS RTU主 -//#define PROTOCOL_HOST_MODBUS_RTU_APF 81 //江苏沃海电气有限公司APF(SVG)MODBUS RTU主 #define ADDR_TYPE_NORMAL 0 @@ -512,6 +496,9 @@ typedef struct DWORD ticks; DWORD timers; DWORD auto_reset_interval; + + char projectName[64]; //项目名称 + char version[64]; //配置文件版本信息 } struSystem; typedef struct diff --git a/das-dn/third_party/nopoll/nopoll_conn.c b/das-dn/third_party/nopoll/nopoll_conn.c index 6e02412b..48254c3b 100644 --- a/das-dn/third_party/nopoll/nopoll_conn.c +++ b/das-dn/third_party/nopoll/nopoll_conn.c @@ -289,17 +289,13 @@ NOPOLL_SOCKET __nopoll_conn_sock_connect_opts_internal (noPollCtx * ctx, nopoll_conn_set_sock_block (session, nopoll_false); /* do a tcp connect */ - if (connect (session, res->ai_addr, res->ai_addrlen) < 0) { - if(errno != NOPOLL_EINPROGRESS && errno != NOPOLL_EWOULDBLOCK && errno != NOPOLL_ENOTCONN) { - nopoll_log (ctx, NOPOLL_LEVEL_WARNING, "unable to connect to remote host %s:%s errno=%d", - host, port, errno); - - shutdown (session, SHUT_RDWR); - nopoll_close_socket (session); - + if (connect (session, res->ai_addr, res->ai_addrlen) < 0) { + if(errno != NOPOLL_EINPROGRESS && errno != NOPOLL_EWOULDBLOCK && errno != NOPOLL_ENOTCONN) { + nopoll_log (ctx, NOPOLL_LEVEL_WARNING, "unable to connect to remote host %s:%s errno=%d", host, port, errno); + shutdown (session, SHUT_RDWR); + nopoll_close_socket (session); /* relase address info */ freeaddrinfo (res); - return -1; } /* end if */ } /* end if */ @@ -385,6 +381,7 @@ char * __nopoll_conn_get_client_init (noPollConn * conn, noPollConnOpts * opts) conn->handshake->expected_accept = nopoll_strdup (key); /* send initial handshake */ +#if 0 return nopoll_strdup_printf ("GET %s HTTP/1.1" "\r\nHost: %s" "\r\nUpgrade: websocket" @@ -413,6 +410,36 @@ char * __nopoll_conn_get_client_init (noPollConn * conn, noPollConnOpts * opts) conn->protocols ? conn->protocols : "", /* extra arbitrary headers */ (opts && opts->extra_headers) ? opts->extra_headers : ""); +#else + return nopoll_strdup_printf ("GET %s HTTP/1.1" + "\r\nHost: %s:%s" + "\r\nUpgrade: websocket" + "\r\nConnection: Upgrade" + "\r\nSec-WebSocket-Key: %s" + "\r\nSec-WebSocket-Version: %d" + "%s%s" + "%s%s" /* Cookie */ + "%s%s" /* protocol part */ + "%s" /* extra arbitrary headers */ + "\r\n\r\n", + conn->get_url, + conn->host_name, conn->port, + /* sec-websocket-key */ + key, + /* sec-websocket-version */ + conn->ctx->protocol_version, + /* Origin (support not sending Origin: header in case it is not defined) */ + (conn->origin != NULL && (opts == NULL || opts->add_origin_header)) ? "\r\nOrigin: " : "", + (conn->origin != NULL && (opts == NULL || opts->add_origin_header)) ? conn->origin : "", + /* Cookie */ + (opts && opts->cookie) ? "\r\nCookie: " : "", + (opts && opts->cookie) ? opts->cookie : "", + /* protocol part */ + conn->protocols ? "\r\nSec-WebSocket-Protocol: " : "", + conn->protocols ? conn->protocols : "", + /* extra arbitrary headers */ + (opts && opts->extra_headers) ? opts->extra_headers : ""); +#endif } @@ -882,6 +909,7 @@ noPollConn * __nopoll_conn_new_common (noPollCtx * ctx, /* get client init payload */ content = __nopoll_conn_get_client_init (conn, options); + if (content == NULL) { nopoll_log (ctx, NOPOLL_LEVEL_CRITICAL, "Failed to build client init message, unable to connect"); nopoll_conn_shutdown (conn); @@ -1041,15 +1069,19 @@ noPollConn * __nopoll_conn_new_common (noPollCtx * ctx, /* call to send content */ remaining_timeout = ctx->conn_connect_std_timeout; while (remaining_timeout > 0) { - if (size != conn->send (conn, content, size)) { + int sent_size = conn->send (conn, content, size); + //fprintf(stderr, "here sent size is: %d.\n", sent_size); + if (size != sent_size) { /* for some reason, under FreeBSD, a ENOTCONN is reported when they should be returning EINPROGRESS and/or EWOULDBLOCK */ if (errno == NOPOLL_EWOULDBLOCK || errno == NOPOLL_EINPROGRESS || errno == NOPOLL_ENOTCONN) { /* nopoll_log (ctx, NOPOLL_LEVEL_CRITICAL, "Connection in progress (errno=%d), session: %d", errno, session); */ + //fprintf(stderr, "Connection in progress (errno=%d,%s), session: %d\n", errno, strerror(errno), session); nopoll_sleep (10000); remaining_timeout -= 10000; continue; } /* end if */ + //fprintf(stderr, "Failed to send websocket init message, error code was: %d,%s (2), closing session\n", errno, strerror(errno)); nopoll_log (ctx, NOPOLL_LEVEL_CRITICAL, "Failed to send websocket init message, error code was: %d (2), closing session", errno); nopoll_conn_shutdown (conn); } /* end if */ @@ -1057,6 +1089,7 @@ noPollConn * __nopoll_conn_new_common (noPollCtx * ctx, break; } + //fprintf (stderr, "Web socket initial client handshake sent"); nopoll_log (ctx, NOPOLL_LEVEL_DEBUG, "Web socket initial client handshake sent"); /* release content */