/* * File: main.cpp * Author: zhouhuang * * Created on October 10, 2015, 3:17 PM */ #include #include //#include "public.h" #ifdef WIN32 #include "getopt.h" BOOLEAN g_bSocketStarted = FALSE; #else #include #endif #include "changemaster.h" #include "process.h" #if 1 #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "soe.h" #include "yklog.h" #include "ytlog.h" char publish_topic[512] = {'\0'}; char publish_realdata_topic[512] = {'\0'}; char subscribe_topic[512] = {'\0'}; bool g_dataAcquisitionReload = false; std::string g_traceId; #define AES_BLOCK_SIZE 16 //AES密钥 unsigned char key[17] = {"1234567890123456"}; LONG m_soeload = 0; LONG m_yxbwload = 0; LONG m_ycbwload = 0; std::vector m_total_processes; std::vector m_total_units; std::unordered_map ykirn2uid_map; std::unordered_map ytirn2uid_map; struct { Json::Int64 m_iLinkIrn; time_t m_iStartTime; } struMonLinkLog; std::vector split(const std::string &s, char delimiter) { std::vector tokens; std::istringstream tokenStream(s); std::string token; while (std::getline(tokenStream, token, delimiter)) { tokens.push_back(token); } return tokens; } /* * 一次输入所有待加密数据,并使用AES CBC模式进行加密 * * 输入: * input:待加密数据 * input_length:待加密数据的长度 * encrypted_data:加密后的密文存放位置,注意此指针指向的区域有效长度计算如下:使用PKCS#7填充的output长度计算应为 decrypted_data_length = ((input_length/16)+1)*16 * 返回值:加密后密文的长度,若为负值,则存在错误。 */ int encrypt_data_by_aes_cbc(unsigned char *input, int input_length, unsigned char* iv, unsigned char *encrypted_data) { mbedtls_aes_context aes_context; int result = 0; int padding = 0; int real_input_length = 0; unsigned char padding_code; unsigned char *input_tmp; unsigned char error[64]; //PKCS7填充到16字节整倍数,若待加密数据正好为16字节整倍数,仍需填充16个字节 //计算当前长度距离16整倍数的差值,此值即为填充值 padding = 16 - (input_length % 16); padding_code = (unsigned char)padding; //real_input_length即为经过填充后待加密数据的长度,值必然为16整倍数,同时此长度也为加密后的密文长度 real_input_length = input_length + padding; if (real_input_length % 16 != 0) { //填充后必然为16字节整倍数,若不为16字节整倍数,则填充存在问题 vLog(LOG_ERROR, "failed to padding\n"); return -1; } input_tmp = (unsigned char *)calloc(1, real_input_length); if (input_tmp == NULL) { //分配内存错误 return -2; } //进行填充 memset(input_tmp, 0, real_input_length); memcpy(input_tmp, input, input_length); memset(input_tmp + input_length, padding_code, padding); mbedtls_aes_init(&aes_context); //若使用AES-128,则必要长度为128bits。若使用AES-256,则密钥长度为256bits result = mbedtls_aes_setkey_enc(&aes_context, key, 128); if (result != 0) { mbedtls_strerror(result, (char *)error, sizeof(error)); vLog(LOG_ERROR, "failed to set key:%s\n",error); free(input_tmp); mbedtls_aes_free(&aes_context); return -3; } result = mbedtls_aes_crypt_cbc(&aes_context, MBEDTLS_AES_ENCRYPT, real_input_length, iv, input_tmp, encrypted_data); if (result != 0) { mbedtls_strerror(result, (char *)error, sizeof(error)); vLog(LOG_ERROR, "failed to encrypt data:%s\n",error); free(input_tmp); mbedtls_aes_free(&aes_context); return -4; } free(input_tmp); mbedtls_aes_free(&aes_context); //返回密文长度 return real_input_length; } /* * 一次输入所有待加密数据,并使用AES CBC模式进行加密 * 使用PKCS#7填充的output长度计算应为 output_length = ((input_length/16)+1)*16 * 输入: * encrypted_data:待解密的密文数据 * encrypted_length:待解密的密文数据的有效长度 * decrypted_data:解密的明文数据 *返回值:解密后的明文数据的有效长度,即decrypted_data中数据的有效长度。若为负值,则存在错误。 */ int decrypt_data_by_aes_cbc(unsigned char *encrypted_data, int encrypted_length, unsigned char* iv, unsigned char *decrypted_data) { mbedtls_aes_context aes_context; int result = 0; int padding = 0; unsigned char padding_code; unsigned char error[64]; mbedtls_aes_init(&aes_context); //若使用AES-128,则必要长度为128bits。若使用AES-256,则密钥长度为256bits result = mbedtls_aes_setkey_dec(&aes_context, key, 128); if (result != 0) { mbedtls_strerror(result, (char *)error, sizeof(error)); vLog(LOG_ERROR, "failed to set key:%s\n", error); mbedtls_aes_free(&aes_context); return -1; } //解密后包含填充值的明文,后续需去除填充值 unsigned char decrypted_data_include_padding[encrypted_length]; result = mbedtls_aes_crypt_cbc(&aes_context, MBEDTLS_AES_DECRYPT, encrypted_length, iv, encrypted_data, decrypted_data_include_padding); if (result != 0) { mbedtls_strerror(result, (char *)error, sizeof(error)); vLog(LOG_ERROR, "failed to decrypt data:%s\n", error); mbedtls_aes_free(&aes_context); return -2; } //去除PKCS#7的填充值 //读取最后一个值,此值即为填充值的长度 padding_code = decrypted_data_include_padding[encrypted_length - 1]; padding = (int)padding_code; if (padding < 1 || padding > 16) { vLog(LOG_ERROR, "padding code is illegal!\n"); return -3; } int real_decrypted_data_length = encrypted_length - padding; memcpy(decrypted_data, decrypted_data_include_padding, real_decrypted_data_length); mbedtls_aes_free(&aes_context); return real_decrypted_data_length; } //加密 const std::string _encode(const Json::Value jsonRoot, unsigned char* iv) { std::string outString; Json::StreamWriterBuilder builder; builder["indentation"] = ""; builder["emitUTF8"] = true; std::string inBuf = Json::writeString(builder, jsonRoot); //采用aes加密 //PKCS#7填充的密文长度计算:decrypted_data_length = ((input_length/16)+1)*16,decrypted_data_length = ((11/16)+1)*16 = 16 int encrypted_data_length = (((inBuf.length() >> 4) + 1) << 4); if (encrypted_data_length <= 0) return nullptr; unsigned char* encrypted_data = new unsigned char[encrypted_data_length]; memset(encrypted_data, 0, encrypted_data_length); encrypted_data_length = encrypt_data_by_aes_cbc((unsigned char *)inBuf.c_str(), inBuf.length(), iv, encrypted_data); if (encrypted_data_length < 0) { vLog(LOG_ERROR, "failed to encrypt data!\n"); delete [] encrypted_data; encrypted_data = NULL; return nullptr; } int len = Base64_encodeLength((const b64_data_t *)encrypted_data, encrypted_data_length); if (len > 0) { char *out = new char[len + 1]; do { memset(out, 0, sizeof(char)*(len + 1)); Base64_encode((char *)out, (len + 1), (const b64_data_t *)encrypted_data, encrypted_data_length); } while(0); outString = out; delete [] encrypted_data; encrypted_data = NULL; delete [] out; out = NULL; } return outString; } //解密 Json::Value _decode(const char* pBuf, const int size, unsigned char* iv) { Json::Value jsonRoot; jsonRoot.clear(); std::string err; Json::CharReaderBuilder builder; Json::CharReader* reader(builder.newCharReader()); int len = Base64_decodeLength(pBuf, size); if (len > 0) { char *out = new char[len + 1]; do { memset(out, 0, sizeof(char)*(len + 1)); Base64_decode((b64_data_t *)out, (len + 1), pBuf, size); //aes解密 int decrypted_data_length = (((len >> 4) + 1) << 4); if (decrypted_data_length <= 0) return nullptr; unsigned char* decrypted_data = new unsigned char[decrypted_data_length + 1]; memset(decrypted_data, 0, decrypted_data_length+1); decrypted_data_length = decrypt_data_by_aes_cbc((unsigned char *)out, len, iv, decrypted_data); if ( decrypted_data_length < 0) { vLog(LOG_ERROR, "failed to decrypt data!\n"); delete [] decrypted_data; decrypted_data = NULL; break; } if (!reader->parse((const char *)decrypted_data, (const char *)(decrypted_data + decrypted_data_length), &jsonRoot, &err)) { vLog(LOG_ERROR, "reader->parse(out, out + strlen(out), &jsonRoot, &err) error<%d,%s>。\n", errno, err.c_str()); break; } delete [] decrypted_data; decrypted_data = NULL; } while(0); delete [] out; out = NULL; } return jsonRoot; } bool publish_sensor_data(struct mosquitto *mosq, const std::string traceId, const char* command, const char* topic, const Json::Value payload) { Json::StreamWriterBuilder builder; builder["indentation"] = ""; builder["emitUTF8"] = true; Json::Value jsonRoot; jsonRoot["command"] = command; char str[128]; uuid_t uuid; uuid_generate_time(uuid); uuid_unparse_upper(uuid, str); if (traceId == "") { jsonRoot["traceId"] = str; } else { jsonRoot["traceId"] = traceId; } Json::Int64 mtime = (Json::Int64)time(NULL); mtime *= 1000; jsonRoot["mtime"] = mtime; std::vector tokens = split(std::string(str), '-'); std::string iv = tokens.at(0) + tokens.at(1) + tokens.at(2); jsonRoot["iv"] = iv; jsonRoot["payload"] = _encode(payload, (unsigned char *)iv.c_str()).c_str(); std::string outputConfig = Json::writeString(builder, jsonRoot); vLog(LOG_DEBUG, "send topic: %s, payload: %d\n", topic, outputConfig.length()); int rc = mosquitto_publish(mosq, NULL, topic, outputConfig.length(), outputConfig.c_str(), 0, false); if (rc != MOSQ_ERR_SUCCESS) { vLog(LOG_DEBUG, "publishing topic: %s is error<%d,%s>。\n", topic, rc, mosquitto_strerror(rc)); return false; } return true; } static int GetProcessIDByIRN(QLONG IRN) { int i; for (i = 0; i < PROCESSES_NUM; i++) { if (config.processes[i].irn == IRN) { return i; } } return -1; } static QLONG GetProcessIRNByPid(int pid) { if (pid < 0 || pid >= PROCESSES_NUM) return -1; return config.processes[pid].irn; } static QLONG GetUnitIRNByUid(int uid) { if (uid < 0 || uid >= UNIT_NUM) return -1; return config.units[uid].irn; } static int GetUnitYXCount(int uid) { if (uid < 0 || uid >= UNIT_NUM) return 0; return config.units[uid].yxcount; } static int GetUnitYCCount(int uid) { if (uid < 0 || uid >= UNIT_NUM) return 0; return config.units[uid].yccount; } static int GetUnitYMCount(int uid) { if (uid < 0 || uid >= UNIT_NUM) return 0; return config.units[uid].ymcount; } static float GetUnitYCReal(int uid, int order) { int udb; long value; float coef; float base; struUnit* pUnit; struUnitYC* pYC; if (uid < 0 || uid >= UNIT_NUM) return 0; pUnit = &config.units[uid]; if ((pUnit->state & 0x01) != TRUE) return 0; if (order < 0 || order >= pUnit->yccount) return 0; pYC = &pUnit->ycs[order]; udb = pYC->order; if (udb < 0 || udb >= DATABASE_YC_NUM) { value = pYC->value; coef = 1.0f; base = 0.0f; } else { value = database.ycs[udb].value; coef = pYC->coef; base = pYC->base; pYC->value = value; pYC->update_time = database.ycs[udb].update_time; pYC->qds = database.ycs[udb].qds; } return (float)((float)value * coef + base); } static float GetUnitYCRealFromValue(int uid, int order, long value) { int udb; float coef; float base; struUnit* pUnit; struUnitYC* pYC; if (uid < 0 || uid >= UNIT_NUM) return 0; pUnit = &config.units[uid]; if ((pUnit->state & 0x01) != TRUE) return 0; if (order < 0 || order >= pUnit->yccount) return 0; pYC = &pUnit->ycs[order]; udb = pYC->order; if (udb < 0 || udb >= DATABASE_YC_NUM) { coef = 1.0f; base = 0.0f; } else { coef = pYC->coef; base = pYC->base; } return (float)(value * coef + base); } static float GetUnitYMReal(int uid, int order) { int udb; long value; float coef; float base; struUnit* pUnit; struUnitYM* pYM; if (uid < 0 || uid >= UNIT_NUM) return 0; pUnit = &config.units[uid]; if ((pUnit->state & 0x01) != TRUE) return 0; if (order < 0 || order >= pUnit->ymcount) return 0; pYM = &pUnit->yms[order]; udb = pYM->order; if (udb < 0 || udb >= DATABASE_YM_NUM) { value = pYM->value; coef = 1.0f; base = 0.0f; } else { value = (long)database.yms[udb].value; pYM->update_time = database.yms[udb].update_time; coef = pYM->coef; base = pYM->base; pYM->value = value; } return (float)(value * coef + base); } static BYTE GetUnitYX(int uid, int point) { int udb; BOOLEAN value; struUnit* pUnit; struUnitYX* pYX; if (uid < 0 || uid >= UNIT_NUM) return 0; pUnit = &config.units[uid]; if ((pUnit->state & 0x01) != TRUE) return 0; if (point < 0 || point >= pUnit->yxcount) return 0; pYX = &pUnit->yxs[point]; udb = pYX->order; if (udb < 0 || udb >= DATABASE_YX_NUM) { value = pYX->value; } else { value = database.yxs[udb].value; pYX->value = value; pYX->update_time = database.yxs[udb].update_time; } return value; } static QLONG GetUnitYXIRNByPoint(int uid, int point) { struUnit* pUnit; if (uid < 0 || uid >= UNIT_NUM) return -1; pUnit = &config.units[uid]; if (pUnit->yxcount <= 0) return -1; if (point < 0 || point >= pUnit->yxcount) return -1; return pUnit->yxs[point].irn; } static QLONG GetUnitYCIRNByPoint(int uid, int point) { struUnit* pUnit; if (uid < 0 || uid >= UNIT_NUM) return -1; pUnit = &config.units[uid]; if (pUnit->yccount <= 0) return -1; if (point < 0 || point >= pUnit->yccount) return -1; return pUnit->ycs[point].irn; } static QLONG GetUnitYMIRNByPoint(int uid, int point) { struUnit* pUnit; if (uid < 0 || uid >= UNIT_NUM) return -1; pUnit = &config.units[uid]; if (pUnit->ymcount <= 0) return -1; if (point < 0 || point >= pUnit->ymcount) return -1; return pUnit->yms[point].irn; } static QLONG GetUnitYKIRNByPoint(int uid, int point) { struUnit* pUnit; if (uid < 0 || uid >= UNIT_NUM) return -1; pUnit = &config.units[uid]; if (pUnit->ykcount <= 0) return -1; if (point < 0 || point >= pUnit->ykcount) return -1; return pUnit->yks[point].irn; } static QLONG GetUnitYTIRNByPoint(int uid, int point) { struUnit* pUnit; if (uid < 0 || uid >= UNIT_NUM) return -1; pUnit = &config.units[uid]; if (pUnit->ytcount <= 0) return -1; if (point < 0 || point >= pUnit->ytcount) return -1; return pUnit->yts[point].irn; } static int GetUnitYKPointByIRN(int& uid, QLONG IRN) { int i; int len; struUnit* pUnit; if (ykirn2uid_map.find(IRN) != ykirn2uid_map.end()) { uid = ykirn2uid_map[IRN]; if (uid < 0 || uid >= UNIT_NUM) return -1; pUnit = &config.units[uid]; if (pUnit->ykcount <= 0) return -1; for (i = 0; i < pUnit->ykcount; i++) { if (pUnit->yks[i].irn == IRN) { return i; } } } return -1; } static int GetUnitYTPointByIRN(int& uid, QLONG IRN) { int i; int len; struUnit* pUnit; if (ytirn2uid_map.find(IRN) != ytirn2uid_map.end()) { uid = ykirn2uid_map[IRN]; if (uid < 0 || uid >= UNIT_NUM) return -1; pUnit = &config.units[uid]; if (pUnit->ytcount <= 0) return -1; for (i = 0; i < pUnit->ytcount; i++) { if (pUnit->yts[i].irn == IRN) { return i; } } } return -1; } int GetUnitYXBW(int& uid, BOOLEAN& value, BYTE& qds, int& type, unionCP56Time& st) { int i; int order; int point; while (yxbw.GetYXBW(m_yxbwload, st, order, value, qds, uid, point, type)) { m_yxbwload++; return point; } return -1; } int GetUnitSOE(int& uid, BOOLEAN& value, BYTE& qds, unionCP56Time& st) { int i; int order; int point; while (soe.GetSOE(m_soeload, st, order, value, qds, uid, point)) { m_soeload++; return point; } return -1; } int GetUnitYCBW(int& uid, LONG& value, BYTE& qds, int& type, unionCP56Time& st) { int i; int order; int point; while (ycbw.GetYCBW(m_ycbwload, st, order, value, qds, uid, point, type)) { m_ycbwload++; return point; } return -1; } int get_pid_by_name(const char *process_name) { DIR *dir; struct dirent *entry; char path[PATH_MAX]; // 打开/proc目录 dir = opendir("/proc"); if (dir == NULL) { perror("opendir failed"); return -1; } // 遍历/proc目录中的所有进程 while ((entry = readdir(dir)) != NULL) { if (entry->d_type == DT_DIR) { // 检查进程名是否匹配 if (strcmp(entry->d_name, ".") != 0 && strcmp(entry->d_name, "..") != 0) { snprintf(path, sizeof(path), "/proc/%s/cmdline", entry->d_name); FILE *fp = fopen(path, "r"); if (fp) { char cmdline[PATH_MAX]; fgets(cmdline, sizeof(cmdline), fp); fclose(fp); // 找到匹配的进程 if (strstr(cmdline, process_name) != NULL) { closedir(dir); return atoi(entry->d_name); } } } } } closedir(dir); return -1; } bool thread_created = false; time_t last_time = time(NULL); //历史时间 int file_total_parts = 0; int file_current_parts = 0; char temp_path[MAX_PATH]; int countFilesInDirectory(const char* directory) { DIR *dir; struct dirent *ent; int count = 0; // 打开目录 dir = opendir(directory); if (dir == NULL) { perror("无法打开目录"); return -1; } // 遍历目录 while ((ent = readdir(dir))) { if (strcmp(ent->d_name, ".") == 0 || strcmp(ent->d_name, "..") == 0) { continue; } // 统计文件数量 if (ent->d_type == DT_REG) { count++; } } // 关闭目录 closedir(dir); return count; } int countFilesByTypeInDirectory(const char* directory, const char* fileExtension) { DIR *dir; struct dirent *ent; int count = 0; // 打开目录 dir = opendir(directory); if (dir == NULL) { perror("无法打开目录"); return -1; } // 遍历目录 while ((ent = readdir(dir))) { if (strcmp(ent->d_name, ".") == 0 || strcmp(ent->d_name, "..") == 0) { continue; } // 判断文件类型 if (ent->d_type == DT_REG) { // 使用fnmatch函数匹配文件扩展名 if (fnmatch(fileExtension, ent->d_name, 0) == 0) { count++; } } } // 关闭目录 closedir(dir); return count; } static void *waiting(void *param) { //const char* filePathName = (const char*)param; struct mosquitto *mosq = (struct mosquitto *)param; char filePathName[MAX_PATH]; time_t now_time = time(NULL); int count = 0; vLog(LOG_DEBUG, "判断文件夹:%s的文件是否存在。\n", temp_path); snprintf(filePathName, sizeof(filePathName), "%s/%s", temp_path, configfile); while (true) { sleep(1); //判断文件是否达到数量。 count = countFilesByTypeInDirectory(temp_path, "*.pat"); vLog(LOG_DEBUG, "现在有%d个指定文件,file current parts is: %d, 总共需要%d个文件。\n", count, file_current_parts, file_total_parts); if (file_current_parts == file_total_parts) { //打开指定文件 vLog(LOG_DEBUG, "准备合并文件,文件名为:%s.\n", filePathName); FILE* in = fopen(filePathName, "wb+"); if (in == NULL) break; for (int i = 0; i < count; i++) { char fileName[MAX_PATH]; snprintf(fileName, sizeof(fileName), "%s/%d.pat", temp_path, (i+1)); vLog(LOG_DEBUG, "打开文件名为:%s的文件.\n", fileName); FILE* out = fopen(fileName, "rb"); if (out == NULL) break; while (!feof(out)) { char buffer[1024]; int pos = fread(buffer, 1, sizeof(buffer), out); int w = fwrite(buffer, 1, pos, in); } fclose(out); remove(fileName); } fclose(in); file_current_parts = 0; vLog(LOG_DEBUG, "文件下载完成.\n"); break; } if (now_time >= last_time + 300) { vLog(LOG_DEBUG, "load config file timeout.\n"); //发送超时报文 break; } } count = countFilesByTypeInDirectory(temp_path, "*.pat"); for (int i = 0; i < count; i++) { char fileName[MAX_PATH]; snprintf(fileName, sizeof(fileName), "%s/%d.pat", temp_path, (i+1)); remove(fileName); } vLog(LOG_DEBUG, "结束线程。\n"); file_current_parts = 0; thread_created = false; //发送 /* { "result": true, "message": "success" } */ Json::Value payload; payload["result"] = true; payload["message"] = "success"; publish_sensor_data(mosq, g_traceId, "TERMINAL_CONFIG_UPDATE_RESPONSE", publish_topic, payload); g_dataAcquisitionReload = true; pthread_exit(0); return NULL; } BOOLEAN GetUnitYK(int uid, int& order, BYTE& value, BYTE& act, BYTE& result) { int i; int udb; struUnit* pUnit; struYK* pYK; if (uid < 0 || uid >= UNIT_NUM) return FALSE; pUnit = &config.units[uid]; if ((pUnit->state & 0x01) != TRUE) return FALSE; for (i = 0; i < pUnit->ykcount; i++) { udb = pUnit->yks[i].order; if (udb < 0 || udb >= DATABASE_YK_NUM) continue; pYK = &database.yks[udb]; order = i; value = pYK->value; act = (BYTE)pYK->state; result = (BYTE)pYK->result; if (pYK->op_unit != uid) continue; switch (act) { case YKS_SELED: if (result == YKR_SUCC) { pYK->result = YKR_OPER; yklog.PushYKLog(system32.now, udb, value, YKT_SELRET, YKS_PROC, uid); return TRUE; } else if (result == YKR_FAIL) { pYK->state = YKS_IDLE; pYK->result = YKR_IDLE; pYK->op_unit = -1; return TRUE; } else if (result == YKR_IDLE) { pYK->state = YKS_IDLE; pYK->op_unit = -1; } break; case YKS_EXEED: pYK->state = YKS_IDLE; pYK->result = YKR_IDLE; pYK->op_unit = -1; if (result == YKR_SUCC || result == YKR_FAIL) { return TRUE; } break; case YKS_ABRED: pYK->state = YKS_IDLE; pYK->result = YKR_IDLE; pYK->op_unit = -1; return TRUE; case YKS_EXEING: if (result == YKR_OVER) { pYK->state = YKS_IDLE; pYK->result = YKR_IDLE; pYK->op_unit = -1; return TRUE; } break; case YKS_SELREQ: if (result == YKR_OVER) { pYK->state = YKS_IDLE; pYK->result = YKR_IDLE; pYK->op_unit = -1; return TRUE; } break; case YKS_SELING: if (result == YKR_OVER) { pYK->state = YKS_IDLE; pYK->result = YKR_IDLE; pYK->op_unit = -1; return TRUE; } break; case YKS_ABRREQ: case YKS_EXEREQ: case YKS_IDLE: break; default: pYK->state = YKS_IDLE; pYK->result = YKR_IDLE; pYK->op_unit = -1; break; } } return FALSE; } void SetUnitYK(int uid, int order, BYTE value, BYTE act, BYTE result) { int udb; struUnit* pUnit; struYK* pYK; if (uid < 0 || uid >= UNIT_NUM) return; pUnit = &config.units[uid]; if ((pUnit->state & 0x01) != TRUE) return; if (order < 0 || order >= pUnit->ykcount) return; udb = pUnit->yks[order].order; if (udb < 0 || udb >= DATABASE_YK_NUM) return; pYK = &database.yks[udb]; switch (act) { case YKS_SELREQ: if (pYK->state == YKS_IDLE /*&& pYK->op_unit == -1*/) { pYK->state = act; pYK->result = YKR_IDLE; pYK->value = value; pYK->op_time = system32.timers; pYK->op_unit = uid; yklog.PushYKLog(system32.now, udb, value, YKT_SELREQ, YKS_PROC, uid); } break; case YKS_ABRREQ: if (pYK->op_unit != uid) break; if (pYK->value != value) break; if (pYK->state != YKS_IDLE && pYK->state != YKS_EXEED) { pYK->state = act; pYK->result = YKR_IDLE; yklog.PushYKLog(system32.now, udb, value, YKT_ABRREQ, YKS_PROC, uid); } break; case YKS_EXEREQ: if (pYK->op_unit != uid) break; if (pYK->value != value) break; if (pYK->state == YKS_SELED && pYK->result == YKR_OPER) { pYK->state = act; pYK->result = YKR_IDLE; yklog.PushYKLog(system32.now, udb, value, YKT_EXEREQ, YKS_PROC, uid); } break; } } BOOLEAN GetUnitYT(int uid, int& order, DWORD& value, BYTE& act, BYTE& result) { int i; int udb; struUnit* pUnit; struYT* pYT; if (uid < 0 || uid >= UNIT_NUM) return FALSE; pUnit = &config.units[uid]; if ((pUnit->state & 0x01) != TRUE) return FALSE; for (i = 0; i < pUnit->ytcount; i++) { udb = pUnit->yts[i].order; if (udb < 0 || udb >= DATABASE_YT_NUM) continue; pYT = &database.yts[udb]; order = i; value = (DWORD)pYT->value; act = (BYTE)pYT->state; result = (BYTE)pYT->result; if (pYT->op_unit != uid) continue; switch (act) { case YTS_SELED: if (result == YTR_SUCC) { pYT->result = YTR_OPER; ytlog.PushYTLog(system32.now, udb, value, YTT_SELRET, YTS_PROC, uid); return TRUE; } else if (result == YTR_FAIL) { pYT->state = YTS_IDLE; pYT->result = YTR_IDLE; pYT->op_unit = -1; return TRUE; } else if (result == YTR_IDLE) { pYT->state = YTS_IDLE; pYT->op_unit = -1; } break; case YTS_EXEED: pYT->state = YTS_IDLE; pYT->result = YTR_IDLE; pYT->op_unit = -1; if (result == YTR_SUCC || result == YTR_FAIL) { return TRUE; } break; case YTS_ABRED: pYT->state = YTS_IDLE; pYT->result = YTR_IDLE; pYT->op_unit = -1; return TRUE; case YTS_EXEING: if (result == YTR_OVER) { pYT->state = YTS_IDLE; pYT->result = YTR_IDLE; pYT->op_unit = -1; return TRUE; } break; case YTS_SELREQ: if (result == YTR_OVER) { pYT->state = YTS_IDLE; pYT->result = YTR_IDLE; pYT->op_unit = -1; return TRUE; } break; case YTS_SELING: if (result == YTR_OVER) { pYT->state = YTS_IDLE; pYT->result = YTR_IDLE; pYT->op_unit = -1; return TRUE; } break; case YTS_ABRREQ: case YTS_EXEREQ: case YTS_IDLE: break; default: pYT->state = YTS_IDLE; pYT->result = YTR_IDLE; pYT->op_unit = -1; break; } } return FALSE; } void SetUnitYT(int uid, int order, DWORD value, BYTE act, BYTE result) { int udb; struUnit* pUnit; struYT* pYT; if (uid < 0 || uid >= UNIT_NUM) return; pUnit = &config.units[uid]; if ((pUnit->state & 0x01) != TRUE) return; if (order < 0 || order >= pUnit->ytcount) return; udb = pUnit->yts[order].order; if (udb < 0 || udb >= DATABASE_YT_NUM) return; pYT = &database.yts[udb]; switch (act) { case YTS_SELREQ: if (pYT->state == YTS_IDLE /*&& pYT->op_unit == -1*/) { pYT->state = act; pYT->result = YTR_IDLE; pYT->value = value; pYT->op_time = system32.timers; pYT->op_unit = uid; ytlog.PushYTLog(system32.now, udb, value, YTT_SELREQ, YTS_PROC, uid); } break; case YTS_ABRREQ: if (pYT->op_unit != uid) break; if (pYT->value != (long)value) break; if (pYT->state != YTS_IDLE && pYT->state != YTS_EXEED) { pYT->state = act; pYT->result = YTR_IDLE; ytlog.PushYTLog(system32.now, udb, value, YTT_ABRREQ, YTS_PROC, uid); } break; case YTS_EXEREQ: if ((pYT->state == YTS_SELED || pYT->state == YTS_IDLE)) { pYT->state = act; pYT->result = YTR_IDLE; pYT->value = value; pYT->op_time = system32.timers; pYT->op_unit = uid; ytlog.PushYTLog(system32.now, udb, value, YTT_EXEREQ, YTS_PROC, uid); } break; } } #define CMD_CONTROL_OPERATION 0 #define CMD_CONTROL_SETTING 1 int MakeYKFrame(struct mosquitto *mosq) { int uid; int order; BYTE value, action, result; for (int i = 0; i < m_total_units.size(); i++) { uid = m_total_units.at(i); if (uid < 0 || uid >= UNIT_NUM) continue; if (!GetUnitYK(uid, order, value, action, result)) continue; vLog(LOG_WARN, "Unit(%d) yk(%d) %s state is %s result is %s\n", uid, order, (value ? "CLOSE" : "TRIP"), val_to_str(action, yk_state, "STATE=%d"), val_to_str(result, yk_result, "RESULT=%d")); //发送确认 Json::Value jsonRoot; jsonRoot["irn"] = (Json::Int64)GetUnitYKIRNByPoint(uid, order); jsonRoot["operType"] = CMD_CONTROL_OPERATION; jsonRoot["operValue"] = value; if (YKS_SELED == action && YKR_FAIL == result) { action = YKS_ABRED; } else if (YKS_SELREQ == action && YKR_OVER == result) { action = YKS_ABRED; } else if (YKS_SELING == action && YKR_OVER == result) { action = YKS_ABRED; } if (YKS_SELED == action) { SetUnitYK(uid, order, value, YKS_EXEREQ, YKR_IDLE); vLog(LOG_WARN, "Unit(%d) yk(%d) %s state is YKS_EXEREQ result is YKR_IDLE.\n", uid, order, (value ? "CLOSE" : "TRIP")); return 1; } else if (YKS_ABRED == action) { SetUnitYK(uid, order, value, YKS_ABRREQ, YKR_IDLE); vLog(LOG_WARN, "Unit(%d) yk(%d) %s state is YKS_ABRREQ result is YKR_IDLE.\n", uid, order, (value ? "CLOSE" : "TRIP")); jsonRoot["result"] = false; } else { jsonRoot["result"] = true; } publish_sensor_data(mosq, g_traceId, "CONTROL_REPLY", publish_topic, jsonRoot); return 1; } return 0; } int MakeYTFrame(struct mosquitto *mosq) { int uid; int order; BYTE action, result; DWORD value; for (int i = 0; i < m_total_units.size(); i++) { if (uid < 0 || uid >= UNIT_NUM) continue; if (!GetUnitYT(uid, order, value, action, result)) continue; vLog(LOG_DEBUG, "Unit(%d) set point(%d) %d state is %s result is %s\n", uid, order, value, val_to_str(action, yt_state, "STATE=%d"), val_to_str(result, yt_result, "RESULT=%d")); //发送确认 Json::Value jsonRoot; jsonRoot["irn"] = (Json::Int64)GetUnitYTIRNByPoint(uid, order); jsonRoot["operType"] = CMD_CONTROL_SETTING; jsonRoot["operValue"] = value; if (YTS_SELED == action && YTR_FAIL == result) { action = YTS_ABRED; } else if (YTS_SELREQ == action && YTR_OVER == result) { action = YTS_ABRED; } else if (YTS_SELING == action && YTR_OVER == result) { action = YTS_ABRED; } if (YTS_SELED == action) { SetUnitYT(uid, order, value, YTS_EXEREQ, YTR_IDLE); vLog(LOG_DEBUG, "Unit(%d) set point(%d) %d state is YTS_EXEREQ result is YTR_IDLE.\n", uid, order, value); return 1; } else if (YTS_ABRED == action) { SetUnitYT(uid, order, value, YTS_ABRREQ, YTR_IDLE); vLog(LOG_DEBUG, "Unit(%d) set point(%d) %d state is YTS_ABRREQ result is YTR_IDLE.\n", uid, order, value); jsonRoot["result"] = false; } else { jsonRoot["result"] = true; } publish_sensor_data(mosq, g_traceId, "CONTROL_REPLY", publish_topic, jsonRoot); return 1; } return 0; } bool OnReceivedDeviceCommand(Json::Value jsonRoot) { if (jsonRoot["irn"].isNull()) return FALSE; if (jsonRoot["operType"].isNull()) return FALSE; if (jsonRoot["operValue"].isNull()) return FALSE; int uid = 0;//GetCurUnitID(); Json::Int64 irn = jsonRoot["irn"].asInt64(); int operType = jsonRoot["operType"].asInt(); int operValue = jsonRoot["operValue"].asInt(); int point; if (operType == CMD_CONTROL_OPERATION) //遥控 { //根据irn来查找point point = GetUnitYKPointByIRN(uid, irn); if (point < 0) { vLog(LOG_ERROR, "未能找到对应的遥控点号,请检查并确认。\n"); return FALSE; } SetUnitYK(uid, point, (operValue & 0x03), YKS_SELREQ, YKR_IDLE); vLog(LOG_WARN, "Unit(%d) yk(%d) %s state is YKS_SELREQ result is YKR_IDLE.\n", uid, point, ((operValue & 0x03) ? "CLOSE" : "TRIP")); } else if (operType == CMD_CONTROL_SETTING) //遥调 { //根据irn来查找point point = GetUnitYTPointByIRN(uid, irn); if (point < 0) { vLog(LOG_ERROR, "未能找到对应的遥调点号,请检查并确认。\n"); return FALSE; } SetUnitYT(uid, point, operValue, YTS_EXEREQ, YTR_IDLE); vLog(LOG_DEBUG, "Unit(%d) set point(%d) %d state is YTS_EXEREQ result is YTR_IDLE.\n", uid, point, operValue); } else { vLog(LOG_ERROR, "平台下发的<%d>命令错误。operType不是<0-遥控或1-遥调>,系统不支持的命令!\n", operType); return FALSE; } return TRUE; } bool OnReceivedLogCommand(Json::Value jsonRoot) { if (jsonRoot["linkIrn"].isNull()) return FALSE; QLONG linkIrn = -1; if (jsonRoot["linkIrn"].isInt64()) linkIrn = jsonRoot["linkIrn"].asInt64(); struMonLinkLog.m_iStartTime = system32.timers; struMonLinkLog.m_iLinkIrn = linkIrn; channelBuffer.enabled = TRUE; channelBuffer.mon_port = GetProcessIDByIRN(linkIrn);; vLog(LOG_DEBUG, "receive action LINK_LOG_REQUEST, irn is:%lld, pid is: %d\n", struMonLinkLog.m_iLinkIrn, channelBuffer.mon_port); return TRUE; } bool OnReceivedSystemAction(struct mosquitto *mosq, const std::string traceId, const std::string command, Json::Value payload, const void* obj) { int rc; int code = 0; std::string reason; do { if (command == "TERMINAL_CONFIG_UPDATE") { //数据网关配置更新动作 int version = -1; int totalParts = 0; int currentPart = 0; if (payload["totalParts"].isNull()) { vLog(LOG_DEBUG, "no totalParts part.\n"); return false; } if (payload["currentPart"].isNull()) { vLog(LOG_DEBUG, "no currentPart part.\n"); return false; } if (payload["data"].isNull()) { vLog(LOG_DEBUG, "no data part.\n"); return false; } if (payload["partMD5"].isNull()) { vLog(LOG_DEBUG, "no partMD5 part.\n"); return false; } if (payload["version"].isInt()) version = payload["version"].asInt(); if (payload["totalParts"].isInt()) totalParts = payload["totalParts"].asInt(); if (payload["currentPart"].isInt()) currentPart = payload["currentPart"].asInt(); vLog(LOG_DEBUG, "totalParts is: %d, and currentPart is: %d.\n", totalParts, currentPart); g_traceId = traceId; //response命令的traceId #if 0 if (config.version < 0) { code = 201; reason = "当前版本采用离线运行方式"; vLog(LOG_DEBUG, "当前版本采用离线运行方式。请联系工作人员\n"); } else if (config.version >= version) { code = 200; reason = "当前版本大于需更新版本"; vLog(LOG_DEBUG, "当前版本大于需更新版本,本次无需更新。\n"); } #endif // else { //此处判断一下开启一个线程 if (!thread_created) { vLog(LOG_DEBUG, "创建一个等待线程。\n"); pthread_t tid; pthread_create(&tid, NULL, waiting, (void *)mosq); thread_created = true; file_total_parts = totalParts; } //更新最后收到报文时间 last_time = time(NULL); // config.version = version; //解码 std::string data = payload["data"].asString(); std::string partMD5 = payload["partMD5"].asString(); int r; char *out; int len = Base64_decodeLength(data.c_str(), data.length()); vLog(LOG_DEBUG, "base64解码长度 is: %d\n", len); if (len > 0) { out = new char[len+1]; memset(out, 0, sizeof(char)*(len + 1)); r = Base64_decode((b64_data_t *)out, (len + 1), data.c_str(), data.length()); MD5_CTX md5; MD5_Init(&md5); MD5_Update(&md5, (void *)out, len); unsigned char result[32]; MD5_Final(result, &md5); std::string check; char sum[54]; snprintf(sum, sizeof(sum), "%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x", result[0], result[1], result[2], result[3], result[4], result[5], result[6], result[7], result[8], result[9], result[10], result[11], result[12], result[13], result[14], result[15]); check = sum; vLog(LOG_DEBUG, "partMD5 is: %s, checkMD5 is: %s\n", partMD5.c_str(), check.c_str()); if (check == partMD5) { char file[128]; snprintf(file, sizeof(file), "%s/%d.pat", temp_path, currentPart); vLog(LOG_DEBUG, "准备写入文件:%s,%s.\n", file, (const char*)obj); FILE* pf = fopen(file, "w+"); if (pf) { fwrite(out, len, 1, pf); fclose(pf); file_current_parts++; } } delete [] out; } } } else if (command == "CMD_CONTROL") { g_traceId = traceId; //response命令的traceId OnReceivedDeviceCommand(payload); } else if (command == "LINK_LOG_REQUEST") { OnReceivedLogCommand(payload); } } while (0); return true; } /* Callback called when the client receives a CONNACK message from the broker. */ void on_connect(struct mosquitto *mosq, void *obj, int reason_code) { int rc; vLog(LOG_DEBUG, "on_connect: %s\n", mosquitto_connack_string(reason_code)); if (reason_code != 0) { mosquitto_disconnect(mosq); } rc = mosquitto_subscribe(mosq, NULL, subscribe_topic, 0); if (rc != MOSQ_ERR_SUCCESS) { vLog(LOG_ERROR, "Error subscribing: %s\n", mosquitto_strerror(rc)); mosquitto_disconnect(mosq); } } /* Callback called when the broker sends a SUBACK in response to a SUBSCRIBE. */ void on_subscribe(struct mosquitto *mosq, void *obj, int mid, int qos_count, const int *granted_qos) { int i; bool have_subscription = false; for (i = 0; i < qos_count; i++) { vLog(LOG_DEBUG, "on_subscribe: %d:granted qos = %d\n", i, granted_qos[i]); if (granted_qos[i] <= 2) { have_subscription = true; } } if (have_subscription == false) { vLog(LOG_ERROR, "Error: All subscriptions rejected.\n"); mosquitto_disconnect(mosq); } } /* Callback called when the client receives a message. */ void on_message(struct mosquitto *mosq, void *obj, const struct mosquitto_message *msg) { /* This blindly prints the payload, but the payload can be anything so take care. */ vLog(LOG_DEBUG, "topic: %s, payload is: %.*s\n", msg->topic, 128, (char *)msg->payload); Json::Value jsonRoot; jsonRoot.clear(); char *buffer = new char[msg->payloadlen+1]; if (buffer == NULL) return; memset(buffer, 0, msg->payloadlen+1); strncpy(buffer, (char *)msg->payload, msg->payloadlen); std::string err; Json::CharReaderBuilder builder; Json::CharReader* reader(builder.newCharReader()); do { if (!reader->parse(buffer, buffer + msg->payloadlen, &jsonRoot, &err)) { vLog(LOG_ERROR, "reader->parse(buffer, buffer + msg->payloadlen, &jsonRoot, &err) error<%d,%s>。\n", errno, err.c_str()); break; } if (jsonRoot["command"].isNull()) { vLog(LOG_ERROR, "json format lost command part.\n"); break; } if (jsonRoot["iv"].isNull()) { vLog(LOG_ERROR, "json format lost iv part.\n"); break; } if (jsonRoot["payload"].isNull()) { vLog(LOG_ERROR, "json format lost payload part.\n"); break; } std::string command = jsonRoot["command"].asString(); std::string traceId = jsonRoot["traceId"].asString(); std::string iv = jsonRoot["iv"].asString(); Json::Int64 mtime = jsonRoot["mtime"].asInt64(); std::string datas = jsonRoot["payload"].asString(); //解码 Json::Value payload = _decode(datas.c_str(), datas.length(), (unsigned char *)iv.c_str()); OnReceivedSystemAction(mosq, traceId, command, payload, (const void*)obj); } while (0); delete [] buffer; buffer = NULL; } #endif pthread_mutex_t mutex; void stop(int signo) { pthread_mutex_lock(&mutex); int i; vLog(LOG_ERROR, "cmg received exit signel(%d)\n", signo); //#ifndef WIN32 // if (signo != SIGPIPE) //#endif { for (i = 0; i < PROCESSES_NUM; i++) { if (procs[i] != NULL) { procs[i]->Destroy(); delete procs[i]; procs[i] = NULL; } } destroy_thread(); } if (zlog_inited) zlog_fini(); #ifdef WIN32 if (g_bSocketStarted) WSACleanup(); #endif signo = 0; pthread_mutex_unlock(&mutex); exit(signo); } void heart_beat(struct mosquitto* mosq, int status) { //发送心跳报文 Json::Value payload; payload["ttl"] = 30000; payload["status"] = status; if (status == 1) { Json::Value jsonItem; Json::Value jsonValue; for (int i = 0; i < (PROCESSES_NUM - 1); i++) { //最后一个为系统设置的本地调试协议 if (config.processes[i].state == TRUE) { jsonValue["linkIrn"] = (Json::Int64)GetProcessIRNByPid(i); jsonValue["online"] = (config.processes[i].softdog >= PROCESS_WATCHDOG_TIME) ? false : true; jsonItem.append(jsonValue); } } if (jsonItem.size() > 0) { payload["links"] = jsonItem; } } publish_sensor_data(mosq, "", "HEARTBEAT", publish_realdata_topic, payload); } DWORD OnReadChannelRingBuff(char *rbuff, DWORD len) { DWORD l; if (channelBuffer.buf == NULL) return -1; len = wMin(len, (channelBuffer.save - channelBuffer.load)); /* 第一部分的拷贝:从环形缓冲区读取数据直至缓冲区最后一个 */ l = wMin(len, MAX_DISPLAY_BUFFER_SIZE - (channelBuffer.load & (MAX_DISPLAY_BUFFER_SIZE - 1))); memcpy(rbuff, channelBuffer.buf + (channelBuffer.load & (MAX_DISPLAY_BUFFER_SIZE - 1)), l); /* 如果溢出则在缓冲区头读取剩余的部分如果没溢出这句代码相当于无效 */ memcpy(rbuff + l, channelBuffer.buf, len - l); channelBuffer.load += len; return len; } BOOLEAN publishMonLinkLog(struct mosquitto* mosq) { Json::Value jsonRoot; Json::Value jsonItem; if (NULL == channelBuffer.buf) return FALSE; if (!channelBuffer.enabled) return FALSE; char str_len[4]; char disp[2048]; int disp_len = 0; int len = 0; while (channelBuffer.load != channelBuffer.save) { //读取一个字节判断该字节是否位t和r len = OnReadChannelRingBuff(&disp[disp_len], 1); if (disp[disp_len] == 't' || disp[disp_len] == 'r') { OnReadChannelRingBuff(str_len, 4); len = atoi(str_len); if (len) { disp_len = 0; if (disp[0] == 'r') disp[0] = 'R'; else if (disp[0] == 't') disp[0] = 'S'; disp[1] = ':'; len = OnReadChannelRingBuff((char *)&disp[2], len); disp[len+2] = '\0'; jsonItem.append(disp); continue; } } disp_len += len; } if (disp_len) jsonItem.append(disp); jsonRoot["linkIrn"] = struMonLinkLog.m_iLinkIrn; jsonRoot["logs"] = jsonItem; if (jsonItem.size() <= 0) return TRUE; publish_sensor_data(mosq, "", "LINK_LOG", publish_topic, jsonRoot); } bool publishRealData(struct mosquitto* mosq) { int i; int uid; int count = 0; Json::Value jsonRoot; QLONG dataTime = system32.timers; dataTime *= 1000; Json::Value yxs; Json::Value ycs; Json::Value yms; for (int k = 0; k < m_total_units.size(); k++) { uid = m_total_units.at(k); //vLog(LOG_DEBUG, "config.units[%d].value is %d.\n", uid, config.units[uid].value); if (config.units[uid].value == SPI_ON) continue; count = GetUnitYCCount(uid); if (count) { Json::Value jsonValue; for (i = 0; i < count; i++) { float value = GetUnitYCReal(uid, i); jsonValue["irn"] = (Json::Int64)GetUnitYCIRNByPoint(uid, i); jsonValue["dataValue"] = value; jsonValue["dataTime"] = (Json::Int64)dataTime; ycs.append(jsonValue); } } count = GetUnitYMCount(uid); if (count) { Json::Value jsonItem; Json::Value jsonValue; for (i = 0; i < count; i++) { float value = GetUnitYMReal(uid, i); jsonValue["irn"] = (Json::Int64)GetUnitYMIRNByPoint(uid, i); jsonValue["dataValue"] = value; jsonValue["dataTime"] = (Json::Int64)dataTime; yms.append(jsonValue); } } count = GetUnitYXCount(uid); if (count) { Json::Value jsonItem; Json::Value jsonValue; for (i = 0; i < count; i++) { BYTE value = GetUnitYX(uid, i); jsonValue["irn"] = (Json::Int64)GetUnitYXIRNByPoint(uid, i); jsonValue["dataValue"] = value; jsonValue["dataTime"] = (Json::Int64)dataTime; yxs.append(jsonValue); } } } #if 0 jsonRoot["pushTime"] = (Json::Int64)dataTime; #endif if (!yxs.isNull()) jsonRoot["yxs"] = yxs; if (!ycs.isNull()) jsonRoot["ycs"] = ycs; if (!yms.isNull()) jsonRoot["yms"] = yms; if (jsonRoot.isNull()) return true; return publish_sensor_data(mosq, "", "REAL_DATA", publish_realdata_topic, jsonRoot); } bool publishSOEData(struct mosquitto* mosq) { int i, uid; int soe_point; BOOLEAN soe_value; BYTE soe_qds; unionCP56Time soe_time; QWORD dataTime; Json::Value jsonRoot; Json::Value jsonItem; Json::Value jsonValue; for (i = 0; i < DATABASE_SOE_NUM; i++) { soe_point = GetUnitSOE(uid, soe_value, soe_qds, soe_time); if (soe_point < 0) break; jsonValue["irn"] = (Json::Int64)GetUnitYXIRNByPoint(uid, soe_point); jsonValue["dataValue"] = soe_value; dataTime = unionCP56TimetoTime_t(&soe_time); dataTime *= 1000; dataTime += soe_time.millisecond % 1000; jsonValue["dataTime"] = (Json::Int64)dataTime; jsonItem.append(jsonValue); } if (jsonItem.size() <= 0) return FALSE; #if 0 QLONG pushTime = system32.timers; pushTime *= 1000; jsonRoot["pushTime"] = (Json::Int64)pushTime; #endif jsonRoot["soe"] = jsonItem; return publish_sensor_data(mosq, "", "SOE_DATA", publish_realdata_topic, jsonRoot); } bool publishYXBWData(struct mosquitto* mosq) { int i, uid; int yxbw_point; int yxbw_type; BOOLEAN yxbw_value; BYTE yxbw_qds; unionCP56Time yxbw_time; QWORD dataTime; Json::Value jsonRoot; Json::Value jsonItem; Json::Value jsonValue; for (i = 0; i < DATABASE_YXBW_NUM; i++) { yxbw_point = GetUnitYXBW(uid, yxbw_value, yxbw_qds, yxbw_type, yxbw_time); if (yxbw_point < 0) break; jsonValue["irn"] = (Json::Int64)GetUnitYXIRNByPoint(uid, yxbw_point); jsonValue["dataValue"] = yxbw_value; dataTime = unionCP56TimetoTime_t(&yxbw_time); dataTime *= 1000; dataTime += yxbw_time.millisecond % 1000; jsonValue["dataTime"] = (Json::Int64)dataTime; jsonItem.append(jsonValue); } jsonRoot["yxs"] = jsonItem; if (jsonItem.size() <= 0) return FALSE; #if 0 QLONG pushTime = system32.timers; pushTime *= 1000; jsonRoot["pushTime"] = (Json::Int64)pushTime; #endif return publish_sensor_data(mosq, "", "REAL_DATA", publish_realdata_topic, jsonRoot); } bool publishYCBWData(struct mosquitto* mosq) { int i; int uid; Json::Value jsonRoot; Json::Value jsonItem; Json::Value jsonValue; int ycbw_point; int ycbw_type; LONG ycbw_value; BYTE ycbw_qds; unionCP56Time ycbw_time; QWORD dataTime; for (i = 0; i < DATABASE_YCBW_NUM; i++) { ycbw_point = GetUnitYCBW(uid, ycbw_value, ycbw_qds, ycbw_type, ycbw_time); if (ycbw_point < 0) break; jsonValue["irn"] = (Json::Int64)GetUnitYCIRNByPoint(uid, ycbw_point); jsonValue["dataValue"] = GetUnitYCRealFromValue(uid, ycbw_point, ycbw_value); dataTime = unionCP56TimetoTime_t(&ycbw_time); dataTime *= 1000; dataTime += ycbw_time.millisecond % 1000; jsonValue["dataTime"] = (Json::Int64)dataTime; jsonItem.append(jsonValue); } jsonRoot["ycs"] = jsonItem; if (jsonItem.size() <= 0) return FALSE; return publish_sensor_data(mosq, "", "REAL_DATA", publish_realdata_topic, jsonRoot); } int main(int argc, char** argv) { int c; BOOLEAN enable_sqlite_configuration = FALSE; BOOLEAN enable_auto_platform = TRUE; char issmqtt_config_pathName[256] = {'\0'}; char host[256] = {"192.168.129.31"}; int port = 1883; char clientid[128] = {"iss_cmg"}; char username[128] = {"usp3"}; char password[128] = {"zaq12WSX"}; char topic[128] = {"USP/v99"}; uuid_t uuid; char cid[128]; uuid_generate_time(uuid); uuid_unparse_upper(uuid, cid); std::vector tokens = split(std::string(cid), '-'); std::string iv = tokens.back(); snprintf(clientid, sizeof(clientid), "iss_cmg_%s", iv.c_str()); //获取可执行文件所在目录 const char default_config[] = "[global]\n" "default format = \"%d.%ms [%-6V] - %m%n\"\n" "[rules]\n" "my_cat.* >stderr\n"; int rc = dzlog_init("./application.conf", "my_cat"); if (rc < 0) { rc = dzlog_init(default_config, "my_cat"); if (rc < 0) zlog_inited = FALSE; else { fprintf(stderr, "dzlog_init(\"./application.conf\", \"my_cat\") failed, load default config.\n"); zlog_inited = TRUE; } } else { zlog_inited = TRUE; } pthread_mutex_init(&mutex, NULL); static struct option long_options[] = { { "directory", required_argument, NULL, 'd' }, { "file", required_argument, NULL, 'f' }, { "host", required_argument, NULL, 'h'}, { "port", required_argument, NULL, 'p'}, { "key", required_argument, NULL, 'k'}, { "username", required_argument, NULL, 'u' }, { "password", required_argument, NULL, 'P' }, { "clientid", required_argument, NULL, 'i'}, { "topic", required_argument, NULL, 't'}, { "help", no_argument, NULL, 'H' }, { NULL, 0, NULL, 0 } }; while (1) { int opt_index = 0; c = getopt_long(argc, argv, "d:f:h:p:i:u:P:t:k:H", long_options, &opt_index); if (c == -1) break; switch (c) { case 'd': snprintf(configpath, sizeof(configpath), "%s", optarg); break; case 'f': snprintf(configfile, sizeof(configfile), "%s", optarg); break; case 'h': snprintf(host, sizeof(host), "%s", optarg); break; case 'p': port = strtol(optarg, NULL, 10); break; case 'i': snprintf(clientid, sizeof(clientid), "%s", optarg); break; case 'u': snprintf(username, sizeof(username), "%s", optarg); break; case 'P': snprintf(password, sizeof(password), "%s", optarg); break; case 't': snprintf(topic, sizeof(topic), "%s", optarg); break; case 'k': snprintf((char *)key, sizeof(key), "%s", optarg); break; case '?': case 'H': vLog(LOG_DEBUG, "Usage: %s [OPTION]... \n" " -d, --directory : set configuration file directory. Default is /data/config/rtufiles\n" " -f, --file : set configuration file name. Default is config.db\n" " -h, --host : mqtt ip. Default is localhost\n" " -p, --port : mqtt port, default is 1883\n" " -u, --username : mqtt user name, default is usp3\n" " -P, --password : mqtt user password, default is zaq12WSX\n" " -i, --clientid : mqtt client id, default is dataAcquisition_xx\n" " -t, --topic : mqtt base topic, default is USP/v99\n" " -H, --help : print this usage\n", argv[0]); return (EXIT_SUCCESS); default: vLog(LOG_DEBUG, "?? getopt returned character code 0%c ??\n", c); } } #ifdef WIN32 int err; WORD wVersionRequested; WSADATA wsaData; wVersionRequested = MAKEWORD(2, 2); err = WSAStartup(wVersionRequested, &wsaData); if (err != 0) { vLog(LOG_ERROR, "network init error(%d,%s)\n", errno, strerror(errno)); return FALSE; } if (LOBYTE(wsaData.wVersion) != 2 || HIBYTE(wsaData.wVersion) != 2) { WSACleanup(); vLog(LOG_ERROR, "network version error.\r\n"); return FALSE; } g_bSocketStarted = TRUE; signal(SIGBREAK, stop); #else signal(SIGHUP, stop); signal(SIGQUIT, stop); signal(SIGKILL, stop); signal(SIGPIPE, stop); signal(SIGSTOP, stop); #endif //WIN32 signal(SIGINT, stop); signal(SIGILL, stop); signal(SIGSEGV, stop); signal(SIGTERM, stop); signal(SIGABRT, stop); signal(SIGFPE, stop); struct mosquitto *mosq = NULL; mosquitto_lib_init(); do { int status = 2; //0 - 离线, 1 - 在线, 2 - 未配置 g_dataAcquisitionReload = false; vLog(LOG_DEBUG, "system initialize...\n"); char szHostCode[128]; memset(szHostCode, '\0', sizeof(szHostCode)); if (!initialize_system(FALSE, FALSE, NULL, szHostCode)) { vLog(LOG_ERROR, "system initialize error.\n"); if (enable_auto_platform) { //主动和平台链接 char szHostName[32] = ""; gethostname(szHostName, sizeof(szHostName)); nodes.m_node[0].m_netnode_no = 0; nodes.m_node[0].m_tcitype = MASTER_TCI; nodes.m_node[0].m_target_addr = INADDR_LOOPBACK; nodes.m_node[0].m_target_port = 15000; snprintf(nodes.m_node[0].m_machine_name, sizeof(nodes.m_node[0].m_machine_name), "%s", szHostName); nodes.m_node[0].irn = strtoll(irn, NULL, 10); vLog(LOG_DEBUG, "nodes.m_node[0].m_machine_name is: %s\n", nodes.m_node[0].m_machine_name); } } else { snprintf(clientid, sizeof(clientid), "%s", szHostCode); vLog(LOG_DEBUG, "configed client id is: %s.\n", clientid); if (enable_auto_platform) { //增加协议和单元配置的数量统计 m_total_processes.clear(); m_total_units.clear(); ykirn2uid_map.clear(); ytirn2uid_map.clear(); for (int i = 0; i < (PROCESSES_NUM - 1); i++) { if (config.processes[i].state) { m_total_processes.push_back(i); for (int j = 0; j < PROCESS_UNIT_NUM; j++) { int uid = config.processes[i].units[j]; if (uid >= 0) { //if (config.units[uid].type == MASTER_UNIT) { //平台下发的配置,装置都为主设备 m_total_units.push_back(uid); for (int k = 0; k < config.units[uid].ykcount; k++) { QLONG key = config.units[uid].yks[k].irn; if (ykirn2uid_map.find(key) == ykirn2uid_map.end()) { ykirn2uid_map.insert(std::unordered_map::value_type(key, uid)); } else { vLog(LOG_DEBUG, "at least two yk points have the same irn.\n"); } } for (int k = 0; k < config.units[uid].ytcount; k++) { QLONG key = config.units[uid].yts[k].irn; if (ytirn2uid_map.find(key) == ytirn2uid_map.end()) { ytirn2uid_map.insert(std::unordered_map::value_type(key, uid)); } else { vLog(LOG_DEBUG, "at least two yt points have the same irn.\n"); } } } } } } } vLog(LOG_DEBUG, "here have %d processes, and %d units.\n", m_total_processes.size(), m_total_units.size()); status = 1; } } if (enable_auto_platform) { if (irn[0] == '\0') { vLog(LOG_ERROR, "node irn cann't be empty.\n"); return (EXIT_FAILURE); } //创建mqtt服务 mosq = mosquitto_new(clientid, true, issmqtt_config_pathName); if (mosq == NULL) { vLog(LOG_ERROR, "Error: Out of memory.\n"); break; } /* Configure callbacks. This should be done before connecting ideally. */ mosquitto_username_pw_set(mosq, username, password); mosquitto_connect_callback_set(mosq, on_connect); mosquitto_subscribe_callback_set(mosq, on_subscribe); mosquitto_message_callback_set(mosq, on_message); rc = mosquitto_connect(mosq, host, port, 60); if (rc != MOSQ_ERR_SUCCESS) { vLog(LOG_ERROR, "mosquitto_connect Error: %s\n", mosquitto_strerror(rc)); } /* Run the network loop in a background thread, this call returns quickly. */ rc = mosquitto_loop_start(mosq); if (rc != MOSQ_ERR_SUCCESS) { mosquitto_destroy(mosq); vLog(LOG_ERROR, "mosquitto_loop_start Error: %s\n", mosquitto_strerror(rc)); break; } } unsigned int m_runCount = 0; unsigned int count = 0; unsigned int critical = 0; CChangeMaster masterTci; if (!masterTci.Init()) { break; } masterTci.MasterTciFirstRun(); time_t last_sec = 0; while (TRUE) { m_runCount++; masterTci.MasterSend(); usleep(MASTER_TCI_SEND_INTERVAL); if (MASTER_TCI == CChangeMaster::m_tcitype) { if (m_runCount > count) { count = m_runCount; critical = 0; } else { critical++; if (critical > 15) { break; } } if (enable_auto_platform) { BOOLEAN sec_changed = FALSE; if (last_sec != (time_t)system32.timers) { last_sec = system32.timers; sec_changed = TRUE; } MakeYKFrame(mosq); MakeYTFrame(mosq); if (sec_changed) { if (struMonLinkLog.m_iStartTime > 0) { if (struMonLinkLog.m_iStartTime > 0 && system32.timers <= (struMonLinkLog.m_iStartTime + 30)) //默认为30s,调试设置为2s { //发送报文 publishMonLinkLog(mosq); } else { channelBuffer.enabled = FALSE; channelBuffer.mon_port = -1; struMonLinkLog.m_iLinkIrn = -1; struMonLinkLog.m_iStartTime = 0; } } publishYXBWData(mosq); publishSOEData(mosq); if ((last_sec % 60) == 0) { //更新数据 publishRealData(mosq); } if ((last_sec % 20) == 0) { heart_beat(mosq, status); } } } } if (g_dataAcquisitionReload) { break; } } if (critical > 15) { vLog(LOG_ERROR, "unknow error.\n"); } masterTci.ChangeDelete(); } while(0); pthread_mutex_destroy(&mutex); mosquitto_lib_cleanup(); if (zlog_inited) zlog_fini(); #ifdef WIN32 if (g_bSocketStarted) WSACleanup(); #endif //WIN32 vLog(LOG_DEBUG, "system stop okay.\n"); return EXIT_SUCCESS; }