diff --git a/das-dn/CMakeLists.txt b/das-dn/CMakeLists.txt index 26350a72..aa4991cf 100644 --- a/das-dn/CMakeLists.txt +++ b/das-dn/CMakeLists.txt @@ -4,7 +4,7 @@ set (VERSION 1.0.1) # set (CMAKE_CXX_STANDARD 11) option (USE_MQTT "use mqtt protocol" ON) -option (USE_WEBSOCKET "use websocket" OFF) +option (USE_WEBSOCKET "use websocket" ON) option (USE_SQLITE3 "use sqlite3" ON) if (USE_SQLITE3) @@ -145,11 +145,10 @@ if (USE_WEBSOCKET) third_party/nopoll/nopoll_io.c third_party/nopoll/nopoll_log.c third_party/nopoll/nopoll_msg.c) - - set (APP_SRCS ${APP_SRCS} dlt1867/dlt1867.cpp) set (APP_LIBS ${APP_LIBS} ssl crypto) add_definitions(-DUSE_WEBSOCKET) add_definitions(-DNOPOLL_OS_UNIX=1) + #add_definitions(-DSHOW_DEBUG_LOG) add_definitions(-DNOPOLL_HAVE_VASPRINTF=1) add_definitions(-DNOPOLL_HAVE_TLSv10_ENABLED=1) add_definitions(-DNOPOLL_HAVE_TLSv11_ENABLED=1) diff --git a/das-dn/cmg/changemaster.cpp b/das-dn/cmg/changemaster.cpp index bda6f179..fc7d9214 100644 --- a/das-dn/cmg/changemaster.cpp +++ b/das-dn/cmg/changemaster.cpp @@ -32,16 +32,19 @@ CChangeMaster::CChangeMaster() if (gethostname(hostname, sizeof(hostname)) < 0) { vLog(LOG_ERROR, "gethostname error.\n"); + return; exit(EXIT_FAILURE); } if (NULL == (m_control.m_node = GetNode(hostname))) { vLog(LOG_ERROR, "node table error, cann't run(%s,%s)!\n", m_control.m_node->m_machine_name, hostname); + return; exit(EXIT_FAILURE); } if (m_control.m_node == NULL) { vLog(LOG_ERROR, "node table error, cann't change!\n"); + return; exit(EXIT_FAILURE); } if ((m_control.m_node->m_tcitype != MASTER_TCI) && (m_control.m_node->m_tcitype != STANDBY_TCI)) @@ -444,6 +447,7 @@ void CChangeMaster::ChangeDual() NODE_STRUCT* CChangeMaster::GetNode(char *name) { + return &nodes.m_node[0]; int i = 0; for (i = 0; i < MAX_NODE_NUM; i++) { @@ -457,6 +461,7 @@ NODE_STRUCT* CChangeMaster::GetNode(char *name) NODE_STRUCT* CChangeMaster::GetNode(int netno) { + return &nodes.m_node[0]; int i = 0; for (i = 0; i < MAX_NODE_NUM; i++) diff --git a/das-dn/cmg/main.cpp b/das-dn/cmg/main.cpp index 6a07166e..2b7c6e5a 100644 --- a/das-dn/cmg/main.cpp +++ b/das-dn/cmg/main.cpp @@ -7,26 +7,22 @@ #include #include -//#include "public.h" -#ifdef WIN32 - #include "getopt.h" - BOOLEAN g_bSocketStarted = FALSE; -#else - #include -#endif +#include #include "changemaster.h" #include "process.h" #if 1 #include #include +#include #include #include #include #include #include -#include +#include +#include #include #include #include @@ -37,27 +33,41 @@ #include "soe.h" #include "yklog.h" #include "ytlog.h" +#include "public.h" + +#define MAX_MSG_COUNT 64 -char publish_topic[512] = {'\0'}; -char publish_realdata_topic[512] = {'\0'}; -char subscribe_topic[512] = {'\0'}; bool g_dataAcquisitionReload = false; std::string g_traceId; - -#define AES_BLOCK_SIZE 16 -//AES密钥 -unsigned char key[17] = {"1234567890123456"}; - LONG m_soeload = 0; LONG m_yxbwload = 0; LONG m_ycbwload = 0; -std::vector m_total_processes; -std::vector m_total_units; +typedef std::vector stringvector; +stringvector m_gLinkIDs; //链路名和Id -std::unordered_map ykirn2uid_map; -std::unordered_map ytirn2uid_map; +typedef std::unordered_map name2intmap; +name2intmap unitname2uid_map; + +#define CMD_CONTROL_OPERATION 0 +#define CMD_CONTROL_SETTING 1 +typedef struct { + int uid; + int point; + int order; + int type; +} struct_service_item; + +typedef std::unordered_map name2servicemap; +typedef std::unordered_map unitname2servicemap; +unitname2servicemap unitname2service_map; + +typedef struct { + std::string name; + Json::Value value; +} struct_attr; +typedef std::vector attrvector; struct { Json::Int64 m_iLinkIrn; @@ -75,244 +85,342 @@ std::vector split(const std::string &s, char delimiter) return tokens; } -/* - * 一次输入所有待加密数据,并使用AES CBC模式进行加密 - * - * 输入: - * input:待加密数据 - * input_length:待加密数据的长度 - * encrypted_data:加密后的密文存放位置,注意此指针指向的区域有效长度计算如下:使用PKCS#7填充的output长度计算应为 decrypted_data_length = ((input_length/16)+1)*16 - * 返回值:加密后密文的长度,若为负值,则存在错误。 - */ -int encrypt_data_by_aes_cbc(unsigned char *input, int input_length, unsigned char* iv, unsigned char *encrypted_data) -{ - mbedtls_aes_context aes_context; - int result = 0; - int padding = 0; - int real_input_length = 0; - unsigned char padding_code; - unsigned char *input_tmp; - unsigned char error[64]; - //PKCS7填充到16字节整倍数,若待加密数据正好为16字节整倍数,仍需填充16个字节 - //计算当前长度距离16整倍数的差值,此值即为填充值 - padding = 16 - (input_length % 16); - padding_code = (unsigned char)padding; - //real_input_length即为经过填充后待加密数据的长度,值必然为16整倍数,同时此长度也为加密后的密文长度 - real_input_length = input_length + padding; - if (real_input_length % 16 != 0) { - //填充后必然为16字节整倍数,若不为16字节整倍数,则填充存在问题 - vLog(LOG_ERROR, "failed to padding\n"); - return -1; - } - input_tmp = (unsigned char *)calloc(1, real_input_length); - if (input_tmp == NULL) { - //分配内存错误 - return -2; - } - //进行填充 - memset(input_tmp, 0, real_input_length); - memcpy(input_tmp, input, input_length); - memset(input_tmp + input_length, padding_code, padding); +struSystem config_system32; +struConfig config_config; +struDatabase config_database; +struNodeOption config_nodes; +struUnitStatic config_static_units[UNIT_NUM]; - mbedtls_aes_init(&aes_context); - //若使用AES-128,则必要长度为128bits。若使用AES-256,则密钥长度为256bits - result = mbedtls_aes_setkey_enc(&aes_context, key, 128); - if (result != 0) { - mbedtls_strerror(result, (char *)error, sizeof(error)); - vLog(LOG_ERROR, "failed to set key:%s\n",error); - free(input_tmp); - mbedtls_aes_free(&aes_context); - return -3; +bool configInitializeMemory(void) +{ + int i, j; + + if (config_database.yxs) delete [] config_database.yxs; + if (config_database.ycs) delete [] config_database.ycs; + if (config_database.yms) delete [] config_database.yms; + if (config_database.yks) delete [] config_database.yks; + if (config_database.yts) delete [] config_database.yts; + + for (i = 0; i < UNIT_NUM; i++) { + if (config_config.units[i].yxs) delete [] config_config.units[i].yxs; + if (config_config.units[i].ycs) delete [] config_config.units[i].ycs; + if (config_config.units[i].yms) delete [] config_config.units[i].yms; + if (config_config.units[i].yks) delete [] config_config.units[i].yks; + if (config_config.units[i].yts) delete [] config_config.units[i].yts; + } + + memset(&config_system32, 0, sizeof(config_system32)); + memset(&config_config, 0, sizeof(config_config)); + memset(&config_database, 0, sizeof(config_database)); + memset(&config_nodes, 0, sizeof(config_nodes)); + memset(config_static_units, 0, sizeof(config_static_units)); + + config_database.yxs = new struYX[DATABASE_YX_NUM]; + if (config_database.yxs == NULL) return FALSE; + memset(config_database.yxs, 0, sizeof(struYX) * DATABASE_YX_NUM); + + config_database.ycs = new struYC[DATABASE_YC_NUM]; + if (config_database.ycs == NULL) return FALSE; + memset(config_database.ycs, 0, sizeof(struYC) * DATABASE_YC_NUM); + + config_database.yms = new struYM[DATABASE_YM_NUM]; + if (config_database.yms == NULL) return FALSE; + memset(config_database.yms, 0, sizeof(struYM) * DATABASE_YM_NUM); + + config_database.yks = new struYK[DATABASE_YK_NUM]; + if (config_database.yks == NULL) return FALSE; + memset(config_database.yks, 0, sizeof(struYK) * DATABASE_YK_NUM); + + config_database.yts = new struYT[DATABASE_YT_NUM]; + if (config_database.yts == NULL) return FALSE; + memset(config_database.yts, 0, sizeof(struYT) * DATABASE_YT_NUM); + + strcpy(config_system32.yk_pass, "12345"); + config_system32.yk_keep = 30; + config_system32.interval_time = 2; + config_system32.log_enabled = FALSE; + config_system32.zjd_log_bind_addr = INADDR_ANY; + config_system32.zjd_log_bind_port = 0; + for (i = 0; i < HARDWARE_PORTS_NUM; i++) { + config_config.hardware.ports[i].baud = 9600; + config_config.hardware.ports[i].data = 8; + config_config.hardware.ports[i].parity = 0; + config_config.hardware.ports[i].stop = 0; + config_config.hardware.ports[i].timeout = 1000; //1s + } + for (i = 0; i < PROCESSES_NUM; i++) { + config_config.processes[i].softdog = 0; + for (j = 0; j < PROCESS_UNIT_NUM; j++) { + config_config.processes[i].units[j] = -1; + } + } + for (i = 0; i < UNIT_NUM; i++) { + config_config.units[i].softdog = 0; } - result = mbedtls_aes_crypt_cbc(&aes_context, MBEDTLS_AES_ENCRYPT, real_input_length, iv, input_tmp, encrypted_data); - if (result != 0) { - mbedtls_strerror(result, (char *)error, sizeof(error)); - vLog(LOG_ERROR, "failed to encrypt data:%s\n",error); - free(input_tmp); - mbedtls_aes_free(&aes_context); - return -4; - } - - free(input_tmp); - mbedtls_aes_free(&aes_context); - //返回密文长度 - return real_input_length; + return true; } -/* - * 一次输入所有待加密数据,并使用AES CBC模式进行加密 - * 使用PKCS#7填充的output长度计算应为 output_length = ((input_length/16)+1)*16 - * 输入: - * encrypted_data:待解密的密文数据 - * encrypted_length:待解密的密文数据的有效长度 - * decrypted_data:解密的明文数据 - *返回值:解密后的明文数据的有效长度,即decrypted_data中数据的有效长度。若为负值,则存在错误。 - */ -int decrypt_data_by_aes_cbc(unsigned char *encrypted_data, int encrypted_length, unsigned char* iv, unsigned char *decrypted_data) + +bool configWriteSystemCFG(void) { - mbedtls_aes_context aes_context; - int result = 0; - int padding = 0; - unsigned char padding_code; - unsigned char error[64]; + FILE* pf; + char pathName[512]; - mbedtls_aes_init(&aes_context); - //若使用AES-128,则必要长度为128bits。若使用AES-256,则密钥长度为256bits - result = mbedtls_aes_setkey_dec(&aes_context, key, 128); - if (result != 0) { - mbedtls_strerror(result, (char *)error, sizeof(error)); - vLog(LOG_ERROR, "failed to set key:%s\n", error); - mbedtls_aes_free(&aes_context); - return -1; + snprintf(pathName, sizeof(pathName), "%s/%s", configpath, FILE_SYSTEM_CONFIG); + pf = fopen(pathName, "wb+"); + if (pf != NULL) { + fwrite(&config_system32, sizeof(config_system32), 1, pf); + fclose(pf); + return true; } - - //解密后包含填充值的明文,后续需去除填充值 - unsigned char decrypted_data_include_padding[encrypted_length]; - result = mbedtls_aes_crypt_cbc(&aes_context, MBEDTLS_AES_DECRYPT, encrypted_length, iv, encrypted_data, decrypted_data_include_padding); - if (result != 0) { - mbedtls_strerror(result, (char *)error, sizeof(error)); - vLog(LOG_ERROR, "failed to decrypt data:%s\n", error); - mbedtls_aes_free(&aes_context); - return -2; - } - //去除PKCS#7的填充值 - //读取最后一个值,此值即为填充值的长度 - padding_code = decrypted_data_include_padding[encrypted_length - 1]; - padding = (int)padding_code; - if (padding < 1 || padding > 16) { - vLog(LOG_ERROR, "padding code is illegal!\n"); - return -3; - } - int real_decrypted_data_length = encrypted_length - padding; - - memcpy(decrypted_data, decrypted_data_include_padding, real_decrypted_data_length); - - mbedtls_aes_free(&aes_context); - return real_decrypted_data_length; + return false; } -//加密 -const std::string _encode(const Json::Value jsonRoot, unsigned char* iv) +bool configWriteNodeCFG(void) { - std::string outString; - Json::StreamWriterBuilder builder; - builder["indentation"] = ""; - builder["emitUTF8"] = true; - std::string inBuf = Json::writeString(builder, jsonRoot); - //采用aes加密 - //PKCS#7填充的密文长度计算:decrypted_data_length = ((input_length/16)+1)*16,decrypted_data_length = ((11/16)+1)*16 = 16 - int encrypted_data_length = (((inBuf.length() >> 4) + 1) << 4); - if (encrypted_data_length <= 0) return nullptr; - unsigned char* encrypted_data = new unsigned char[encrypted_data_length]; - memset(encrypted_data, 0, encrypted_data_length); - encrypted_data_length = encrypt_data_by_aes_cbc((unsigned char *)inBuf.c_str(), inBuf.length(), iv, encrypted_data); - if (encrypted_data_length < 0) { - vLog(LOG_ERROR, "failed to encrypt data!\n"); - delete [] encrypted_data; - encrypted_data = NULL; - return nullptr; - } - int len = Base64_encodeLength((const b64_data_t *)encrypted_data, encrypted_data_length); - if (len > 0) { - char *out = new char[len + 1]; - do { - memset(out, 0, sizeof(char)*(len + 1)); - Base64_encode((char *)out, (len + 1), (const b64_data_t *)encrypted_data, encrypted_data_length); - } while(0); - outString = out; + FILE *pf; + char pathName[512]; - delete [] encrypted_data; - encrypted_data = NULL; - - delete [] out; - out = NULL; + snprintf(pathName, sizeof(pathName), "%s/%s", configpath, FILE_NODE_CONFIG); + pf = fopen(pathName, "wb+"); + if (pf != NULL) { + fwrite(&config_nodes, sizeof(config_nodes), 1, pf); + fclose(pf); + return true; } - return outString; + return false; } -//解密 -Json::Value _decode(const char* pBuf, const int size, unsigned char* iv) +bool configWriteHardwareCFG(void) { - Json::Value jsonRoot; - jsonRoot.clear(); - std::string err; - Json::CharReaderBuilder builder; - Json::CharReader* reader(builder.newCharReader()); - int len = Base64_decodeLength(pBuf, size); - if (len > 0) { - char *out = new char[len + 1]; - do { - memset(out, 0, sizeof(char)*(len + 1)); - Base64_decode((b64_data_t *)out, (len + 1), pBuf, size); - //aes解密 - int decrypted_data_length = (((len >> 4) + 1) << 4); - if (decrypted_data_length <= 0) return nullptr; - unsigned char* decrypted_data = new unsigned char[decrypted_data_length + 1]; - memset(decrypted_data, 0, decrypted_data_length+1); - decrypted_data_length = decrypt_data_by_aes_cbc((unsigned char *)out, len, iv, decrypted_data); - if ( decrypted_data_length < 0) { - vLog(LOG_ERROR, "failed to decrypt data!\n"); - delete [] decrypted_data; - decrypted_data = NULL; - break; + FILE* pf; + char pathName[512]; + + snprintf(pathName, sizeof(pathName), "%s/%s", configpath, FILE_HARDWARE_CONFIG); + pf = fopen(pathName, "wb+"); + if (pf != NULL) { + fwrite(&config_config.hardware, sizeof(config_config.hardware), 1, pf); + fclose(pf); + return true; + } + return false; +} + +bool configWriteProcessCFG(void) +{ + FILE* pf; + char pathName[512]; + + snprintf(pathName, sizeof(pathName), "%s/%s", configpath, FILE_PROCESS_CONFIG); + pf = fopen(pathName, "wb+"); + if (pf != NULL) { + fwrite(&config_config.processes, sizeof(config_config.processes), 1, pf); + fclose(pf); + return true; + } + return false; +} + +bool configWriteUnitCFG(void) +{ + int i; + char filename[512]; + char pathName[512]; + FILE *pf = NULL; + FILE *pfPoint = NULL; + + snprintf(pathName, sizeof(pathName), "%s/%s", configpath, FILE_UNIT_CONFIG); + pf = fopen(pathName, "wb+"); + if (pf != NULL) { + for (i = 0; i < UNIT_NUM; i++) { + if (fwrite(&config_config.units[i], sizeof(config_config.units[i]), 1, pf) != 1) break; + if (config_config.units[i].yxcount > 0) { + if (config_config.units[i].yxs != NULL) { + snprintf(filename, sizeof(filename), FILE_UNIT_YX_CONFIG, configpath, i); + pfPoint = fopen(filename, "wb+"); + if (pfPoint != NULL) { + fwrite(config_config.units[i].yxs, sizeof(struUnitYX)*config_config.units[i].yxcount, 1, pfPoint); + fclose(pfPoint); + } + } } - if (!reader->parse((const char *)decrypted_data, (const char *)(decrypted_data + decrypted_data_length), &jsonRoot, &err)) { - vLog(LOG_ERROR, "reader->parse(out, out + strlen(out), &jsonRoot, &err) error<%d,%s>。\n", errno, err.c_str()); - break; + if (config_config.units[i].yccount > 0) { + if (config_config.units[i].ycs != NULL) { + snprintf(filename, sizeof(filename), FILE_UNIT_YC_CONFIG, configpath, i); + pfPoint = fopen(filename, "wb+"); + if (pfPoint != NULL) { + fwrite(config_config.units[i].ycs, sizeof(struUnitYC)*config_config.units[i].yccount, 1, pfPoint); + fclose(pfPoint); + } + } } - delete [] decrypted_data; - decrypted_data = NULL; - } while(0); - delete [] out; - out = NULL; + if (config_config.units[i].ymcount > 0) { + if (config_config.units[i].yms != NULL) { + snprintf(filename, sizeof(filename), FILE_UNIT_YM_CONFIG, configpath, i); + pfPoint = fopen(filename, "wb+"); + if (pfPoint != NULL) { + fwrite(config_config.units[i].yms, sizeof(struUnitYM)*config_config.units[i].ymcount, 1, pfPoint); + fclose(pfPoint); + } + } + } + if (config_config.units[i].ykcount > 0) { + if (config_config.units[i].yks != NULL) { + snprintf(filename, sizeof(filename), FILE_UNIT_YK_CONFIG, configpath, i); + pfPoint = fopen(filename, "wb+"); + if (pfPoint != NULL) { + fwrite(config_config.units[i].yks, sizeof(struUnitYK)*config_config.units[i].ykcount, 1, pfPoint); + fclose(pfPoint); + } + } + } + if (config_config.units[i].ytcount > 0) { + if (config_config.units[i].yts != NULL) { + snprintf(filename, sizeof(filename), FILE_UNIT_YT_CONFIG, configpath, i); + pfPoint = fopen(filename, "wb+"); + if (pfPoint != NULL) { + fwrite(config_config.units[i].yts, sizeof(struUnitYT)*config_config.units[i].ytcount, 1, pfPoint); + fclose(pfPoint); + } + } + } + } + fclose(pf); + return true; } - return jsonRoot; + return false; +} + +bool configWriteStaticUnitCFG(void) +{ + FILE *pf; + char pathName[512]; + + snprintf(pathName, sizeof(pathName), "%s/%s", configpath, FILE_UNIT_STATIC); + pf = fopen(pathName, "w+"); + if (pf != NULL) { + fwrite(config_static_units, sizeof(config_static_units), 1, pf); + fclose(pf); + return true; + } + return false; +} + +bool configWriteDatabaseCFG(void) +{ + FILE *pf; + char pathName[512]; + + snprintf(pathName, sizeof(pathName), "%s/%s", configpath, FILE_DATABASE_CONFIG); + pf = fopen(pathName, "wb+"); + if (pf != NULL) { + fwrite(config_database.yxs, sizeof(struYX) * DATABASE_YX_NUM, 1, pf); + fwrite(config_database.ycs, sizeof(struYC) * DATABASE_YC_NUM, 1, pf); + fwrite(config_database.yms, sizeof(struYM) * DATABASE_YM_NUM, 1, pf); + fwrite(config_database.yks, sizeof(struYK) * DATABASE_YK_NUM, 1, pf); + fwrite(config_database.yts, sizeof(struYT) * DATABASE_YT_NUM, 1, pf); + fclose(pf); + return true; + } + return false; } -bool publish_sensor_data(struct mosquitto *mosq, const std::string traceId, const char* command, const char* topic, const Json::Value payload) +BYTE *websocket_msg_join(noPollMsg **msg, int msg_count, int *buffer_size) +{ + BYTE* buffer; + + if (msg_count <= 0 || msg_count >= MAX_MSG_COUNT) return NULL; + if (msg[0] == NULL) return NULL; + *buffer_size = 0; + for (int i = 0; i < msg_count; i++) + { + *buffer_size += nopoll_msg_get_payload_size(msg[i]); + } + buffer = new BYTE[*buffer_size + 1]; + int len = 0; + for (int i = 0; i < msg_count; i++) + { + memcpy(&buffer[len], nopoll_msg_get_payload(msg[i]), nopoll_msg_get_payload_size(msg[i])); + len += nopoll_msg_get_payload_size(msg[i]); + } + buffer[*buffer_size] = 0; + /* return joined message */ + return buffer; +} + +int websocket_write(noPollConn* conn, const char * buffer, int buffer_len) +{ +#if 1 + int result; + result = nopoll_conn_send_text(conn, (const char *)buffer, buffer_len); + /* ensure we have written all bytes but limit operation to 2 seconds */ + result = nopoll_conn_flush_writes(conn, 2000000, result); + return (result == buffer_len) ? 0 : -1; +#else + // FIRST PART: normal send operation + int tries = 0; + int bytes_written; + // do write operation and check + bytes_written = nopoll_conn_send_text(conn, buffer, buffer_len); + if (bytes_written == length) + { // operation completed, just return bytes written + return 0; + } + // SECOND PART: retry in the case of failure + // some failure found, check errno + while (tries < 5 && errno == NOPOLL_EWOULDBLOCK && nopoll_conn_pending_write_bytes(conn) > 0) + { // ok, unable to write all data but that data is waiting to be flushed + // you can return here and then make your application to retry again or + // try it right now, but with a little pause before continue + nopoll_sleep(10000); // lets wait 10ns + // flush and check if write operation completed + if (nopoll_conn_complete_pending_write(conn) == 0) return 0; + // limit loop + tries++; + } + // failure, return error code reported by the first call or the last retry + return -1; +#endif +} + +bool publish_sensor_data(noPollConn* conn, const std::string traceId, const char* command, const Json::Value payload) { Json::StreamWriterBuilder builder; builder["indentation"] = ""; builder["emitUTF8"] = true; Json::Value jsonRoot; - jsonRoot["command"] = command; + jsonRoot["cmd"] = command; char str[128]; uuid_t uuid; uuid_generate_time(uuid); uuid_unparse_upper(uuid, str); - if (traceId == "") - { - jsonRoot["traceId"] = str; - } - else - { - jsonRoot["traceId"] = traceId; + if (traceId == "") { + jsonRoot["cmdId"] = str; + } else { + jsonRoot["cmdId"] = traceId; } Json::Int64 mtime = (Json::Int64)time(NULL); mtime *= 1000; - jsonRoot["mtime"] = mtime; - - std::vector tokens = split(std::string(str), '-'); - std::string iv = tokens.at(0) + tokens.at(1) + tokens.at(2); - jsonRoot["iv"] = iv; - jsonRoot["payload"] = _encode(payload, (unsigned char *)iv.c_str()).c_str(); + jsonRoot["time"] = mtime; + jsonRoot["data"] = payload; std::string outputConfig = Json::writeString(builder, jsonRoot); - vLog(LOG_DEBUG, "send topic: %s, payload: %d\n", topic, outputConfig.length()); - int rc = mosquitto_publish(mosq, NULL, topic, outputConfig.length(), outputConfig.c_str(), 0, false); - if (rc != MOSQ_ERR_SUCCESS) { - vLog(LOG_DEBUG, "publishing topic: %s is error<%d,%s>。\n", topic, rc, mosquitto_strerror(rc)); + vLog(LOG_DEBUG, "send cmd: %s, payload: %d\n", command, outputConfig.length()); + int rc = websocket_write(conn, outputConfig.c_str(), outputConfig.length()); + if (rc != 0) { + vLog(LOG_DEBUG, "websocket write is error<%d>。\n", rc); return false; } return true; } +#if 1 static int GetProcessIDByIRN(QLONG IRN) { int i; @@ -327,17 +435,12 @@ static int GetProcessIDByIRN(QLONG IRN) return -1; } -static QLONG GetProcessIRNByPid(int pid) -{ - if (pid < 0 || pid >= PROCESSES_NUM) return -1; - return config.processes[pid].irn; -} - static QLONG GetUnitIRNByUid(int uid) { if (uid < 0 || uid >= UNIT_NUM) return -1; return config.units[uid].irn; } +#endif static int GetUnitYXCount(int uid) { @@ -403,18 +506,19 @@ static float GetUnitYCRealFromValue(int uid, int order, long value) if (order < 0 || order >= pUnit->yccount) return 0; pYC = &pUnit->ycs[order]; udb = pYC->order; - if (udb < 0 || udb >= DATABASE_YC_NUM) { + if (udb < 0 || udb >= DATABASE_YC_NUM) + { coef = 1.0f; base = 0.0f; } - else { + else + { coef = pYC->coef; base = pYC->base; } return (float)(value * coef + base); } - static float GetUnitYMReal(int uid, int order) { int udb; @@ -527,53 +631,6 @@ static QLONG GetUnitYTIRNByPoint(int uid, int point) return pUnit->yts[point].irn; } -static int GetUnitYKPointByIRN(int& uid, QLONG IRN) -{ - int i; - int len; - struUnit* pUnit; - - if (ykirn2uid_map.find(IRN) != ykirn2uid_map.end()) - { - uid = ykirn2uid_map[IRN]; - if (uid < 0 || uid >= UNIT_NUM) return -1; - pUnit = &config.units[uid]; - if (pUnit->ykcount <= 0) return -1; - for (i = 0; i < pUnit->ykcount; i++) - { - if (pUnit->yks[i].irn == IRN) - { - return i; - } - } - } - return -1; -} - -static int GetUnitYTPointByIRN(int& uid, QLONG IRN) -{ - int i; - int len; - struUnit* pUnit; - - if (ytirn2uid_map.find(IRN) != ytirn2uid_map.end()) - { - uid = ykirn2uid_map[IRN]; - if (uid < 0 || uid >= UNIT_NUM) return -1; - pUnit = &config.units[uid]; - if (pUnit->ytcount <= 0) return -1; - - for (i = 0; i < pUnit->ytcount; i++) - { - if (pUnit->yts[i].irn == IRN) - { - return i; - } - } - } - return -1; -} - int GetUnitYXBW(int& uid, BOOLEAN& value, BYTE& qds, int& type, unionCP56Time& st) { int i; @@ -617,202 +674,6 @@ int GetUnitYCBW(int& uid, LONG& value, BYTE& qds, int& type, unionCP56Time& st) return -1; } -int get_pid_by_name(const char *process_name) -{ - DIR *dir; - struct dirent *entry; - char path[PATH_MAX]; - - // 打开/proc目录 - dir = opendir("/proc"); - if (dir == NULL) - { - perror("opendir failed"); - return -1; - } - - // 遍历/proc目录中的所有进程 - while ((entry = readdir(dir)) != NULL) - { - if (entry->d_type == DT_DIR) - { // 检查进程名是否匹配 - if (strcmp(entry->d_name, ".") != 0 && strcmp(entry->d_name, "..") != 0) - { - snprintf(path, sizeof(path), "/proc/%s/cmdline", entry->d_name); - FILE *fp = fopen(path, "r"); - if (fp) - { - char cmdline[PATH_MAX]; - fgets(cmdline, sizeof(cmdline), fp); - fclose(fp); - - // 找到匹配的进程 - if (strstr(cmdline, process_name) != NULL) - { - closedir(dir); - return atoi(entry->d_name); - } - } - } - } - } - - closedir(dir); - return -1; -} - -bool thread_created = false; -time_t last_time = time(NULL); //历史时间 -int file_total_parts = 0; -int file_current_parts = 0; -char temp_path[MAX_PATH]; - -int countFilesInDirectory(const char* directory) -{ - DIR *dir; - struct dirent *ent; - int count = 0; - - // 打开目录 - dir = opendir(directory); - if (dir == NULL) { - perror("无法打开目录"); - return -1; - } - - // 遍历目录 - while ((ent = readdir(dir))) { - if (strcmp(ent->d_name, ".") == 0 || strcmp(ent->d_name, "..") == 0) { - continue; - } - - // 统计文件数量 - if (ent->d_type == DT_REG) { - count++; - } - } - - // 关闭目录 - closedir(dir); - - return count; -} - -int countFilesByTypeInDirectory(const char* directory, const char* fileExtension) -{ - DIR *dir; - struct dirent *ent; - int count = 0; - - // 打开目录 - dir = opendir(directory); - if (dir == NULL) { - perror("无法打开目录"); - return -1; - } - - // 遍历目录 - while ((ent = readdir(dir))) { - if (strcmp(ent->d_name, ".") == 0 || strcmp(ent->d_name, "..") == 0) { - continue; - } - - // 判断文件类型 - if (ent->d_type == DT_REG) { - // 使用fnmatch函数匹配文件扩展名 - if (fnmatch(fileExtension, ent->d_name, 0) == 0) { - count++; - } - } - } - - // 关闭目录 - closedir(dir); - - return count; -} - -static void *waiting(void *param) -{ - //const char* filePathName = (const char*)param; - struct mosquitto *mosq = (struct mosquitto *)param; - char filePathName[MAX_PATH]; - time_t now_time = time(NULL); - int count = 0; - vLog(LOG_DEBUG, "判断文件夹:%s的文件是否存在。\n", temp_path); - - snprintf(filePathName, sizeof(filePathName), "%s/%s", temp_path, configfile); - while (true) - { - sleep(1); - //判断文件是否达到数量。 - count = countFilesByTypeInDirectory(temp_path, "*.pat"); - vLog(LOG_DEBUG, "现在有%d个指定文件,file current parts is: %d, 总共需要%d个文件。\n", count, file_current_parts, file_total_parts); - if (file_current_parts == file_total_parts) - { //打开指定文件 - vLog(LOG_DEBUG, "准备合并文件,文件名为:%s.\n", filePathName); - FILE* in = fopen(filePathName, "wb+"); - if (in == NULL) break; - for (int i = 0; i < count; i++) - { - char fileName[MAX_PATH]; - snprintf(fileName, sizeof(fileName), "%s/%d.pat", temp_path, (i+1)); - vLog(LOG_DEBUG, "打开文件名为:%s的文件.\n", fileName); - FILE* out = fopen(fileName, "rb"); - if (out == NULL) break; - while (!feof(out)) - { - char buffer[1024]; - int pos = fread(buffer, 1, sizeof(buffer), out); - int w = fwrite(buffer, 1, pos, in); - } - fclose(out); - - remove(fileName); - } - fclose(in); - file_current_parts = 0; - vLog(LOG_DEBUG, "文件下载完成.\n"); - break; - } - if (now_time >= last_time + 300) - { - vLog(LOG_DEBUG, "load config file timeout.\n"); - //发送超时报文 - break; - } - } - - count = countFilesByTypeInDirectory(temp_path, "*.pat"); - for (int i = 0; i < count; i++) - { - char fileName[MAX_PATH]; - snprintf(fileName, sizeof(fileName), "%s/%d.pat", temp_path, (i+1)); - remove(fileName); - } - - vLog(LOG_DEBUG, "结束线程。\n"); - file_current_parts = 0; - thread_created = false; - - //发送 - /* - { - "result": true, - "message": "success" - } - */ - Json::Value payload; - payload["result"] = true; - payload["message"] = "success"; - publish_sensor_data(mosq, g_traceId, "TERMINAL_CONFIG_UPDATE_RESPONSE", publish_topic, payload); - - g_dataAcquisitionReload = true; - - pthread_exit(0); - return NULL; -} - BOOLEAN GetUnitYK(int uid, int& order, BYTE& value, BYTE& act, BYTE& result) { int i; @@ -1085,20 +946,17 @@ void SetUnitYT(int uid, int order, DWORD value, BYTE act, BYTE result) break; } } -#define CMD_CONTROL_OPERATION 0 -#define CMD_CONTROL_SETTING 1 -int MakeYKFrame(struct mosquitto *mosq) +int MakeYKFrame(noPollConn* conn, int uid) { - int uid; int order; BYTE value, action, result; - for (int i = 0; i < m_total_units.size(); i++) + //for (int i = 0; i < m_total_units.size(); i++) { - uid = m_total_units.at(i); - if (uid < 0 || uid >= UNIT_NUM) continue; - if (!GetUnitYK(uid, order, value, action, result)) continue; + //uid = m_total_units.at(i); + if (uid < 0 || uid >= UNIT_NUM) return -1; + if (!GetUnitYK(uid, order, value, action, result)) return -1; vLog(LOG_WARN, "Unit(%d) yk(%d) %s state is %s result is %s\n", uid, order, (value ? "CLOSE" : "TRIP"), val_to_str(action, yk_state, "STATE=%d"), val_to_str(result, yk_result, "RESULT=%d")); @@ -1138,23 +996,22 @@ int MakeYKFrame(struct mosquitto *mosq) jsonRoot["result"] = true; } - publish_sensor_data(mosq, g_traceId, "CONTROL_REPLY", publish_topic, jsonRoot); + publish_sensor_data(conn, g_traceId, "deviceControlResp", jsonRoot); return 1; } return 0; } -int MakeYTFrame(struct mosquitto *mosq) +int MakeYTFrame(noPollConn* conn, int uid) { - int uid; int order; BYTE action, result; DWORD value; - for (int i = 0; i < m_total_units.size(); i++) + //for (int i = 0; i < m_total_units.size(); i++) { - if (uid < 0 || uid >= UNIT_NUM) continue; - if (!GetUnitYT(uid, order, value, action, result)) continue; + if (uid < 0 || uid >= UNIT_NUM) return -1; + if (!GetUnitYT(uid, order, value, action, result)) return -1; vLog(LOG_DEBUG, "Unit(%d) set point(%d) %d state is %s result is %s\n", uid, order, value, val_to_str(action, yt_state, "STATE=%d"), val_to_str(result, yt_result, "RESULT=%d")); @@ -1193,52 +1050,60 @@ int MakeYTFrame(struct mosquitto *mosq) jsonRoot["result"] = true; } - publish_sensor_data(mosq, g_traceId, "CONTROL_REPLY", publish_topic, jsonRoot); + publish_sensor_data(conn, g_traceId, "deviceControlResp", jsonRoot); return 1; } return 0; } -bool OnReceivedDeviceCommand(Json::Value jsonRoot) +bool OnReceivedDeviceCommand(const Json::Value jsonRoot) { - if (jsonRoot["irn"].isNull()) return FALSE; - if (jsonRoot["operType"].isNull()) return FALSE; - if (jsonRoot["operValue"].isNull()) return FALSE; + if (jsonRoot["deviceId"].isNull()) return FALSE; + if (jsonRoot["serviceName"].isNull()) return FALSE; + if (jsonRoot["opValue"].isNull()) return FALSE; - int uid = 0;//GetCurUnitID(); + std::string deviceId = jsonRoot["deviceId"].asString(); + std::string serviceName = jsonRoot["serviceName"].asString(); + if (unitname2service_map.find(deviceId) == unitname2service_map.end()) { + vLog(LOG_WARN, "下发了一个不存在services的控制设备id<%s>。\n", deviceId.c_str()); + return false; + } + name2servicemap name2service_map = unitname2service_map[deviceId]; + if (name2service_map.find(serviceName) == name2service_map.end()) { + vLog(LOG_WARN, "下发了一个不存在的service名称<%s>。\n", serviceName.c_str()); + return false; + } - Json::Int64 irn = jsonRoot["irn"].asInt64(); - int operType = jsonRoot["operType"].asInt(); - int operValue = jsonRoot["operValue"].asInt(); - int point; + struct_service_item item = name2service_map[serviceName]; + + int operType = item.type; + int uid = item.uid; + int point = item.point; - if (operType == CMD_CONTROL_OPERATION) //遥控 - { - //根据irn来查找point - point = GetUnitYKPointByIRN(uid, irn); - if (point < 0) - { + if (operType == CMD_CONTROL_OPERATION) { //遥控 + if (point < 0) { vLog(LOG_ERROR, "未能找到对应的遥控点号,请检查并确认。\n"); return FALSE; } + int operValue = jsonRoot["opValue"].asInt(); SetUnitYK(uid, point, (operValue & 0x03), YKS_SELREQ, YKR_IDLE); vLog(LOG_WARN, "Unit(%d) yk(%d) %s state is YKS_SELREQ result is YKR_IDLE.\n", uid, point, ((operValue & 0x03) ? "CLOSE" : "TRIP")); - } - else if (operType == CMD_CONTROL_SETTING) //遥调 - { - //根据irn来查找point - point = GetUnitYTPointByIRN(uid, irn); - if (point < 0) + } else if (operType == CMD_CONTROL_SETTING) { //遥调 + union main { + float fval; + DWORD dval; + } operValue; + if (point < 0) { vLog(LOG_ERROR, "未能找到对应的遥调点号,请检查并确认。\n"); return FALSE; } - SetUnitYT(uid, point, operValue, YTS_EXEREQ, YTR_IDLE); - vLog(LOG_DEBUG, "Unit(%d) set point(%d) %d state is YTS_EXEREQ result is YTR_IDLE.\n", uid, point, operValue); - } - else - { + if (jsonRoot["opValue"].isDouble()) operValue.fval = jsonRoot["opValue"].asFloat(); + else if (jsonRoot["opValue"].isInt()) operValue.dval = jsonRoot["opValue"].asInt(); + SetUnitYT(uid, point, operValue.dval, YTS_EXEREQ, YTR_IDLE); + vLog(LOG_DEBUG, "Unit(%d) set point(%d) %d state is YTS_EXEREQ result is YTR_IDLE.\n", uid, point, operValue.dval); + } else { vLog(LOG_ERROR, "平台下发的<%d>命令错误。operType不是<0-遥控或1-遥调>,系统不支持的命令!\n", operType); return FALSE; } @@ -1246,8 +1111,9 @@ bool OnReceivedDeviceCommand(Json::Value jsonRoot) return TRUE; } -bool OnReceivedLogCommand(Json::Value jsonRoot) +bool OnReceivedLogCommand(const Json::Value jsonRoot) { +#if 0 if (jsonRoot["linkIrn"].isNull()) return FALSE; QLONG linkIrn = -1; if (jsonRoot["linkIrn"].isInt64()) linkIrn = jsonRoot["linkIrn"].asInt64(); @@ -1256,219 +1122,476 @@ bool OnReceivedLogCommand(Json::Value jsonRoot) channelBuffer.enabled = TRUE; channelBuffer.mon_port = GetProcessIDByIRN(linkIrn);; vLog(LOG_DEBUG, "receive action LINK_LOG_REQUEST, irn is:%lld, pid is: %d\n", struMonLinkLog.m_iLinkIrn, channelBuffer.mon_port); +#endif + return true; +} + + +static BOOLEAN processUartParam(const char *jsonStr, int ord) +{ + Json::Value jsonRoot; + const int rawJsonLength = strlen(jsonStr); + std::string err; + + Json::CharReaderBuilder builder; + builder["emitUTF8"] = true; + Json::CharReader* reader(builder.newCharReader()); + if (!reader->parse(jsonStr, jsonStr + rawJsonLength, &jsonRoot, &err)) { + vLog(LOG_ERROR, "串口参数:Json 格式错误!\n"); + return FALSE; + } + + config_config.hardware.ports[ord].state = TRUE; + snprintf(config_config.hardware.ports[ord].name, sizeof(config_config.hardware.ports[ord].name), "ttyS%d", ord); + config_config.hardware.ports[ord].baud = 9600; + config_config.hardware.ports[ord].data = 8; + config_config.hardware.ports[ord].parity = 0; + config_config.hardware.ports[ord].stop = 0; + config_config.hardware.ports[ord].timeout = 1000; + + if (jsonRoot["name"].isString()) { + snprintf(config_config.hardware.ports[ord].name, sizeof(config_config.hardware.ports[ord].name), "%s", jsonRoot["name"].asCString()); + } + if (jsonRoot["baud"].isInt()) { + config_config.hardware.ports[ord].baud = jsonRoot["baud"].asInt(); + } else if (jsonRoot["baud"].isString()) { + config_config.hardware.ports[ord].baud = atoi(jsonRoot["baud"].asCString()); + } + if (jsonRoot["data"].isInt()) { + config_config.hardware.ports[ord].data = jsonRoot["data"].asInt(); + } else if (jsonRoot["data"].isString()) { + config_config.hardware.ports[ord].data = atoi(jsonRoot["data"].asCString()); + } + if (jsonRoot["parity"].isInt()) { + config_config.hardware.ports[ord].parity = jsonRoot["parity"].asInt(); + } else if (jsonRoot["parity"].isString()) { + config_config.hardware.ports[ord].parity = atoi(jsonRoot["parity"].asCString()); + } + if (jsonRoot["stop"].isInt()) { + if (jsonRoot["stop"].asInt() == 1) config_config.hardware.ports[ord].stop = 0; + else if (jsonRoot["stop"].asInt() == 2) config_config.hardware.ports[ord].stop = 2; + } else if (jsonRoot["stop"].isString()) { + if (jsonRoot["stop"].asString() == "1") config_config.hardware.ports[ord].stop = 0; + else if (jsonRoot["stop"].asString() == "2") config_config.hardware.ports[ord].stop = 2; + } + if (jsonRoot["timeout"].isInt()) { + config_config.hardware.ports[ord].timeout = jsonRoot["timeout"].asInt(); + } else if (jsonRoot["timeout"].isString()) { + config_config.hardware.ports[ord].timeout = atoi(jsonRoot["timeout"].asCString()); + } + return TRUE; } -bool OnReceivedSystemAction(struct mosquitto *mosq, const std::string traceId, const std::string command, Json::Value payload, const void* obj) +static BOOLEAN processHostIEC104ProcessParam(const char *jsonStr, int pid) +{ + Json::Value jsonRoot; + const int rawJsonLength = strlen(jsonStr); + std::string err; + + Json::CharReaderBuilder builder; + builder["emitUTF8"] = true; + Json::CharReader* reader(builder.newCharReader()); + if (!reader->parse(jsonStr, jsonStr + rawJsonLength, &jsonRoot, &err)) { + vLog(LOG_ERROR, "HostIEC104参数:Json 格式错误!\n"); + return FALSE; + } + config_config.processes[pid].option.iec104.net.socket_type = SOCK_STREAM; + config_config.processes[pid].option.iec104.net.target_addr = INADDR_ANY; + config_config.processes[pid].option.iec104.net.target_port = 2404; + config_config.processes[pid].option.iec104.asdu_addr_size = 2; + config_config.processes[pid].option.iec104.cot_size = 2; + config_config.processes[pid].option.iec104.info_addr_size = 3; + config_config.processes[pid].option.iec104.t0 = 30; + config_config.processes[pid].option.iec104.t1 = 15; + config_config.processes[pid].option.iec104.t2 = 10; + config_config.processes[pid].option.iec104.t3 = 20; + config_config.processes[pid].option.iec104.k = 12; + config_config.processes[pid].option.iec104.w = 8; + + if (jsonRoot["host"].isInt()) { + config_config.processes[pid].option.iec104.net.target_addr = jsonRoot["host"].asInt(); + } else if (jsonRoot["host"].isString()) { + if (inet_pton(AF_INET, jsonRoot["host"].asCString(), &config_config.processes[pid].option.iec104.net.target_addr) == 1) { + vLog(LOG_DEBUG, "IPv4 地址转换成功,网络字节序为: %u.\n", config_config.processes[pid].option.iec104.net.target_addr); + } else { + vLog(LOG_ERROR, "inet_pton error(%d,%s).\n", errno, strerror(errno)); + } + } + if (jsonRoot["port"].isInt()) { + config_config.processes[pid].option.iec104.net.target_port = jsonRoot["port"].asInt(); + } else if (jsonRoot["port"].isString()) { + config_config.processes[pid].option.iec104.net.target_port = atoi(jsonRoot["port"].asCString()); + } + if (jsonRoot["t0"].isInt()) { + config_config.processes[pid].option.iec104.t0 = jsonRoot["t0"].asInt(); + } else if (jsonRoot["t0"].isString()) { + config_config.processes[pid].option.iec104.t0 = atoi(jsonRoot["t0"].asCString()); + } + if (jsonRoot["t1"].isInt()) { + config_config.processes[pid].option.iec104.t0 = jsonRoot["t1"].asInt(); + } else if (jsonRoot["t1"].isString()) { + config_config.processes[pid].option.iec104.t0 = atoi(jsonRoot["t1"].asCString()); + } + if (jsonRoot["t2"].isInt()) { + config_config.processes[pid].option.iec104.t0 = jsonRoot["t2"].asInt(); + } else if (jsonRoot["t2"].isString()) { + config_config.processes[pid].option.iec104.t0 = atoi(jsonRoot["t2"].asCString()); + } + if (jsonRoot["t3"].isInt()) { + config_config.processes[pid].option.iec104.t0 = jsonRoot["t3"].asInt(); + } else if (jsonRoot["t3"].isString()) { + config_config.processes[pid].option.iec104.t0 = atoi(jsonRoot["t3"].asCString()); + } + + return TRUE; +} + +static bool dealConfigFile(const Json::Value jsonRoot) +{ + do { + if (!configInitializeMemory()) { + vLog(LOG_ERROR, "Fail initialize memory!\n"); + break; + } + if (!jsonRoot["nodeId"].isString()) { + vLog(LOG_ERROR, "配置文件格式错误,缺少节点ID的配置信息,请检查。\n"); + break; + } + + name2intmap deviceIDs; + deviceIDs.clear(); + //更新节点配置 + config_nodes.m_node[0].m_netnode_no = 0; + config_nodes.m_node[0].m_tcitype = 1; + config_nodes.m_node[0].m_target_addr = INADDR_LOOPBACK; + config_nodes.m_node[0].m_target_port = 15000; + snprintf(config_nodes.m_node[0].m_machine_name, sizeof(config_nodes.m_node[0].m_machine_name), "%s", jsonRoot["nodeId"].asCString()); + //更新设备列表 + const Json::Value equipments = jsonRoot["equipments"]; + { + long yxorder = 0, ycorder = 0, ymorder = 0, ykorder = 0, ytorder = 0; + char dbyxname[512]; + snprintf(dbyxname, sizeof(dbyxname), "%s/%s", configpath, FILE_DATABASE_YX_STATIC); + FILE *pfdbyxname = fopen(dbyxname, "wb+"); + char dbycname[512]; + snprintf(dbycname, sizeof(dbycname), "%s/%s", configpath, FILE_DATABASE_YC_STATIC); + FILE *pfdbycname = fopen(dbycname, "wb+"); + char dbymname[512]; + snprintf(dbymname, sizeof(dbymname), "%s/%s", configpath, FILE_DATABASE_YM_STATIC); + FILE *pfdbymname = fopen(dbymname, "wb+"); + char dbykname[512]; + snprintf(dbykname, sizeof(dbykname), "%s/%s", configpath, FILE_DATABASE_YK_STATIC); + FILE *pfdbykname = fopen(dbykname, "wb+"); + char dbytname[512]; + snprintf(dbytname, sizeof(dbytname), "%s/%s", configpath, FILE_DATABASE_YT_STATIC); + FILE *pfdbytname = fopen(dbytname, "wb+"); + + int count = equipments.size(); + for (int i = 0; i < count; i++) { + const Json::Value equipment = equipments[i]; + int uid = i; + + snprintf(config_static_units[uid].name, sizeof(config_static_units[uid].name), "%s", "device"); + snprintf(config_static_units[uid].model, sizeof(config_static_units[uid].model), "%s", "model"); + snprintf(config_static_units[uid].manufacturerId, sizeof(config_static_units[uid].manufacturerId), "%s", "iss"); + if (equipment["id"].isString()) { + std::string id = equipment["id"].asString(); + deviceIDs.insert(name2intmap::value_type(id, uid)); + snprintf(config_static_units[uid].deviceId, sizeof(config_static_units[uid].deviceId), "%s", id.c_str()); + } else { + snprintf(config_static_units[uid].deviceId, sizeof(config_static_units[uid].deviceId), "%s", "iss"); + } + + attrvector yxs, ycs, yms, yks, yts; + const Json::Value attrs = equipment["attrs"]; + if (!attrs.isNull()) { + int size = attrs.size(); + for (int j = 0; j < size; j++) { + const Json::Value attr = attrs[j]; + std::string type = ""; + if (attr["type"].isString()) type = attr["type"].asString(); + struct_attr name_param; + name_param.name = ""; + if (attr["name"].isString()) name_param.name = attr["name"].asString(); + if (attr["params"].isObject()) name_param.value = attr["params"]; + if (type == "yc") ycs.push_back(name_param); + else if (type == "yx") yxs.push_back(name_param); + else if (type == "ym") yms.push_back(name_param); + } + } + const Json::Value services = equipment["services"]; + if (!services.isNull()) { + int size = services.size(); + for (int j = 0; j < size; j++) { + const Json::Value service = services[j]; + std::string type = ""; + if (service["type"].isString()) type = service["type"].asString(); + struct_attr name_param; + name_param.name = ""; + if (service["name"].isString()) name_param.name = service["name"].asString(); + if (service["params"].isObject()) name_param.value = service["params"]; + if (type == "yk") yks.push_back(name_param); + else if (type == "yt") yts.push_back(name_param); + } + } + + config_config.units[uid].state = TRUE; + config_config.units[uid].type = MASTER_UNIT; + config_config.units[uid].yxcount = yxs.size(); + config_config.units[uid].yccount = ycs.size(); + config_config.units[uid].ymcount = yms.size(); + config_config.units[uid].ykcount = yks.size(); + config_config.units[uid].ytcount = yts.size(); + + if (config_config.units[uid].yccount > 0) { + config_config.units[uid].ycs = new struUnitYC[config_config.units[uid].yccount]; + if (NULL != config_config.units[uid].ycs) { + memset(config_config.units[uid].ycs, 0, sizeof(struUnitYC) * config_config.units[uid].yccount); + for (int k = 0; k < config_config.units[uid].yccount; k++) { + snprintf(config_config.units[uid].ycs[k].name, sizeof(config_config.units[uid].ycs[k].name), "%s", ycs[k].name.c_str()); + config_config.units[uid].ycs[k].order = ycorder++; + config_config.units[uid].ycs[k].factor = 1; + config_config.units[uid].ycs[k].change_pos = 2; + config_config.units[uid].ycs[k].coef = 1.0f; + config_config.units[uid].ycs[k].base = 0; + config_config.units[uid].ycs[k].upBound = 9999999.999f; + config_config.units[uid].ycs[k].lowBound = -9999999.999f; + config_config.units[uid].ycs[k].limit1Enable = FALSE; + config_config.units[uid].ycs[k].limit1Low = 0; + config_config.units[uid].ycs[k].limit1High = 0.0f; + config_config.units[uid].ycs[k].limit2Enable = FALSE; + config_config.units[uid].ycs[k].limit2Low = 0; + config_config.units[uid].ycs[k].limit2High = 0.0f; + Json::Value param = ycs[k].value; + if (!param.isNull()) { + if (param["coef"].isDouble()) config_config.units[uid].ycs[k].coef = param["coef"].asFloat(); + if (param["base"].isDouble()) config_config.units[uid].ycs[k].base = param["base"].asFloat(); + if (param["upBound"].isDouble()) config_config.units[uid].ycs[k].upBound = param["upBound"].asFloat(); + if (param["lowBound"].isDouble()) config_config.units[uid].ycs[k].lowBound = param["lowBound"].asFloat(); + if (param["limit1Enable"].isInt()) config_config.units[uid].ycs[k].limit1Enable = param["limit1Enable"].asInt(); + if (param["limit2Enable"].isInt()) config_config.units[uid].ycs[k].limit2Enable = param["limit2Enable"].asInt(); + if (param["limit1Low"].isDouble()) config_config.units[uid].ycs[k].limit1Low = param["limit1Low"].asFloat(); + if (param["limit2Low"].isDouble()) config_config.units[uid].ycs[k].limit2Low = param["limit2Low"].asFloat(); + if (param["limit1High"].isDouble()) config_config.units[uid].ycs[k].limit1High = param["limit1High"].asFloat(); + if (param["limit2High"].isDouble()) config_config.units[uid].ycs[k].limit2High = param["limit2High"].asFloat(); + } + if (config_config.units[uid].type == MASTER_UNIT) { + config_database.ycs[config_config.units[uid].ycs[k].order].coef = config_config.units[uid].ycs[k].coef; + config_database.ycs[config_config.units[uid].ycs[k].order].base = config_config.units[uid].ycs[k].base; + } + if (pfdbycname) { + fseek(pfdbycname, sizeof(struYCStatic) * config_config.units[uid].ycs[k].order, SEEK_SET); + fwrite(ycs[k].name.c_str(), (MAX_NAME_SIZE << 2), 1, pfdbycname); + } + } + } + } + if (config_config.units[uid].ymcount > 0) { + config_config.units[uid].yms = new struUnitYM[config_config.units[uid].ymcount]; + if (NULL != config_config.units[uid].yms) { + memset(config_config.units[uid].yms, 0, sizeof(struUnitYM) * config_config.units[uid].ymcount); + for (int k = 0; k < config_config.units[uid].ymcount; k++) { + snprintf(config_config.units[uid].yms[k].name, sizeof(config_config.units[uid].yms[k].name), "%s", yms[k].name.c_str()); + config_config.units[uid].yms[k].order = ymorder++; + config_config.units[uid].yms[k].coef = 1.0f; + config_config.units[uid].yms[k].base = 0; + if (config_config.units[uid].type == MASTER_UNIT) { + config_database.yms[config_config.units[uid].yms[k].order].coef = config_config.units[uid].yms[k].coef; + config_database.yms[config_config.units[uid].yms[k].order].base = config_config.units[uid].yms[k].base; + } + + if (pfdbymname) { + fseek(pfdbymname, sizeof(struYMStatic) * config_config.units[uid].yms[k].order, SEEK_SET); + fwrite(yms[k].name.c_str(), (MAX_NAME_SIZE << 2), 1, pfdbymname); + } + } + } + } + if (config_config.units[uid].ykcount > 0) { + config_config.units[uid].yks = new struUnitYK[config_config.units[uid].ykcount]; + if (NULL != config_config.units[uid].yks) { + memset(config_config.units[uid].yks, 0, sizeof(struUnitYK) * config_config.units[uid].ykcount); + for (int k = 0; k < config_config.units[uid].ykcount; k++) { + snprintf(config_config.units[uid].yks[k].name, sizeof(config_config.units[uid].yks[k].name), "%s", yks[k].name.c_str()); + config_config.units[uid].yks[k].order = ykorder++; + + if (pfdbykname) { + fseek(pfdbykname, sizeof(struYKStatic) * config_config.units[uid].yks[k].order, SEEK_SET); + fwrite(yks[k].name.c_str(), (MAX_NAME_SIZE << 2), 1, pfdbykname); + } + } + } + } + if (config_config.units[uid].ytcount > 0) { + config_config.units[uid].yts = new struUnitYT[config_config.units[uid].ytcount]; + if (NULL != config_config.units[uid].yts) { + memset(config_config.units[uid].yts, 0, sizeof(struUnitYT) * config_config.units[uid].ytcount); + for (int k = 0; k < config_config.units[uid].ytcount; k++) { + snprintf(config_config.units[uid].yts[k].name, sizeof(config_config.units[uid].yts[k].name), "%s", yts[k].name.c_str()); + config_config.units[uid].yts[k].order = ytorder++; + + if (pfdbytname) { + fseek(pfdbytname, sizeof(struYTStatic) * config_config.units[uid].yts[k].order, SEEK_SET); + fwrite(yts[k].name.c_str(), (MAX_NAME_SIZE << 2), 1, pfdbytname); + } + } + } + } + if (config_config.units[uid].yxcount > 0) { + config_config.units[uid].yxs = new struUnitYX[config_config.units[uid].yxcount]; + if (NULL != config_config.units[uid].yxs) { + memset(config_config.units[uid].yxs, 0, sizeof(struUnitYX) * config_config.units[uid].yxcount); + for (int k = 0; k < config_config.units[uid].yxcount; k++) { + snprintf(config_config.units[uid].yxs[k].name, sizeof(config_config.units[uid].yxs[k].name), "%s", yxs[k].name.c_str()); + config_config.units[uid].yxs[k].order = yxorder++; + config_config.units[uid].yxs[k].invert = 0; + config_database.yxs[config_config.units[uid].yxs[k].order].auto_reset = 0; + + if (pfdbyxname) { + fseek(pfdbyxname, sizeof(struYKStatic) * config_config.units[uid].yxs[k].order, SEEK_SET); + fwrite(yxs[k].name.c_str(), (MAX_NAME_SIZE << 2), 1, pfdbyxname); + } + Json::Value param = yxs[k].value; + if (!param.isNull()) { + if (param["invert"].asInt()) config_config.units[uid].yxs[k].invert = param["invert"].asInt(); + } + } + } + } + } + if (pfdbyxname) fclose(pfdbyxname); + if (pfdbycname) fclose(pfdbycname); + if (pfdbymname) fclose(pfdbymname); + if (pfdbykname) fclose(pfdbykname); + if (pfdbytname) fclose(pfdbytname); + } + //更新链路配置文件 + const Json::Value links = jsonRoot["links"]; + int count = links.size(); + int uid = 0; + int uartId = 0; + //my_units_map.clear(); + m_gLinkIDs.clear(); + for (int i = 0; i < count; i++) { + const Json::Value link = links[i]; + + config_config.processes[i].state = TRUE; + config_config.processes[i].type = MASTER_UNIT; + config_config.processes[i].time_gap = 300; //默认参数 + config_config.processes[i].poll_gap = 5; + config_config.processes[i].mode = PROCESS_MODE_MASTER; + + if (link["linkName"].isString()) { + snprintf(config_config.processes[i].name, sizeof(config_config.processes[i].name), "%s", link["linkName"].asCString()); + } + if (link["linkId"].isString()) { + m_gLinkIDs.push_back(link["linkId"].asString()); + } + if (link["protocol"].isInt()) { + config_config.processes[i].proto = link["protocol"].asInt(); + } + + //处理链路参数,根据协议参数的不同来处理 + const Json::Value params = link["params"]; + if (!params.isNull()) { + switch (config_config.processes[i].proto) { + case PROTOCOL_HOST_MODBUS_RTU: + processUartParam(params.asCString(), uartId); + config_config.processes[i].order = uartId++; + break; + case PROTOCOL_HOST_MODBUS_TCP: + case PROTOCOL_HOST_MODBUS_RTU_TCP: + break; + case PROTOCOL_HOST_IEC104: + processHostIEC104ProcessParam(params.asCString(), i); + break; + } + } + //处理链接设备 + const Json::Value devices = link["devices"]; + if (devices.isArray()) { + int size = devices.size(); + for (int j = 0; j < size; j++) { + std::string id = devices[j].asCString(); + if (deviceIDs.find(id) != deviceIDs.end()) { + config_config.processes[i].units[j] = deviceIDs[id]; + } + } + } + } + + //保存数据 + configWriteNodeCFG(); + configWriteSystemCFG(); + configWriteHardwareCFG(); + configWriteProcessCFG(); + configWriteUnitCFG(); + configWriteDatabaseCFG(); + //保存静态文件 + configWriteStaticUnitCFG(); //units.sta + } while (0); + //查询设备配置 + + //vLog(LOG_DEBUG, "mosq is: %ld, App name is: %s, and messageId is: %s\n", response->mosq, response->app, response->messageId); + + pthread_exit(0); + return NULL; +} + +bool OnReceivedSystemAction(noPollConn* conn, const std::string cmdId, const std::string cmd, const Json::Value data) { int rc; int code = 0; std::string reason; - do - { - if (command == "TERMINAL_CONFIG_UPDATE") - { //数据网关配置更新动作 - int version = -1; - int totalParts = 0; - int currentPart = 0; - if (payload["totalParts"].isNull()) { - vLog(LOG_DEBUG, "no totalParts part.\n"); - return false; - } - if (payload["currentPart"].isNull()) { - vLog(LOG_DEBUG, "no currentPart part.\n"); - return false; - } - if (payload["data"].isNull()) { - vLog(LOG_DEBUG, "no data part.\n"); - return false; - } - if (payload["partMD5"].isNull()) { - vLog(LOG_DEBUG, "no partMD5 part.\n"); - return false; - } - if (payload["version"].isInt()) version = payload["version"].asInt(); - if (payload["totalParts"].isInt()) totalParts = payload["totalParts"].asInt(); - if (payload["currentPart"].isInt()) currentPart = payload["currentPart"].asInt(); - - vLog(LOG_DEBUG, "totalParts is: %d, and currentPart is: %d.\n", totalParts, currentPart); - - g_traceId = traceId; //response命令的traceId -#if 0 - if (config.version < 0) - { - code = 201; - reason = "当前版本采用离线运行方式"; - vLog(LOG_DEBUG, "当前版本采用离线运行方式。请联系工作人员\n"); - } - - else if (config.version >= version) - { - code = 200; - reason = "当前版本大于需更新版本"; - vLog(LOG_DEBUG, "当前版本大于需更新版本,本次无需更新。\n"); - } -#endif -// else - { - //此处判断一下开启一个线程 - if (!thread_created) - { - vLog(LOG_DEBUG, "创建一个等待线程。\n"); - pthread_t tid; - pthread_create(&tid, NULL, waiting, (void *)mosq); - thread_created = true; - - file_total_parts = totalParts; - } - //更新最后收到报文时间 - last_time = time(NULL); - -// config.version = version; - //解码 - std::string data = payload["data"].asString(); - std::string partMD5 = payload["partMD5"].asString(); - int r; - char *out; - int len = Base64_decodeLength(data.c_str(), data.length()); - vLog(LOG_DEBUG, "base64解码长度 is: %d\n", len); - if (len > 0) - { - out = new char[len+1]; - memset(out, 0, sizeof(char)*(len + 1)); - r = Base64_decode((b64_data_t *)out, (len + 1), data.c_str(), data.length()); - MD5_CTX md5; - MD5_Init(&md5); - MD5_Update(&md5, (void *)out, len); - unsigned char result[32]; - MD5_Final(result, &md5); - std::string check; - char sum[54]; - snprintf(sum, sizeof(sum), "%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x", - result[0], result[1], result[2], result[3], result[4], result[5], result[6], result[7], - result[8], result[9], result[10], result[11], result[12], result[13], result[14], result[15]); - check = sum; - vLog(LOG_DEBUG, "partMD5 is: %s, checkMD5 is: %s\n", partMD5.c_str(), check.c_str()); - if (check == partMD5) - { - char file[128]; - snprintf(file, sizeof(file), "%s/%d.pat", temp_path, currentPart); - vLog(LOG_DEBUG, "准备写入文件:%s,%s.\n", file, (const char*)obj); - FILE* pf = fopen(file, "w+"); - if (pf) - { - fwrite(out, len, 1, pf); - fclose(pf); - file_current_parts++; - } - } - - delete [] out; - } - } - } - else if (command == "CMD_CONTROL") - { - g_traceId = traceId; //response命令的traceId - OnReceivedDeviceCommand(payload); - } - else if (command == "LINK_LOG_REQUEST") - { - OnReceivedLogCommand(payload); + do { + if (cmd == "configUpdate") { //配置更新 + g_traceId = cmdId; + dealConfigFile(data); + } else if (cmd == "deviceControl") { + g_traceId = cmdId; //response命令的traceId + OnReceivedDeviceCommand(data); + } else if (cmd == "LINK_LOG_REQUEST") { + OnReceivedLogCommand(data); + } else { + vLog(LOG_DEBUG, "command: %s is not supported.\n", cmd.c_str()); } } while (0); return true; } -/* Callback called when the client receives a CONNACK message from the broker. */ -void on_connect(struct mosquitto *mosq, void *obj, int reason_code) +void on_message(noPollConn* conn, const char *msg, const int size) { - int rc; - vLog(LOG_DEBUG, "on_connect: %s\n", mosquitto_connack_string(reason_code)); - if (reason_code != 0) - { - mosquitto_disconnect(mosq); - } + if (msg == NULL) return; + if (size <= 0) return; - rc = mosquitto_subscribe(mosq, NULL, subscribe_topic, 0); - if (rc != MOSQ_ERR_SUCCESS) - { - vLog(LOG_ERROR, "Error subscribing: %s\n", mosquitto_strerror(rc)); - mosquitto_disconnect(mosq); - } -} - -/* Callback called when the broker sends a SUBACK in response to a SUBSCRIBE. */ -void on_subscribe(struct mosquitto *mosq, void *obj, int mid, int qos_count, const int *granted_qos) -{ - int i; - bool have_subscription = false; - - for (i = 0; i < qos_count; i++) - { - vLog(LOG_DEBUG, "on_subscribe: %d:granted qos = %d\n", i, granted_qos[i]); - if (granted_qos[i] <= 2) - { - have_subscription = true; - } - } - if (have_subscription == false) - { - vLog(LOG_ERROR, "Error: All subscriptions rejected.\n"); - mosquitto_disconnect(mosq); - } -} - -/* Callback called when the client receives a message. */ -void on_message(struct mosquitto *mosq, void *obj, const struct mosquitto_message *msg) -{ /* This blindly prints the payload, but the payload can be anything so take care. */ - vLog(LOG_DEBUG, "topic: %s, payload is: %.*s\n", msg->topic, 128, (char *)msg->payload); - Json::Value jsonRoot; + Json::Value jsonRoot; jsonRoot.clear(); - char *buffer = new char[msg->payloadlen+1]; - if (buffer == NULL) return; - memset(buffer, 0, msg->payloadlen+1); - strncpy(buffer, (char *)msg->payload, msg->payloadlen); std::string err; Json::CharReaderBuilder builder; Json::CharReader* reader(builder.newCharReader()); do { - if (!reader->parse(buffer, buffer + msg->payloadlen, &jsonRoot, &err)) - { - vLog(LOG_ERROR, "reader->parse(buffer, buffer + msg->payloadlen, &jsonRoot, &err) error<%d,%s>。\n", errno, err.c_str()); + if (!reader->parse(msg, msg + size, &jsonRoot, &err)) { + vLog(LOG_ERROR, "reader->parse(msg, msg + size, &jsonRoot, &err) error<%d,%s>。\n", errno, err.c_str()); break; } - if (jsonRoot["command"].isNull()) - { - vLog(LOG_ERROR, "json format lost command part.\n"); + if (jsonRoot["cmd"].isNull()) { + vLog(LOG_ERROR, "json format lost cmd part.\n"); break; } - if (jsonRoot["iv"].isNull()) - { - vLog(LOG_ERROR, "json format lost iv part.\n"); + if (jsonRoot["data"].isNull()) { + vLog(LOG_ERROR, "json format lost data part.\n"); break; } - if (jsonRoot["payload"].isNull()) - { - vLog(LOG_ERROR, "json format lost payload part.\n"); - break; - } - std::string command = jsonRoot["command"].asString(); - std::string traceId = jsonRoot["traceId"].asString(); - std::string iv = jsonRoot["iv"].asString(); - Json::Int64 mtime = jsonRoot["mtime"].asInt64(); - std::string datas = jsonRoot["payload"].asString(); - //解码 - Json::Value payload = _decode(datas.c_str(), datas.length(), (unsigned char *)iv.c_str()); - - OnReceivedSystemAction(mosq, traceId, command, payload, (const void*)obj); + std::string cmd = jsonRoot["cmd"].asString(); + std::string cmdId = jsonRoot["cmdId"].asString(); + Json::Int64 mtime = jsonRoot["time"].asInt64(); + Json::Value datas = jsonRoot["data"]; + OnReceivedSystemAction(conn, cmdId, cmd, datas); } while (0); - - delete [] buffer; - buffer = NULL; } #endif @@ -1480,9 +1603,6 @@ void stop(int signo) int i; vLog(LOG_ERROR, "cmg received exit signel(%d)\n", signo); -//#ifndef WIN32 -// if (signo != SIGPIPE) -//#endif { for (i = 0; i < PROCESSES_NUM; i++) { @@ -1497,16 +1617,13 @@ void stop(int signo) } if (zlog_inited) zlog_fini(); -#ifdef WIN32 - if (g_bSocketStarted) WSACleanup(); -#endif signo = 0; pthread_mutex_unlock(&mutex); exit(signo); } -void heart_beat(struct mosquitto* mosq, int status) +void heart_beat(noPollConn* conn, int status) { //发送心跳报文 Json::Value payload; @@ -1517,22 +1634,19 @@ void heart_beat(struct mosquitto* mosq, int status) { Json::Value jsonItem; Json::Value jsonValue; - for (int i = 0; i < (PROCESSES_NUM - 1); i++) - { //最后一个为系统设置的本地调试协议 - if (config.processes[i].state == TRUE) - { - jsonValue["linkIrn"] = (Json::Int64)GetProcessIRNByPid(i); + for (int i = 0; i < m_gLinkIDs.size(); i++) { + if (config.processes[i].state == TRUE) { + jsonValue["linkId"] = m_gLinkIDs.at(i); jsonValue["online"] = (config.processes[i].softdog >= PROCESS_WATCHDOG_TIME) ? false : true; jsonItem.append(jsonValue); } } - if (jsonItem.size() > 0) - { + if (jsonItem.size() > 0) { payload["links"] = jsonItem; } } - publish_sensor_data(mosq, "", "HEARTBEAT", publish_realdata_topic, payload); + publish_sensor_data(conn, "", "heartbeat", payload); } DWORD OnReadChannelRingBuff(char *rbuff, DWORD len) @@ -1549,7 +1663,7 @@ DWORD OnReadChannelRingBuff(char *rbuff, DWORD len) return len; } -BOOLEAN publishMonLinkLog(struct mosquitto* mosq) +BOOLEAN publishMonLinkLog(noPollConn* conn) { Json::Value jsonRoot; Json::Value jsonItem; @@ -1561,15 +1675,12 @@ BOOLEAN publishMonLinkLog(struct mosquitto* mosq) char disp[2048]; int disp_len = 0; int len = 0; - while (channelBuffer.load != channelBuffer.save) - { //读取一个字节判断该字节是否位t和r + while (channelBuffer.load != channelBuffer.save) { //读取一个字节判断该字节是否位t和r len = OnReadChannelRingBuff(&disp[disp_len], 1); - if (disp[disp_len] == 't' || disp[disp_len] == 'r') - { + if (disp[disp_len] == 't' || disp[disp_len] == 'r') { OnReadChannelRingBuff(str_len, 4); len = atoi(str_len); - if (len) - { + if (len) { disp_len = 0; if (disp[0] == 'r') disp[0] = 'R'; else if (disp[0] == 't') disp[0] = 'S'; @@ -1589,12 +1700,52 @@ BOOLEAN publishMonLinkLog(struct mosquitto* mosq) if (jsonItem.size() <= 0) return TRUE; - publish_sensor_data(mosq, "", "LINK_LOG", publish_topic, jsonRoot); + publish_sensor_data(conn, "", "LINK_LOG", jsonRoot); } - -bool publishRealData(struct mosquitto* mosq) +bool publishinitDeviceData(noPollConn* conn, int uid) { + if (uid < 0 || uid >= UNIT_NUM) return false; + + Json::Value root; + Json::Value values; + + int i; + int count = GetUnitYCCount(uid); + if (count) { + //Json::Value jsonValue; + for (i = 0; i < count; i++) { + values[(const char *)config.units[uid].ycs[i].name] = GetUnitYCReal(uid, i); + //values.append(jsonValue); + } + } + count = GetUnitYMCount(uid); + if (count) { + //Json::Value jsonValue; + for (i = 0; i < count; i++) { + values[(const char *)config.units[uid].yms[i].name] = GetUnitYMReal(uid, i); + //values.append(jsonValue); + } + } + count = GetUnitYXCount(uid); + if (count) { + //Json::Value jsonValue; + for (i = 0; i < count; i++) { + values[(const char *)config.units[uid].yxs[i].name] = GetUnitYX(uid, i); + //values.append(jsonValue); + } + } + if (values.size()) { + root["deviceId"] = static_units[uid].deviceId; + root["values"] = values; + return publish_sensor_data(conn, "", "initDeviceData", root); + } + return false; +} + +bool publishRealData(noPollConn* conn, int uid) +{ +#if 0 int i; int uid; int count = 0; @@ -1609,7 +1760,6 @@ bool publishRealData(struct mosquitto* mosq) for (int k = 0; k < m_total_units.size(); k++) { uid = m_total_units.at(k); - //vLog(LOG_DEBUG, "config.units[%d].value is %d.\n", uid, config.units[uid].value); if (config.units[uid].value == SPI_ON) continue; count = GetUnitYCCount(uid); if (count) @@ -1665,10 +1815,12 @@ bool publishRealData(struct mosquitto* mosq) if (!ycs.isNull()) jsonRoot["ycs"] = ycs; if (!yms.isNull()) jsonRoot["yms"] = yms; if (jsonRoot.isNull()) return true; - return publish_sensor_data(mosq, "", "REAL_DATA", publish_realdata_topic, jsonRoot); + return publish_sensor_data(conn, "", "REAL_DATA", jsonRoot); +#endif + return false; } -bool publishSOEData(struct mosquitto* mosq) +bool publishSOEData(noPollConn* conn) { int i, uid; @@ -1703,10 +1855,10 @@ bool publishSOEData(struct mosquitto* mosq) #endif jsonRoot["soe"] = jsonItem; - return publish_sensor_data(mosq, "", "SOE_DATA", publish_realdata_topic, jsonRoot); + return publish_sensor_data(conn, "", "SOE_DATA", jsonRoot); } -bool publishYXBWData(struct mosquitto* mosq) +bool publishYXBWData(noPollConn* conn) { int i, uid; @@ -1741,10 +1893,10 @@ bool publishYXBWData(struct mosquitto* mosq) pushTime *= 1000; jsonRoot["pushTime"] = (Json::Int64)pushTime; #endif - return publish_sensor_data(mosq, "", "REAL_DATA", publish_realdata_topic, jsonRoot); + return publish_sensor_data(conn, "", "stateData", jsonRoot); } -bool publishYCBWData(struct mosquitto* mosq) +bool publishYCBWData(noPollConn* conn) { int i; int uid; @@ -1776,9 +1928,63 @@ bool publishYCBWData(struct mosquitto* mosq) if (jsonItem.size() <= 0) return FALSE; - return publish_sensor_data(mosq, "", "REAL_DATA", publish_realdata_topic, jsonRoot); + return publish_sensor_data(conn, "", "analogData", jsonRoot); } +void freeMem(void) +{ + if (database.yxs) { + delete [] database.yxs; + database.yxs = NULL; + } + if (database.ycs) { + delete [] database.ycs; + database.ycs = NULL; + } + if (database.yms) { + delete [] database.yms; + database.yms = NULL; + } + if (database.yks) { + delete [] database.yks; + database.yks = NULL; + } + if (database.yts) { + delete [] database.yts; + database.yts = NULL; + } + for (int i = 0; i < UNIT_NUM; i++) { + if (config.units[i].yxs) { + delete [] config.units[i].yxs; + config.units[i].yxs = NULL; + } + if (config.units[i].ycs) { + delete [] config.units[i].ycs; + config.units[i].ycs = NULL; + } + if (config.units[i].yms) { + delete [] config.units[i].yms; + config.units[i].yms = NULL; + } + if (config.units[i].yks) { + delete [] config.units[i].yks; + config.units[i].yks = NULL; + } + if (config.units[i].yts) { + delete [] config.units[i].yts; + config.units[i].yts = NULL; + } + } + if (msgBuffer.buf) { + delete [] msgBuffer.buf; + msgBuffer.buf = NULL; + } + + if (channelBuffer.buf) { + delete [] channelBuffer.buf; + channelBuffer.buf = NULL; + } +} int main(int argc, char** argv) { @@ -1787,38 +1993,26 @@ int main(int argc, char** argv) BOOLEAN enable_auto_platform = TRUE; char issmqtt_config_pathName[256] = {'\0'}; - char host[256] = {"192.168.129.31"}; - int port = 1883; - char clientid[128] = {"iss_cmg"}; - char username[128] = {"usp3"}; - char password[128] = {"zaq12WSX"}; - char topic[128] = {"USP/v99"}; + char host[256] = {"127.0.0.1"}; + int port = 7790; + char nodeId[128] = {"runyang_dn"}; + char version[128] = {"v1.0"}; - uuid_t uuid; - char cid[128]; - uuid_generate_time(uuid); - uuid_unparse_upper(uuid, cid); - std::vector tokens = split(std::string(cid), '-'); - std::string iv = tokens.back(); - snprintf(clientid, sizeof(clientid), "iss_cmg_%s", iv.c_str()); //获取可执行文件所在目录 const char default_config[] = "[global]\n" "default format = \"%d.%ms [%-6V] - %m%n\"\n" "[rules]\n" "my_cat.* >stderr\n"; int rc = dzlog_init("./application.conf", "my_cat"); - if (rc < 0) - { + if (rc < 0) { rc = dzlog_init(default_config, "my_cat"); - if (rc < 0) zlog_inited = FALSE; - else - { + if (rc < 0) { + zlog_inited = FALSE; + } else { fprintf(stderr, "dzlog_init(\"./application.conf\", \"my_cat\") failed, load default config.\n"); zlog_inited = TRUE; } - } - else - { + } else { zlog_inited = TRUE; } @@ -1827,96 +2021,55 @@ int main(int argc, char** argv) static struct option long_options[] = { { "directory", required_argument, NULL, 'd' }, - { "file", required_argument, NULL, 'f' }, { "host", required_argument, NULL, 'h'}, { "port", required_argument, NULL, 'p'}, - { "key", required_argument, NULL, 'k'}, - { "username", required_argument, NULL, 'u' }, - { "password", required_argument, NULL, 'P' }, - { "clientid", required_argument, NULL, 'i'}, - { "topic", required_argument, NULL, 't'}, + { "nodeId", required_argument, NULL, 'n' }, + { "version", required_argument, NULL, 'v'}, { "help", no_argument, NULL, 'H' }, { NULL, 0, NULL, 0 } }; while (1) { int opt_index = 0; - c = getopt_long(argc, argv, "d:f:h:p:i:u:P:t:k:H", long_options, &opt_index); + c = getopt_long(argc, argv, "d:h:p:n:v:H", long_options, &opt_index); if (c == -1) break; switch (c) { case 'd': snprintf(configpath, sizeof(configpath), "%s", optarg); break; - case 'f': - snprintf(configfile, sizeof(configfile), "%s", optarg); - break; case 'h': snprintf(host, sizeof(host), "%s", optarg); break; case 'p': port = strtol(optarg, NULL, 10); break; - case 'i': - snprintf(clientid, sizeof(clientid), "%s", optarg); + case 'n': + snprintf(nodeId, sizeof(nodeId), "%s", optarg); break; - case 'u': - snprintf(username, sizeof(username), "%s", optarg); - break; - case 'P': - snprintf(password, sizeof(password), "%s", optarg); - break; - case 't': - snprintf(topic, sizeof(topic), "%s", optarg); - break; - case 'k': - snprintf((char *)key, sizeof(key), "%s", optarg); + case 'v': + snprintf(version, sizeof(version), "%s", optarg); break; case '?': case 'H': vLog(LOG_DEBUG, "Usage: %s [OPTION]... \n" " -d, --directory : set configuration file directory. Default is /data/config/rtufiles\n" - " -f, --file : set configuration file name. Default is config.db\n" - " -h, --host : mqtt ip. Default is localhost\n" - " -p, --port : mqtt port, default is 1883\n" - " -u, --username : mqtt user name, default is usp3\n" - " -P, --password : mqtt user password, default is zaq12WSX\n" - " -i, --clientid : mqtt client id, default is dataAcquisition_xx\n" - " -t, --topic : mqtt base topic, default is USP/v99\n" + " -h, --host : ws ip. Default is localhost\n" + " -p, --port : ws port, default is 1883\n" + " -n, --nodeid : ws nodeId, default is runyang_dn\n" + " -v, --version : sw version, default is v1.0\n" " -H, --help : print this usage\n", argv[0]); return (EXIT_SUCCESS); default: vLog(LOG_DEBUG, "?? getopt returned character code 0%c ??\n", c); } } - -#ifdef WIN32 - int err; - WORD wVersionRequested; - WSADATA wsaData; - wVersionRequested = MAKEWORD(2, 2); - err = WSAStartup(wVersionRequested, &wsaData); - if (err != 0) - { - vLog(LOG_ERROR, "network init error(%d,%s)\n", errno, strerror(errno)); - return FALSE; - } - if (LOBYTE(wsaData.wVersion) != 2 || HIBYTE(wsaData.wVersion) != 2) - { - WSACleanup(); - vLog(LOG_ERROR, "network version error.\r\n"); - return FALSE; - } - g_bSocketStarted = TRUE; - - signal(SIGBREAK, stop); -#else + signal(SIGHUP, stop); signal(SIGQUIT, stop); signal(SIGKILL, stop); signal(SIGPIPE, stop); signal(SIGSTOP, stop); -#endif //WIN32 signal(SIGINT, stop); signal(SIGILL, stop); signal(SIGSEGV, stop); @@ -1924,21 +2077,19 @@ int main(int argc, char** argv) signal(SIGABRT, stop); signal(SIGFPE, stop); - struct mosquitto *mosq = NULL; - mosquitto_lib_init(); - - do - { +#if 1 + noPollCtx *ctx = nopoll_ctx_new(); + noPollConn *conn; +#endif + do { int status = 2; //0 - 离线, 1 - 在线, 2 - 未配置 g_dataAcquisitionReload = false; vLog(LOG_DEBUG, "system initialize...\n"); char szHostCode[128]; memset(szHostCode, '\0', sizeof(szHostCode)); - if (!initialize_system(FALSE, FALSE, NULL, szHostCode)) - { + if (!initialize_system(FALSE, FALSE, NULL, szHostCode)) { vLog(LOG_ERROR, "system initialize error.\n"); - if (enable_auto_platform) - { //主动和平台链接 + if (enable_auto_platform) { //主动和平台链接 char szHostName[32] = ""; gethostname(szHostName, sizeof(szHostName)); nodes.m_node[0].m_netnode_no = 0; @@ -1946,103 +2097,67 @@ int main(int argc, char** argv) nodes.m_node[0].m_target_addr = INADDR_LOOPBACK; nodes.m_node[0].m_target_port = 15000; snprintf(nodes.m_node[0].m_machine_name, sizeof(nodes.m_node[0].m_machine_name), "%s", szHostName); - nodes.m_node[0].irn = strtoll(irn, NULL, 10); + nodes.m_node[0].irn = 0; //没有配置 vLog(LOG_DEBUG, "nodes.m_node[0].m_machine_name is: %s\n", nodes.m_node[0].m_machine_name); } - } - else - { - snprintf(clientid, sizeof(clientid), "%s", szHostCode); - vLog(LOG_DEBUG, "configed client id is: %s.\n", clientid); - if (enable_auto_platform) - { //增加协议和单元配置的数量统计 - m_total_processes.clear(); - m_total_units.clear(); - ykirn2uid_map.clear(); - ytirn2uid_map.clear(); - for (int i = 0; i < (PROCESSES_NUM - 1); i++) - { - if (config.processes[i].state) - { - m_total_processes.push_back(i); - for (int j = 0; j < PROCESS_UNIT_NUM; j++) - { - int uid = config.processes[i].units[j]; - if (uid >= 0) - { - //if (config.units[uid].type == MASTER_UNIT) - { //平台下发的配置,装置都为主设备 - m_total_units.push_back(uid); - for (int k = 0; k < config.units[uid].ykcount; k++) - { - QLONG key = config.units[uid].yks[k].irn; - if (ykirn2uid_map.find(key) == ykirn2uid_map.end()) - { - ykirn2uid_map.insert(std::unordered_map::value_type(key, uid)); - } - else - { - vLog(LOG_DEBUG, "at least two yk points have the same irn.\n"); - } - } - for (int k = 0; k < config.units[uid].ytcount; k++) - { - QLONG key = config.units[uid].yts[k].irn; - if (ytirn2uid_map.find(key) == ytirn2uid_map.end()) - { - ytirn2uid_map.insert(std::unordered_map::value_type(key, uid)); - } - else - { - vLog(LOG_DEBUG, "at least two yt points have the same irn.\n"); - } - } - } - } + } else { + //snprintf(nodeId, sizeof(nodeId), "%s", szHostCode); + vLog(LOG_DEBUG, "configed node id is: %s.\n", nodeId); + if (enable_auto_platform) { //增加协议和单元配置的数量统计 +// m_total_processes.clear(); + //m_total_units.clear(); + for (int i = 0; i < UNIT_NUM; i++) { + if (config.units[i].state != TRUE) continue; + std::string unit_id = static_units[i].deviceId; + name2servicemap name2service_map; + for (int j = 0; j < config.units[i].ykcount; j++) { + std::string name = config.units[i].yks[j].name; + if (name2service_map.find(name) == name2service_map.end()) { + struct_service_item item; + item.type = CMD_CONTROL_OPERATION; + item.uid = i; + item.point = j; + item.order = config.units[i].yks[j].order; + name2service_map.insert(name2servicemap::value_type(name, item)); + } else { + vLog(LOG_WARN, "同一个设备不能有两个相同的遥控名<%s>,请检查。\n", name.c_str()); } } + for (int j = 0; j < config.units[i].ytcount; j++) { + std::string name = config.units[i].yts[j].name; + if (name2service_map.find(name) == name2service_map.end()) { + struct_service_item item; + item.type = CMD_CONTROL_SETTING; + item.uid = i; + item.point = j; + item.order = config.units[i].yts[j].order; + name2service_map.insert(name2servicemap::value_type(name, item)); + } else { + vLog(LOG_WARN, "同一个设备不能有两个相同的遥调名<%s>,请检查。\n", name.c_str()); + } + } + if (unitname2service_map.find(unit_id) == unitname2service_map.end()) { + unitname2service_map.insert(unitname2servicemap::value_type(unit_id, name2service_map)); + } else { + vLog(LOG_WARN, "系统配置了两个相同的设备ID<%s>,请检查。\n", unit_id.c_str()); + } } - vLog(LOG_DEBUG, "here have %d processes, and %d units.\n", m_total_processes.size(), m_total_units.size()); status = 1; } } - if (enable_auto_platform) - { - if (irn[0] == '\0') - { - vLog(LOG_ERROR, "node irn cann't be empty.\n"); - return (EXIT_FAILURE); - } - - //创建mqtt服务 - mosq = mosquitto_new(clientid, true, issmqtt_config_pathName); - if (mosq == NULL) - { - vLog(LOG_ERROR, "Error: Out of memory.\n"); - break; - } - - /* Configure callbacks. This should be done before connecting ideally. */ - mosquitto_username_pw_set(mosq, username, password); - mosquitto_connect_callback_set(mosq, on_connect); - mosquitto_subscribe_callback_set(mosq, on_subscribe); - mosquitto_message_callback_set(mosq, on_message); - - rc = mosquitto_connect(mosq, host, port, 60); - if (rc != MOSQ_ERR_SUCCESS) - { - vLog(LOG_ERROR, "mosquitto_connect Error: %s\n", mosquitto_strerror(rc)); - } - - /* Run the network loop in a background thread, this call returns quickly. */ - rc = mosquitto_loop_start(mosq); - if (rc != MOSQ_ERR_SUCCESS) - { - mosquitto_destroy(mosq); - vLog(LOG_ERROR, "mosquitto_loop_start Error: %s\n", mosquitto_strerror(rc)); - break; + if (enable_auto_platform) { +#if 1 + //创建WS链接 + if (ctx != NULL) { + nopoll_conn_connect_timeout(ctx, 200); + char url[512]; + char cPort[64]; + snprintf(url, sizeof(url), "/node/%s/%s/", nodeId, version); + snprintf(cPort, sizeof(cPort), "%d", port); + conn = nopoll_conn_new(ctx, host, cPort, NULL, url, NULL, NULL); } +#endif } unsigned int m_runCount = 0; @@ -2050,94 +2165,164 @@ int main(int argc, char** argv) unsigned int critical = 0; CChangeMaster masterTci; - if (!masterTci.Init()) - { + if (!masterTci.Init()) { break; } masterTci.MasterTciFirstRun(); time_t last_sec = 0; - while (TRUE) - { + while (TRUE) { m_runCount++; masterTci.MasterSend(); usleep(MASTER_TCI_SEND_INTERVAL); - if (MASTER_TCI == CChangeMaster::m_tcitype) - { - if (m_runCount > count) - { + if (MASTER_TCI == CChangeMaster::m_tcitype) { + if (m_runCount > count) { count = m_runCount; critical = 0; - } - else - { + } else { critical++; - if (critical > 15) - { + if (critical > 15) { break; } } - if (enable_auto_platform) - { + if (enable_auto_platform) { +#if 1 + nopoll_bool isOk = nopoll_conn_is_ok(conn); + if (nopoll_conn_is_ok(conn) != nopoll_true) { + nopoll_conn_close(conn); + char url[512]; + char cPort[64]; + snprintf(url, sizeof(url), "/node/%s/%s/", nodeId, version); + snprintf(cPort, sizeof(cPort), "%d", port); + conn = nopoll_conn_new(ctx, host, cPort, NULL, url, NULL, NULL); + continue; + } else { + if (!nopoll_conn_wait_until_connection_ready(conn, 5)) { + // some error handling + nopoll_conn_close(conn); + char url[512]; + char cPort[64]; + snprintf(url, sizeof(url), "/node/%s/%s/", nodeId, version); + snprintf(cPort, sizeof(cPort), "%d", port); + conn = nopoll_conn_new(ctx, host, cPort, NULL, url, NULL, NULL); + continue; + } + } +#if 0 + int retries = 0; + char recv_buffer[4096]; + int bytes_read = nopoll_conn_read(conn, (char *)recv_buffer, sizeof(recv_buffer), nopoll_true, 1000); + if (bytes_read < 0) { + vLog(LOG_ERROR, "expected to find bytes from the connection but found: %d\n", bytes_read); + /* close the connection */ + if (conn) { + nopoll_conn_close(conn); + continue; + } + } else if (bytes_read > 0) { + recv_buffer[bytes_read] = '\0'; + vLog(LOG_DEBUG, "\nrecv: %s \nlength = %d\n", recv_buffer, bytes_read); + } /* end if */ +#else +#if 1 + int msg_count = 0; + noPollMsg *msg[40]; + while ((msg[msg_count] = nopoll_conn_get_msg(conn)) != NULL) { + vLog(LOG_DEBUG, "recv length = %d, %d\n", nopoll_msg_get_payload_size(msg[msg_count]), nopoll_msg_is_final(msg[msg_count])); + if (nopoll_msg_is_final(msg[msg_count])) { + msg_count++; + break; + } else { + msg_count++; + } + } + if (msg_count > 0) { + int buffer_len; + BYTE *buffer = websocket_msg_join(msg, msg_count, &buffer_len); + if (buffer) { + on_message(conn, (const char *)buffer, buffer_len); + delete [] buffer; + buffer = NULL; + } + } + for (int i = 0; i < msg_count; i++) { + nopoll_msg_unref(msg[i]); + } +#else + int bytes_read; + char recv_buffer[2048]; + do { + bytes_read = nopoll_conn_read(conn, (char *)recv_buffer, sizeof(recv_buffer), nopoll_false, 0); + if (bytes_read > 0) { + recv_buffer[bytes_read] = '\0'; + vLog(LOG_DEBUG, "\nrecv: %s \nlength = %d\n", recv_buffer, bytes_read); + } + if (nopoll_conn_read_pending(conn) > 0) { //接收没有结束 + vLog(LOG_DEBUG, "nopoll_conn_read_pending: %d.\n", nopoll_conn_read_pending(conn)); + } + } while (nopoll_conn_read_pending(conn) > 0); +#endif +#endif BOOLEAN sec_changed = FALSE; - if (last_sec != (time_t)system32.timers) - { + if (last_sec != (time_t)system32.timers) { last_sec = system32.timers; sec_changed = TRUE; } - MakeYKFrame(mosq); - MakeYTFrame(mosq); - - if (sec_changed) - { - if (struMonLinkLog.m_iStartTime > 0) - { - if (struMonLinkLog.m_iStartTime > 0 && system32.timers <= (struMonLinkLog.m_iStartTime + 30)) //默认为30s,调试设置为2s - { //发送报文 - publishMonLinkLog(mosq); - } - else - { + if (sec_changed) { + if (struMonLinkLog.m_iStartTime > 0) { + if (struMonLinkLog.m_iStartTime > 0 && system32.timers <= (struMonLinkLog.m_iStartTime + 30)) { //默认为30s,调试设置为2s + //发送报文 + publishMonLinkLog(conn); + } else { channelBuffer.enabled = FALSE; channelBuffer.mon_port = -1; struMonLinkLog.m_iLinkIrn = -1; struMonLinkLog.m_iStartTime = 0; } } - publishYXBWData(mosq); - publishSOEData(mosq); - if ((last_sec % 60) == 0) - { //更新数据 - publishRealData(mosq); - } - if ((last_sec % 20) == 0) - { - heart_beat(mosq, status); + publishYXBWData(conn); + publishSOEData(conn); + if ((last_sec % 20) == 0) { + heart_beat(conn, status); } } + for (int i = 0; i < UNIT_NUM; i++) { + if (config.units[i].state != TRUE) continue; + MakeYKFrame(conn, i); + MakeYTFrame(conn, i); + if (sec_changed) { + if ((last_sec % 60) == 0) { //更新数据 + //publishRealData(conn, i); + publishinitDeviceData(conn, i); + } + } + } +#endif } } - if (g_dataAcquisitionReload) - { + if (g_dataAcquisitionReload) { break; } } - if (critical > 15) - { + if (critical > 15) { vLog(LOG_ERROR, "unknow error.\n"); } masterTci.ChangeDelete(); + + destroy_thread(); + freeMem(); + vLog(LOG_DEBUG, "App: dataAcquisition start reload.\n"); } while(0); pthread_mutex_destroy(&mutex); - mosquitto_lib_cleanup(); - if (zlog_inited) zlog_fini(); -#ifdef WIN32 - if (g_bSocketStarted) WSACleanup(); -#endif //WIN32 + if (zlog_inited) zlog_fini(); +#if 1 + nopoll_conn_close(conn); + nopoll_ctx_unref(ctx); +#endif vLog(LOG_DEBUG, "system stop okay.\n"); return EXIT_SUCCESS; } diff --git a/das-dn/comm/public.cpp b/das-dn/comm/public.cpp index da7f58c7..d5f78555 100644 --- a/das-dn/comm/public.cpp +++ b/das-dn/comm/public.cpp @@ -23,11 +23,6 @@ #include "json.h" -#define ADDR_TYPE_NORMAL 0 -#define ADDR_TYPE_HEX 1 -#define ADDR_TYPE_IPV4 2 -#define ADDR_TYPE_IPV4_FACNO 3 - #define PARAMTER_TYPE_MODBUS 1 #define PARAMTER_TYPE_104 2 #define PARAMTER_TYPE_9712 3 @@ -1040,6 +1035,7 @@ BOOLEAN WriteDatabaseCFG(void) return FALSE; } +#if 0 //此处定义一个map来放置irn和协议的order的关系 typedef std::map mapIRN2Order; BOOLEAN processUartParam(std::string jsonvalue, int ord) @@ -2101,6 +2097,7 @@ BOOLEAN ReadConfigFromSQLite(BOOLEAN enable_auto_platform, const char* issmqtt_c return TRUE; } +#endif void dumpLogs(void) { @@ -2354,9 +2351,9 @@ BOOLEAN initialize_system(BOOLEAN usesqlite, BOOLEAN enable_auto_platform, const if (usesqlite) { vLog(LOG_DEBUG, "read configuration from sqlite3 db file.\n"); - BOOLEAN ret; - ret = ReadConfigFromSQLite(enable_auto_platform, pathName, codeName); + BOOLEAN ret = FALSE; #if 0 + ret = ReadConfigFromSQLite(enable_auto_platform, pathName, codeName); if (ret) { WriteSystemCFG(); @@ -2377,6 +2374,7 @@ BOOLEAN initialize_system(BOOLEAN usesqlite, BOOLEAN enable_auto_platform, const { vLog(LOG_ERROR, "Fail load system config!\n"); alarm(); + return FALSE; } vLog(LOG_DEBUG, "Loading node config...\n"); @@ -2384,6 +2382,7 @@ BOOLEAN initialize_system(BOOLEAN usesqlite, BOOLEAN enable_auto_platform, const { vLog(LOG_ERROR, "Fail load node config!\n"); alarm(); + return FALSE; } vLog(LOG_DEBUG, "Loading database config...\n"); @@ -2391,6 +2390,7 @@ BOOLEAN initialize_system(BOOLEAN usesqlite, BOOLEAN enable_auto_platform, const { vLog(LOG_ERROR, "Fail load database config!\n"); alarm(); + return FALSE; } vLog(LOG_DEBUG, "Loading hardware config...\n"); @@ -2398,6 +2398,7 @@ BOOLEAN initialize_system(BOOLEAN usesqlite, BOOLEAN enable_auto_platform, const { vLog(LOG_ERROR, "Fail load hardware config!\n"); alarm(); + return FALSE; } vLog(LOG_DEBUG, "Loading process config...\n"); @@ -2405,6 +2406,7 @@ BOOLEAN initialize_system(BOOLEAN usesqlite, BOOLEAN enable_auto_platform, const { vLog(LOG_ERROR, "Fail load processes config!\n"); alarm(); + return FALSE; } vLog(LOG_DEBUG, "Loading unit config...\n"); @@ -2412,6 +2414,7 @@ BOOLEAN initialize_system(BOOLEAN usesqlite, BOOLEAN enable_auto_platform, const { vLog(LOG_ERROR, "Fail load unit config!\n"); alarm(); + return FALSE; } vLog(LOG_DEBUG, "Loading static unit config...\n"); @@ -2419,6 +2422,7 @@ BOOLEAN initialize_system(BOOLEAN usesqlite, BOOLEAN enable_auto_platform, const { vLog(LOG_ERROR, "Fail load static unit config!\n"); alarm(); + return FALSE; } } return TRUE; diff --git a/das-dn/comm/soe.cpp b/das-dn/comm/soe.cpp index 140403c1..c1a7ac24 100644 --- a/das-dn/comm/soe.cpp +++ b/das-dn/comm/soe.cpp @@ -71,7 +71,7 @@ void CSOE::DumpSOE(void) static_unit = fopen(pathName, "rb"); //打开yx.sta静态摁键 FILE* static_yx; - struUnitYXStatic yx; + struYXStatic yx; snprintf(pathName, sizeof(pathName), "%s/%s", configpath, FILE_DATABASE_YX_STATIC); static_yx = fopen(pathName, "rb"); @@ -85,9 +85,9 @@ void CSOE::DumpSOE(void) unit.model[15] = '\0'; } memset(&yx, 0, sizeof(yx)); - if (static_yx != NULL && fseek(static_yx, sizeof(struUnitYXStatic)*m_soes[m_load].order, SEEK_SET) == 0) + if (static_yx != NULL && fseek(static_yx, sizeof(struYXStatic)*m_soes[m_load].order, SEEK_SET) == 0) { - fread(&yx, sizeof(struUnitYXStatic), 1, static_yx); + fread(&yx, sizeof(struYXStatic), 1, static_yx); yx.name[63] = '\0'; } fprintf(pf, "%04d/%02d/%02d %02d:%02d:%02d.%03d;%d;%s;%d;%s;%s;%d;%s\n", \ diff --git a/das-dn/comm/ycbw.cpp b/das-dn/comm/ycbw.cpp index 2df67cbc..62a815cd 100644 --- a/das-dn/comm/ycbw.cpp +++ b/das-dn/comm/ycbw.cpp @@ -74,7 +74,7 @@ void CYCBW::DumpYCBW(void) static_unit = fopen(pathName, "rb"); FILE* static_yc; - struUnitYCStatic yc; + struYCStatic yc; snprintf(pathName, sizeof(pathName), "%s/%s", configpath, FILE_DATABASE_YC_STATIC); static_yc = fopen(pathName, "rb"); @@ -88,9 +88,9 @@ void CYCBW::DumpYCBW(void) unit.model[15] = '\0'; } memset(&yc, 0, sizeof(yc)); - if (static_yc != NULL && fseek(static_yc, sizeof(struUnitYCStatic)*m_ycbws[m_load].order, SEEK_SET) == 0) + if (static_yc != NULL && fseek(static_yc, sizeof(struYCStatic)*m_ycbws[m_load].order, SEEK_SET) == 0) { - fread(&yc, sizeof(struUnitYCStatic), 1, static_yc); + fread(&yc, sizeof(struYCStatic), 1, static_yc); yc.name[63] = '\0'; } fprintf(pf, "%s;%d;%d;%d;%s;%s;%d;%s\n", diff --git a/das-dn/comm/yxbw.cpp b/das-dn/comm/yxbw.cpp index 5e7a2b9c..27f4bb2f 100644 --- a/das-dn/comm/yxbw.cpp +++ b/das-dn/comm/yxbw.cpp @@ -73,7 +73,7 @@ void CYXBW::DumpYXBW(void) static_unit = fopen(pathName, "rb"); FILE* static_yx; - struUnitYXStatic yx; + struYXStatic yx; snprintf(pathName, sizeof(pathName), "%s/%s", configpath, FILE_DATABASE_YX_STATIC); static_yx = fopen(pathName, "rb"); @@ -87,9 +87,9 @@ void CYXBW::DumpYXBW(void) unit.model[15] = '\0'; } memset(&yx, 0, sizeof(yx)); - if (static_yx != NULL && fseek(static_yx, sizeof(struUnitYXStatic)*m_yxbws[m_load].order, SEEK_SET) == 0) + if (static_yx != NULL && fseek(static_yx, sizeof(struYXStatic)*m_yxbws[m_load].order, SEEK_SET) == 0) { - fread(&yx, sizeof(struUnitYXStatic), 1, static_yx); + fread(&yx, sizeof(struYXStatic), 1, static_yx); yx.name[63] = '\0'; } fprintf(pf, "%s;%d;%s;%d;%s;%s;%d;%s\n", diff --git a/das-dn/inc/public.h b/das-dn/inc/public.h index 4ada21ea..25fcf22f 100644 --- a/das-dn/inc/public.h +++ b/das-dn/inc/public.h @@ -14,52 +14,28 @@ const char VERSION_NO[] = "1.3.0001"; #include #include #include - -#ifdef WIN32 - #include "pthread.h" - #include - #include - - typedef char optval_t; - typedef int socklen_t; - - #define msleep(msec) Sleep(msec) - #define sleep(sec) Sleep(sec * 1000) - - void usleep(unsigned long usec); - int close(SOCKET s); - int ioctl(SOCKET s, long cmd, u_long FAR* argp); - int gettimeofday(struct timeval *tp, struct timezone *tz); - int settimeofday(struct timeval *tp, struct timezone *tz); - struct tm *localtime_r(long *close, struct tm *res); -#else - #include - #include - #include - #include - #include - #include - #include - #include - #include - #include - #include - #include - #include - #include - #include - #include - #include - #include - #include - #include -#if 0 - #include "threadpool.h" - #define THREADPOOL_MAX_NUM 68 -#endif - typedef int optval_t; - typedef int SOCKET; -#endif //WIN32 +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +typedef int optval_t; +typedef int SOCKET; #include #include #include @@ -212,30 +188,36 @@ const char VERSION_NO[] = "1.3.0001"; #define PROTOCOL_HOST_MODBUS_TCP 16 //MODBUS tcp主 #define PROTOCOL_SUB_MODBUS_TCP 17 //MODBUS RTU over tcp从 #define PROTOCOL_HOST_MODBUS_RTU_TCP 18 //MODBUS RTU over tcp主 -#define PROTOCOL_HOST_MODBUS_RTU_NCY 19 //NCY-6100系列微机保护装置MODBUS RTU主 -#define PROTOCOL_HOST_MODBUS_RTU_LIYEZG 20 //浙江立业电器MODBUS RTU主 +//#define PROTOCOL_HOST_MODBUS_RTU_NCY 19 //NCY-6100系列微机保护装置MODBUS RTU主 +//#define PROTOCOL_HOST_MODBUS_RTU_LIYEZG 20 //浙江立业电器MODBUS RTU主 #define PROTOCOL_RTU_STATE 21 //主控状态 #define PROTOCOL_LOCAL_DEBUG 22 //本地调试 -#define PROTOCOL_SUB_XT9712 23 //XT9712从 -#define PROTOCOL_HISDATA 24 //历史数据协议 +//#define PROTOCOL_SUB_XT9712 23 //XT9712从 +//#define PROTOCOL_HISDATA 24 //历史数据协议 #define PROTOCOL_BF_FTP 25 //倍福FTP数据协议 -#define PROTOCOL_HOST_NSA 27 //NSA主 +//#define PROTOCOL_HOST_NSA 27 //NSA主 #define PROTOCOL_HOST_DLT645V2007 30 //dlt645v2007协议 #define PROTOCOL_HOST_DLT645V2007_HR 31 //中电华瑞dlt645v2007协议 #define PROTOCOL_HOST_DLT645V2007_OVERTCP 32 //中电华瑞dlt645v2007 over tcp协议 #define PROTOCOL_CALC 37 //计算 -#define PROTOCOL_DLT_1867 40 //需求响应协议dlt1867-2008 +//#define PROTOCOL_DLT_1867 40 //需求响应协议dlt1867-2008 #define PROTOCOL_SUB_GDW104 47 //国网104从 -#define PROTOCOL_AGC 50 //功率自动控制app -#define PROTOCOL_AVC 51 //电压自动控制app +//#define PROTOCOL_AGC 50 //功率自动控制app +//#define PROTOCOL_AVC 51 //电压自动控制app #define PROTOCOL_HW_MQTT 72 //华为物联平台ROMA -#define PROTOCOL_ISS_MQTT 73 //软通动力mqtt -#define PROTOCOL_LM_MQTT 74 //罗米mqtt -#define PROTOCOL_GRPC_PUBLISH 76 //gRPC上传发布 -#define PROTOCOL_GRPC_SUBSCRIBE 77 //gRPC下载订阅 +//#define PROTOCOL_ISS_MQTT 73 //软通动力mqtt +//#define PROTOCOL_LM_MQTT 74 //罗米mqtt +//#define PROTOCOL_GRPC_PUBLISH 76 //gRPC上传发布 +//#define PROTOCOL_GRPC_SUBSCRIBE 77 //gRPC下载订阅 #define PROTOCOL_OPCUA 78 //opcua协议 -#define PROTOCOL_HOST_MODBUS_RTU_RDS100AFT 80 //珠海瑞捷电气股份有限公司RD系列保护装置MODBUS RTU主 -#define PROTOCOL_HOST_MODBUS_RTU_APF 81 //江苏沃海电气有限公司APF(SVG)MODBUS RTU主 +//#define PROTOCOL_HOST_MODBUS_RTU_RDS100AFT 80 //珠海瑞捷电气股份有限公司RD系列保护装置MODBUS RTU主 +//#define PROTOCOL_HOST_MODBUS_RTU_APF 81 //江苏沃海电气有限公司APF(SVG)MODBUS RTU主 + + +#define ADDR_TYPE_NORMAL 0 +#define ADDR_TYPE_HEX 1 +#define ADDR_TYPE_IPV4 2 +#define ADDR_TYPE_IPV4_FACNO 3 #define YKS_IDLE 0 @@ -858,6 +840,7 @@ typedef struct typedef struct { + char name[(MAX_NAME_SIZE << 2)]; short order; BYTE value; BYTE qds; //品质描述 @@ -865,18 +848,20 @@ typedef struct BYTE m_param[MAX_UNIT_POINT_PARAM_SIZE]; DWORD update_time; QLONG irn; + //是否取反,default: 0 + BOOLEAN invert; } struUnitYX; +#if 0 typedef struct { char name[(MAX_NAME_SIZE << 2)]; } struUnitYXStatic; - -#if 0 -#define EXTREME_SAMPLE_NUM 32 #endif + typedef struct { + char name[(MAX_NAME_SIZE << 2)]; short order; short factor; //比例系数 LONG value; //遥测值 @@ -888,12 +873,31 @@ typedef struct BYTE m_param[MAX_UNIT_POINT_PARAM_SIZE]; DWORD update_time; QLONG irn; + + //上界 + float upBound; + //下界 + float lowBound; + //限值1, default: 0 + BOOLEAN limit1Enable; + //限值1下限 + float limit1Low; + //限值1上限 + float limit1High; + //限值2, default: 0 + BOOLEAN limit2Enable; + //限值2上限 + float limit2High; + //限值2下限 + float limit2Low; } struUnitYC; +#if 0 typedef struct { char name[(MAX_NAME_SIZE << 2)]; } struUnitYCStatic; +#endif typedef struct { @@ -908,6 +912,7 @@ typedef struct typedef struct { + char name[(MAX_NAME_SIZE << 2)]; short order; DWORD value; float coef; @@ -917,13 +922,16 @@ typedef struct QLONG irn; } struUnitYM; +#if 0 typedef struct { char name[(MAX_NAME_SIZE << 2)]; } struUnitYMStatic; +#endif typedef struct { + char name[(MAX_NAME_SIZE << 2)]; short order; BOOLEAN selected; BYTE m_param[MAX_UNIT_POINT_PARAM_SIZE]; @@ -937,16 +945,19 @@ typedef struct typedef struct { + char name[(MAX_NAME_SIZE << 2)]; short order; BOOLEAN selected; BYTE m_param[MAX_UNIT_POINT_PARAM_SIZE]; QLONG irn; } struUnitYT; +#if 0 typedef struct { char name[(MAX_NAME_SIZE << 2)]; } struUnitYTStatic; +#endif typedef struct { diff --git a/das-dn/third_party/json/jsoncpp.cpp b/das-dn/third_party/json/jsoncpp.cpp index 3390e79c..6954d853 100644 --- a/das-dn/third_party/json/jsoncpp.cpp +++ b/das-dn/third_party/json/jsoncpp.cpp @@ -301,6 +301,8 @@ bool Reader::parse(const char* beginDoc, const char* endDoc, Value& root, bool collectComments) { + + fprintf(stderr, "here.\n"); if (!features_.allowComments_) { collectComments = false; }