From c6d2bd6a5654d210fedccf757c691ded0bdaa8a3 Mon Sep 17 00:00:00 2001 From: zhouhuang Date: Tue, 10 Dec 2024 10:14:40 +0800 Subject: [PATCH] update --- das-dn/cmg/changemaster.cpp | 6 +- das-dn/cmg/changemaster.h | 8 +- das-dn/cmg/ry.cpp | 139 +++++++++++++----- das-dn/cmg/ry.h | 2 + das-dn/hostadsbf/hostadsbf.cpp | 96 ++++++------ das-dn/hostadsbf/hostadsbf.h | 12 +- das-dn/inc/public.h | 36 +---- das-dn/minio/ftp2minio.cpp | 109 ++++++++++---- das-dn/minio/ftp2minio.h | 14 +- das-dn/third_party/AdsLib/AdsDevice.cpp | 3 +- das-dn/third_party/AdsLib/Sockets.cpp | 16 +- das-dn/third_party/AdsLib/Sockets.h | 1 + .../AdsLib/Standalone/AmsConnection.cpp | 7 +- 13 files changed, 289 insertions(+), 160 deletions(-) diff --git a/das-dn/cmg/changemaster.cpp b/das-dn/cmg/changemaster.cpp index 1c0df763..1d27a769 100644 --- a/das-dn/cmg/changemaster.cpp +++ b/das-dn/cmg/changemaster.cpp @@ -394,7 +394,7 @@ void CChangeMaster::StartUp(void) if (procs[i] != NULL) { -#if 0 +#if 1 if (!procs[i]->Create(i)) { procs[i]->Destroy(); @@ -407,10 +407,9 @@ void CChangeMaster::StartUp(void) } } } - +#if 0 void CChangeMaster::tryStartup(int pid) { - vLog(LOG_DEBUG, "pid is: %d, create it\n", pid); int i = pid; if (!procs[i]->Create(i)) { @@ -419,6 +418,7 @@ void CChangeMaster::tryStartup(int pid) procs[i] = NULL; } } +#endif void CChangeMaster::MasterTciFirstRun() { diff --git a/das-dn/cmg/changemaster.h b/das-dn/cmg/changemaster.h index 2da2c1bc..b651e116 100644 --- a/das-dn/cmg/changemaster.h +++ b/das-dn/cmg/changemaster.h @@ -3,8 +3,8 @@ #include "public.h" #include -#include -#include +//#include +//#include #define MASTER_TCI_ALIVE 0x55 #define MASTER_TCI_SEND_INTERVAL (1000*200) //sleep(1) @@ -73,8 +73,8 @@ private: void StartUp(void); - std::vector threads; - void tryStartup(int pid); +// std::vector threads; +// void tryStartup(int pid); }; void *ChangeNewObject(void *args); diff --git a/das-dn/cmg/ry.cpp b/das-dn/cmg/ry.cpp index fbcc194f..4cc96be6 100644 --- a/das-dn/cmg/ry.cpp +++ b/das-dn/cmg/ry.cpp @@ -315,7 +315,7 @@ bool CRYDevice::publish_sensor_data(const std::string traceId, const char* comma jsonRoot["data"] = payload; std::string outputConfig = Json::writeString(builder, jsonRoot); - +#if 0 if (traceId != "") { vLog(LOG_DEBUG, "send cmd: %s, payload: %d\n", command, outputConfig.length()); @@ -327,6 +327,7 @@ bool CRYDevice::publish_sensor_data(const std::string traceId, const char* comma vLog(LOG_DEBUG, "send cmd: %s, payload: %d\n", command, outputConfig.length()); } } +#endif #ifndef USE_NOPOLL_WEBSOCKET //vLog(LOG_DEBUG, "push a frame <%s>\n", command); sendMsg(outputConfig); @@ -444,6 +445,42 @@ float CRYDevice::GetUnitYCRealFromValue(int uid, int order, long value) const return (float)(value * coef + base); } +BOOLEAN CRYDevice::GetUnitYCIsForceArchive(int uid, int order) const +{ + int udb; + struUnit* pUnit; + struUnitYC* pYC; + if (uid < 0 || uid >= UNIT_NUM) return 0; + pUnit = &config.units[uid]; + if ((pUnit->state & 0x01) != TRUE) return 0; + if (order < 0 || order >= pUnit->yccount) return 0; + pYC = &pUnit->ycs[order]; + udb = pYC->order; + if (udb < 0 || udb >= DATABASE_YC_NUM) + { + return FALSE; + } + return pYC->forceArchive; +} + +BOOLEAN CRYDevice::GetUnitYXIsForceArchive(int uid, int order) const +{ + int udb; + struUnit* pUnit; + struUnitYX* pYX; + if (uid < 0 || uid >= UNIT_NUM) return 0; + pUnit = &config.units[uid]; + if ((pUnit->state & 0x01) != TRUE) return 0; + if (order < 0 || order >= pUnit->yxcount) return 0; + pYX = &pUnit->yxs[order]; + udb = pYX->order; + if (udb < 0 || udb >= DATABASE_YX_NUM) + { + return FALSE; + } + return pYX->forceArchive; +} + BOOLEAN CRYDevice::GetUnitYCIsFloat(int uid, int order) const { int udb; @@ -1231,22 +1268,22 @@ BOOLEAN CRYDevice::processRyFTP2MinioParam(const Json::Value jsonRoot, int pid) //FTP参数 //用户名 - if (jsonRoot["userName"].isString()) { - snprintf(config_config.processes[pid].option.ftp2minio.ftp.user, sizeof(config_config.processes[pid].option.ftp2minio.ftp.user), "%s", jsonRoot["userName"].asCString()); + if (jsonRoot["ftpUser"].isString()) { + snprintf(config_config.processes[pid].option.ftp2minio.ftp.user, sizeof(config_config.processes[pid].option.ftp2minio.ftp.user), "%s", jsonRoot["ftpUser"].asCString()); } else { - snprintf(config_config.processes[pid].option.ftp2minio.ftp.user, sizeof(config_config.processes[pid].option.ftp2minio.ftp.user), "%s", "administrator"); + snprintf(config_config.processes[pid].option.ftp2minio.ftp.user, sizeof(config_config.processes[pid].option.ftp2minio.ftp.user), "%s", "guest"); } //密码 - if (jsonRoot["passWord"].isString()) { - snprintf(config_config.processes[pid].option.ftp2minio.ftp.password, sizeof(config_config.processes[pid].option.ftp2minio.ftp.password), "%s", jsonRoot["passWord"].asCString()); + if (jsonRoot["ftpPassword"].isString()) { + snprintf(config_config.processes[pid].option.ftp2minio.ftp.password, sizeof(config_config.processes[pid].option.ftp2minio.ftp.password), "%s", jsonRoot["ftpPassword"].asCString()); } else { - snprintf(config_config.processes[pid].option.ftp2minio.ftp.password, sizeof(config_config.processes[pid].option.ftp2minio.ftp.password), "%s", "123456"); + snprintf(config_config.processes[pid].option.ftp2minio.ftp.password, sizeof(config_config.processes[pid].option.ftp2minio.ftp.password), "%s", "1"); } //远程路径 - if (jsonRoot["remotePath"].isString()) { - snprintf(config_config.processes[pid].option.ftp2minio.ftp.remotePath, sizeof(config_config.processes[pid].option.ftp2minio.ftp.remotePath), "%s", jsonRoot["remotePath"].asCString()); + if (jsonRoot["rootDir"].isString()) { + snprintf(config_config.processes[pid].option.ftp2minio.ftp.remotePath, sizeof(config_config.processes[pid].option.ftp2minio.ftp.remotePath), "%s", jsonRoot["rootDir"].asCString()); } else { - snprintf(config_config.processes[pid].option.ftp2minio.ftp.remotePath, sizeof(config_config.processes[pid].option.ftp2minio.ftp.remotePath), "%s", "Hard Disk2/data/rtdatalog"); + snprintf(config_config.processes[pid].option.ftp2minio.ftp.remotePath, sizeof(config_config.processes[pid].option.ftp2minio.ftp.remotePath), "%s", "Hard Disk/data/Tracelog;Hard Disk/data/Statuscode"); } //本地路径 if (jsonRoot["localPath"].isString()) { @@ -2284,6 +2321,7 @@ bool CRYDevice::dealConfigFile(const Json::Value jsonRoot) config_config.units[uid].ycs[k].limit2Enable = FALSE; config_config.units[uid].ycs[k].limit2Low = 0; config_config.units[uid].ycs[k].limit2High = 0.0f; + config_config.units[uid].ycs[k].forceArchive = FALSE; config_config.units[uid].ycs[k].highSpeed = ycs[k].highSpeed; Json::Value param = ycs[k].params; if (!param.isNull()) { @@ -2298,6 +2336,7 @@ bool CRYDevice::dealConfigFile(const Json::Value jsonRoot) if (param["limit2Low"].isDouble()) config_config.units[uid].ycs[k].limit2Low = param["limit2Low"].asFloat(); if (param["limit1High"].isDouble()) config_config.units[uid].ycs[k].limit1High = param["limit1High"].asFloat(); if (param["limit2High"].isDouble()) config_config.units[uid].ycs[k].limit2High = param["limit2High"].asFloat(); + if (param["forceArchive"].isInt()) config_config.units[uid].ycs[k].forceArchive = param["forceArchive"].asInt(); //vLog(LOG_DEBUG, "config_config.units[%d].ycs[%d].coef is: %f\n", uid, k, config_config.units[uid].ycs[k].coef); switch (config_config.processes[i].proto) { @@ -2451,12 +2490,14 @@ bool CRYDevice::dealConfigFile(const Json::Value jsonRoot) } config_config.units[uid].yxs[k].value = 0; config_config.units[uid].yxs[k].qds = 0x80; //默认为无效 - config_config.units[uid].yxs[k].invert = 0; + config_config.units[uid].yxs[k].invert = FALSE; + config_config.units[uid].yxs[k].forceArchive = FALSE; config_database.yxs[config_config.units[uid].yxs[k].order].auto_reset = 0; Json::Value param = yxs[k].params; if (!param.isNull()) { if (param["invert"].asInt()) config_config.units[uid].yxs[k].invert = param["invert"].asInt(); + if (param["forceArchive"].isInt()) config_config.units[uid].yxs[k].forceArchive = param["forceArchive"].asInt(); switch (config_config.processes[i].proto) { case PROTOCOL_HOST_MODBUS_RTU: case PROTOCOL_HOST_MODBUS_TCP: @@ -2775,7 +2816,7 @@ void CRYDevice::heart_beat(int status) } } if (jsonLink.size() > 0) { - vLog(LOG_DEBUG, "link is: %d\n", jsonLink.size()); + //vLog(LOG_DEBUG, "link is: %d\n", jsonLink.size()); payload["links"] = jsonLink; } for (int i = 0; i < UNIT_NUM; i++) { @@ -2793,7 +2834,7 @@ void CRYDevice::heart_beat(int status) } } if (jsonLink.size() > 0) { - vLog(LOG_DEBUG, "device is: %d\n", jsonDevice.size()); + //vLog(LOG_DEBUG, "device is: %d\n", jsonDevice.size()); payload["devices"] = jsonDevice; } } @@ -2850,26 +2891,41 @@ bool CRYDevice::publishAnalogData(int uid) if (uid < 0 || uid >= UNIT_NUM) return false; Json::Value root; Json::Value values; + Json::Value archiveValues; int count = GetUnitYCCount(uid); if (count) { for (int i = 0; i < count; i++) { - if (GetUnitYCIsFloat(uid, i)) { - values[(const char *)config.units[uid].ycs[i].name] = GetUnitYCReal(uid, i); - } else { - values[(const char *)config.units[uid].ycs[i].name] = GetUnitYC(uid, i); + if (GetUnitYCIsForceArchive(uid, i)) + { + if (GetUnitYCIsFloat(uid, i)) { + archiveValues[(const char *)config.units[uid].ycs[i].name] = GetUnitYCReal(uid, i); + } else { + archiveValues[(const char *)config.units[uid].ycs[i].name] = GetUnitYC(uid, i); + } + } + else + { + if (GetUnitYCIsFloat(uid, i)) { + values[(const char *)config.units[uid].ycs[i].name] = GetUnitYCReal(uid, i); + } else { + values[(const char *)config.units[uid].ycs[i].name] = GetUnitYC(uid, i); + } } } } + Json::Int64 datatime = (Json::Int64)time(NULL); + datatime *= 1000; + root["dataTime"] = datatime; + root["deviceId"] = static_units[uid].deviceId; +// root["isStore"] = config.units[uid].state & 0x20 ? false : true; + if (values.size()) { - Json::Int64 datatime = (Json::Int64)time(NULL); - datatime *= 1000; - root["dataTime"] = datatime; - root["deviceId"] = static_units[uid].deviceId; - root["isStore"] = config.units[uid].state & 0x20 ? false : true; root["values"] = values; - return publish_sensor_data("", "analogData", root); } - return false; + if (archiveValues.size()) { + root["archiveValues"] = archiveValues; + } + return publish_sensor_data("", "analogData", root); } bool CRYDevice::publishStateData(int uid) @@ -2877,22 +2933,33 @@ bool CRYDevice::publishStateData(int uid) if (uid < 0 || uid >= UNIT_NUM) return false; Json::Value root; Json::Value values; + Json::Value archiveValues; int count = GetUnitYXCount(uid); if (count) { for (int i = 0; i < count; i++) { - values[(const char *)config.units[uid].yxs[i].name] = GetUnitYX(uid, i); + //判断是否要加入values或是archiveValues + if (GetUnitYXIsForceArchive(uid, i)) + { + archiveValues[(const char *)config.units[uid].yxs[i].name] = GetUnitYX(uid, i); + } + else + { + values[(const char *)config.units[uid].yxs[i].name] = GetUnitYX(uid, i); + } } } + Json::Int64 datatime = (Json::Int64)time(NULL); + datatime *= 1000; + root["dataTime"] = datatime; + root["deviceId"] = static_units[uid].deviceId; +// root["isStore"] = config.units[uid].state & 0x20 ? false : true; if (values.size()) { - Json::Int64 datatime = (Json::Int64)time(NULL); - datatime *= 1000; - root["dataTime"] = datatime; - root["deviceId"] = static_units[uid].deviceId; - root["isStore"] = config.units[uid].state & 0x20 ? false : true; root["values"] = values; - return publish_sensor_data("", "stateData", root); } - return false; + if (archiveValues.size()) { + root["archiveValues"] = archiveValues; + } + return publish_sensor_data("", "stateData", root); } bool CRYDevice::publishdeviceEventData(void) @@ -2945,7 +3012,6 @@ bool CRYDevice::publishdeviceEventData(void) if (root.size()) { return publish_sensor_data("", "deviceEvent", root); - //vLog(LOG_DEBUG, "%s", root.toStyledString().c_str()); } return FALSE; @@ -3175,11 +3241,8 @@ bool CRYDevice::ry_run(void) MakeYKFrame(i); MakeYTFrame(i); if (sec_changed) { - //publishinitDeviceData(i); -// if ((last_sec % 10) == 0) { //更新数据 - publishAnalogData(i); - publishStateData(i); -// } + publishAnalogData(i); + publishStateData(i); } } return m_dataAcquisitionReload; diff --git a/das-dn/cmg/ry.h b/das-dn/cmg/ry.h index fbb2dca1..df6dced6 100644 --- a/das-dn/cmg/ry.h +++ b/das-dn/cmg/ry.h @@ -149,9 +149,11 @@ private: float GetUnitYCReal(int uid, int order) const; float GetUnitYCRealFromValue(int uid, int order, long value) const; BOOLEAN GetUnitYCIsFloat(int uid, int order) const; + BOOLEAN GetUnitYCIsForceArchive(int uid, int order) const; float GetUnitYCLimitReal(int uid, int order, int type = 1) const; float GetUnitYMReal(int uid, int order); BYTE GetUnitYX(int uid, int point); + BOOLEAN GetUnitYXIsForceArchive(int uid, int order) const; int GetUnitYXBW(int& uid, BOOLEAN& value, unionCP56Time& st); int GetUnitSOE(int& uid, BOOLEAN& value, BYTE& qds, unionCP56Time& st); int GetUnitYCBW(int& uid, LONG& value, int& type, unionCP56Time& st); diff --git a/das-dn/hostadsbf/hostadsbf.cpp b/das-dn/hostadsbf/hostadsbf.cpp index 14f93bc3..71ca2ff1 100644 --- a/das-dn/hostadsbf/hostadsbf.cpp +++ b/das-dn/hostadsbf/hostadsbf.cpp @@ -377,7 +377,7 @@ static void* ryftp_process(void* param) //char pathName[128] = "./"; char processName[128]; - snprintf(processName, sizeof(processName), "%s", mbt->GetCurProcessName()); + snprintf(processName, sizeof(processName), "%s", mbt->m_pidName.c_str()); //配置远方路径 char* escaped_string = escape_char_in_string(mbt->m_remotePath, ' '); if (escaped_string == NULL) @@ -429,12 +429,12 @@ static void* ryftp_process(void* param) if (GetUnitYCType(uid, i)) { if (m_datalen2mbaddr_map.find(register_addr) != m_datalen2mbaddr_map.end()) { highspeedmap.insert(datatypemap::value_type(i, m_datalen2mbaddr_map[register_addr])); //point-配置的序号,order-数据文件序号 - vLog(LOG_DEBUG, "register_addr is: %d, point is: %d, and order is: %d\n", register_addr, i, m_datalen2mbaddr_map[register_addr]); + //vLog(LOG_DEBUG, "register_addr is: %d, point is: %d, and order is: %d\n", register_addr, i, m_datalen2mbaddr_map[register_addr]); } } else { if (m_datalen2mbaddr_map.find(register_addr) != m_datalen2mbaddr_map.end()) { lowspeedmap.insert(datatypemap::value_type(i, m_datalen2mbaddr_map[register_addr])); //point-配置的序号,order-数据文件序号 - vLog(LOG_DEBUG, "register_addr is: %d, point is: %d, and order is: %d\n", register_addr, i, m_datalen2mbaddr_map[register_addr]); + //vLog(LOG_DEBUG, "register_addr is: %d, point is: %d, and order is: %d\n", register_addr, i, m_datalen2mbaddr_map[register_addr]); } } } @@ -449,7 +449,7 @@ static void* ryftp_process(void* param) pos.point = m_datalen2mbaddr_map[register_addr]; pos.pos = data_pos; yxdatamap.insert(datatypeposmap::value_type(i, pos)); //point-配置的序号,order-数据文件序号 - vLog(LOG_DEBUG, "register_addr is: %d, point is: %d, and order is: %d\n", register_addr, i, m_datalen2mbaddr_map[register_addr]); + //vLog(LOG_DEBUG, "register_addr is: %d, point is: %d, and order is: %d\n", register_addr, i, m_datalen2mbaddr_map[register_addr]); } } @@ -664,6 +664,10 @@ CHostADSBFProcess::~CHostADSBFProcess() fclose(pf); } } + if (m_threadRun) { + m_threadRun = FALSE; + m_startup.join(); + } } BOOLEAN CHostADSBFProcess::calc(void) @@ -777,6 +781,32 @@ BOOLEAN CHostADSBFProcess::calc(void) return TRUE; } +void CHostADSBFProcess::tryStartup() +{ + m_turbine = NULL; + while (m_threadRun) { + if (m_turbine == NULL) { + SetLocalAmsNetId(AmsNetId(m_localNetId)); + m_turbine = new AdsDevice{m_remoteIp, AmsNetId(m_remoteNetId), m_remotePort}; + const auto state = m_turbine->GetState(); + vLog(LOG_DEBUG, "%s ADS state: %d devState: %d\n", m_pidName.c_str(), (uint16_t)state.ads, (uint16_t)state.device); + if ((uint16_t)state.ads >= ADSSTATE::ADSSTATE_MAXSTATES || (uint16_t)state.device >= ADSSTATE::ADSSTATE_MAXSTATES) + { + vLog(LOG_DEBUG, "read device state error.\n"); + long ret = AddRemoteRoute(m_remoteIp, m_localNetId, m_localIp, std::string("isoftstone"), std::string("admin"), std::string("admin")); + vLog(LOG_DEBUG, "bbb %s add route return value is: %d\n", m_pidName.c_str(), ret); + delete m_turbine; + m_turbine = NULL; + } + } else { + if (m_turbine->IsConnected()) continue; + delete m_turbine; + m_turbine = NULL; + } + sleep(5); + } +} + BOOLEAN CHostADSBFProcess::OnPreCreate(int id) { if (!CProcess::OnPreCreate(id)) return FALSE; @@ -786,14 +816,10 @@ BOOLEAN CHostADSBFProcess::OnPreCreate(int id) vLog(LOG_DEBUG, "润阳ADS读取配置错误。"); return FALSE; } -#if 0 - m_localIp = m_nOptions.localIp; //本机IP地址 - m_localNetId = m_nOptions.localNetId; //本机IP地址 - m_remoteIp = m_nOptions.remoteIp; //PLC设备ip地址 - m_remoteNetId = m_nOptions.remoteNetId; -#endif + char ipaddress[32]; + m_pidName = std::string(GetCurProcessName()); //本机IP和netid memset(ipaddress, '\0', sizeof(ipaddress)); inet_ntop(AF_INET, &m_nOptions.net.bind_addr, ipaddress, 16); @@ -810,35 +836,11 @@ BOOLEAN CHostADSBFProcess::OnPreCreate(int id) m_bHaveFTP = m_nOptions.bHaveFTP; calc(); - vLog(LOG_DEBUG, "local ip is: %s, netid is: %s, remote ip is: %s, and netid is: %s\n", m_localIp.c_str(), m_localNetId.c_str(), m_remoteIp.c_str(), m_remoteNetId.c_str()); - //AddRemoteRoute(m_remoteIp, m_localNetId, m_localIp, std::string("isoftstone"), std::string("guest"), std::string("1")); - - try { - SetLocalAmsNetId(AmsNetId(m_localNetId)); - AdsDevice route {m_remoteIp, AmsNetId(m_remoteNetId), m_remotePort}; - if (!readDeviceState(route)) - { - vLog(LOG_DEBUG, "read device state error.\n"); - } - } catch (const AdsException& ex) { - vLog(LOG_ERROR, "aaa %s AdsException message: %s, remote is: %s:%d\n", GetCurProcessName(), ex.what(), m_remoteIp.c_str(), m_remotePort); - long ret = AddRemoteRoute(m_remoteIp, m_localNetId, m_localIp, std::string("isoftstone"), std::string("guest"), std::string("1")); - vLog(LOG_DEBUG, "bbb %s add route return value is: %d\n", GetCurProcessName(), ret); - - } catch (const std::runtime_error& ex) { - vLog(LOG_ERROR, "***%s\n", ex.what()); - } - sleep(1); - try { - SetLocalAmsNetId(AmsNetId(m_localNetId)); - m_turbine = new AdsDevice{m_remoteIp, AmsNetId(m_remoteNetId), m_remotePort}; - //readDeviceState(*m_turbine); - } catch (const AdsException& ex) { - vLog(LOG_ERROR, "%s AdsException message: %s, remote is: %s:%d\n", GetCurProcessName(), ex.what(), m_remoteIp.c_str(), m_remotePort); - } catch (const std::runtime_error& ex) { - vLog(LOG_ERROR, "%s\n", ex.what()); - } + vLog(LOG_DEBUG, "%s local ip is: %s, netid is: %s, remote ip is: %s, and netid is: %s\n", m_pidName.c_str(), m_localIp.c_str(), m_localNetId.c_str(), m_remoteIp.c_str(), m_remoteNetId.c_str()); + m_turbine = NULL; + m_threadRun = TRUE; + m_startup = std::thread(&CHostADSBFProcess::tryStartup, this); if (m_bHaveFTP) { snprintf(m_user, sizeof(m_user), "%s", m_nOptions.ftp.user); @@ -879,7 +881,7 @@ BOOLEAN CHostADSBFProcess::OnPreCreate(int id) return TRUE; } char name[17]; - snprintf(name, 16, "%s_ftp", GetCurProcessName()); + snprintf(name, 16, "%s_ftp", m_pidName.c_str()); pthread_setname_np(m_pid, name); pthread_attr_destroy(&attr); @@ -908,9 +910,9 @@ BOOLEAN CHostADSBFProcess::OnTimer(void) } if (sec_changed) { - //if (m_turbine->IsConnected()) - try - { + if (m_turbine == NULL) return TRUE; + if (!m_turbine->IsConnected()) return TRUE; + try { if (m_bHaveFTP) { //启动时读取一次,后面自己维护序号 if ((m_currentDirNo == -1) && (m_currentFileNo == -1)) @@ -920,10 +922,9 @@ BOOLEAN CHostADSBFProcess::OnTimer(void) } readRealData(); } catch (const AdsException& ex) { - //vLog(LOG_ERROR, "Error: %d\n", ex.errorCode); - vLog(LOG_ERROR, "%s AdsException message: %s, remote is: %s:%d\n", GetCurProcessName(), ex.what(), m_remoteIp.c_str(), m_remotePort); + vLog(LOG_ERROR, "%s AdsException message: %s, remote is: %s:%d\n", m_pidName.c_str(), ex.what(), m_remoteIp.c_str(), m_remotePort); } catch (const std::runtime_error& ex) { - vLog(LOG_ERROR, "%s\n", ex.what()); + vLog(LOG_ERROR, "%s %s\n", m_pidName.c_str(), ex.what()); } } return TRUE; @@ -946,6 +947,7 @@ BOOLEAN CHostADSBFProcess::readFileID() int uid = GetCurUnitID(); if (uid < 0 || uid >= UNIT_NUM) return TRUE; + vLog(LOG_DEBUG, "%s here read file info\n", m_pidName.c_str()); AdsVariable wPathInfoInvalid {*m_turbine, ".gwPathInfoInvalid"}; //vLog(LOG_DEBUG, "Read back with first value %d\n", (WORD)wPathInfoInvalid); AdsVariable wCurrentFolderNo {*m_turbine, ".gwCurrentFolderNo"}; @@ -987,10 +989,10 @@ BOOLEAN CHostADSBFProcess::readFileID() m_bHaveUnReadFile = TRUE; int dir_count = m_curStartDirNo - m_lastReadDirNo + 1; int file_count = m_curStartFileNo - m_lastReadFileNo; - vLog(LOG_DEBUG, "%s 总共有%d个目录的%d个文件未读取。\n", GetCurProcessName(), dir_count, file_count); + vLog(LOG_DEBUG, "%s 总共有%d个目录的%d个文件未读取。\n", m_pidName.c_str(), dir_count, file_count); } - vLog(LOG_DEBUG, "%s 最新文件夹编号: %ld, 最新文件名编号: %ld: 最新文件夹中第一个文件的编号: %ld\n", GetCurProcessName(), m_currentDirNo, m_currentFileNo, m_currentDirStartFileNo); + vLog(LOG_DEBUG, "%s 最新文件夹编号: %ld, 最新文件名编号: %ld: 最新文件夹中第一个文件的编号: %ld\n", m_pidName.c_str(), m_currentDirNo, m_currentFileNo, m_currentDirStartFileNo); return TRUE; diff --git a/das-dn/hostadsbf/hostadsbf.h b/das-dn/hostadsbf/hostadsbf.h index e18d8afe..2bf22ed9 100644 --- a/das-dn/hostadsbf/hostadsbf.h +++ b/das-dn/hostadsbf/hostadsbf.h @@ -3,6 +3,7 @@ #include "ryFileDef.h" #include "process.h" +#include #include "Log.h" #include "AdsLib.h" #include "AdsVariable.h" @@ -20,6 +21,8 @@ typedef struct register2typemap adsDataBlocks; } struADSData; typedef std::vector adsReadDataVector; + + class CHostADSBFProcess : public CProcess { public: @@ -32,6 +35,7 @@ public: private: struRYADSOption m_nOptions; + //增加websocket连接 pthread_t m_pid; @@ -45,8 +49,14 @@ private: int m_total_length; DWORD last_sec; - struADSData m_adsDatas[4]; + BOOLEAN m_threadRun; + std::thread m_startup; + void tryStartup(); + struADSData m_adsDatas[4]; + public: + std::string m_pidName; + BOOLEAN m_bHaveFTP; //存在FTP协议 //ftp参数信息 diff --git a/das-dn/inc/public.h b/das-dn/inc/public.h index 7657b495..620ba54f 100644 --- a/das-dn/inc/public.h +++ b/das-dn/inc/public.h @@ -767,15 +767,10 @@ typedef struct QLONG irn; //是否取反,default: 0 BOOLEAN invert; + //强制归档,default: 0 + BOOLEAN forceArchive; } struUnitYX; -#if 0 -typedef struct -{ - char name[(MAX_NAME_SIZE << 2)]; -} struUnitYXStatic; -#endif - typedef struct { char name[(MAX_NAME_SIZE << 2)]; @@ -809,15 +804,10 @@ typedef struct float limit2High; //限值2下限 float limit2Low; + //强制归档,default: 0 + BOOLEAN forceArchive; } struUnitYC; -#if 0 -typedef struct -{ - char name[(MAX_NAME_SIZE << 2)]; -} struUnitYCStatic; -#endif - typedef struct { short order; @@ -841,13 +831,6 @@ typedef struct QLONG irn; } struUnitYM; -#if 0 -typedef struct -{ - char name[(MAX_NAME_SIZE << 2)]; -} struUnitYMStatic; -#endif - typedef struct { char name[(MAX_NAME_SIZE << 2)]; @@ -871,13 +854,6 @@ typedef struct QLONG irn; } struUnitYT; -#if 0 -typedef struct -{ - char name[(MAX_NAME_SIZE << 2)]; -} struUnitYTStatic; -#endif - typedef struct { BYTE state; @@ -1293,10 +1269,6 @@ extern "C" BOOLEAN WriteStaticUnitCFG(void); BOOLEAN ReadStaticUnitCFG(void); void vLog(eLogLevel eLevel, const char* szFmt, ...); -#if 0 - int validate_utf8(const char *str, int len); - int random_bytes(void *bytes, int count); -#endif DWORD ReadMsgRingBuff(char *rbuff, DWORD len); #ifdef __cplusplus } diff --git a/das-dn/minio/ftp2minio.cpp b/das-dn/minio/ftp2minio.cpp index b4ae9e1e..6b330300 100644 --- a/das-dn/minio/ftp2minio.cpp +++ b/das-dn/minio/ftp2minio.cpp @@ -1,6 +1,7 @@ #include "ftp2minio.h" #include #include +#include FtpManage::FtpManage(const std::string user, const std::string password, const std::string id) :Ftp_ip(id), User(user), Password(password) @@ -127,7 +128,6 @@ bool FtpManage::GetfilenameFromftp(const std::string filePath) std::string path = Ftp_ip + filePath; std::string fileName; // 文件名列表保存位置 - vLog(LOG_DEBUG, "path is: %s", path.c_str()); if (curl) { curl_easy_setopt(curl, CURLOPT_URL, path.c_str()); // 设置访问URL @@ -233,11 +233,12 @@ BOOLEAN CFtp2MinioProcess::OnPreCreate(int id) char remotePath[128] = "\0"; char localPath[128]; char url[256]; - char listpath[256]; + //char listpath[256]; snprintf(user, sizeof(user), "%s", m_nOptions.ftp.user); snprintf(password, sizeof(password), "%s", m_nOptions.ftp.password); snprintf(localPath, sizeof(localPath), "%s", m_nOptions.ftp.localPath); m_localPath = std::string(localPath); + if (m_localPath.back() != '/') m_localPath += std::string("/"); char *escaped_string = escape_char_in_string(m_nOptions.ftp.remotePath, ' '); if (!escaped_string) return FALSE; @@ -248,10 +249,22 @@ BOOLEAN CFtp2MinioProcess::OnPreCreate(int id) inet_ntop(AF_INET, &target_addr, ipaddress, 16); snprintf(url, sizeof(url), "ftp://%s", ipaddress); - snprintf(listpath, sizeof(listpath), "/%s", remotePath); + + std::string listpaths(remotePath); + std::vector splits = split(listpaths, ';'); + for (std::vector::iterator it = splits.begin(); it != splits.end(); it++) + { + if ((*it).back() != '/') + { + m_listPaths.push_back("/" + (*it) + "/"); + } + else + { + m_listPaths.push_back("/" + (*it)); + } + } m_pAftp = new FtpManage(user, password, url); - m_listPath = std::string(listpath); fileName2Id_map.clear(); //读取列表 @@ -268,6 +281,17 @@ BOOLEAN CFtp2MinioProcess::Run(void) return TRUE; } +std::optional CFtp2MinioProcess::extractDate(const std::string& filename, const std::string& regex) +{ + std::regex datePattern(regex);//(R"(\d{4}-\d{2}-\d{2})"); + std::smatch match; + if (std::regex_search(filename, match, datePattern)) + { + return match.str(); // 返回匹配到的日期字符串 + } + return std::nullopt; // 没有找到日期则返回 nullopt +} + BOOLEAN CFtp2MinioProcess::OnTimer(void) { if (!CProcess::OnTimer()) return FALSE; @@ -279,37 +303,43 @@ BOOLEAN CFtp2MinioProcess::OnTimer(void) min_changed = TRUE; } if (min_changed) - { - vLog(LOG_DEBUG, "准备读取文件夹%s的内容\n", m_listPath.c_str()); - for (const auto& n : m_pAftp->GetFilesName(m_listPath)) + { + for (std::vector::iterator it = m_listPaths.begin(); it != m_listPaths.end(); it++) { - std::string remotefile = m_listPath + "/" + n; - std::string localpath = m_localPath; - if (fileName2Id_map.find(n) == fileName2Id_map.end()) + m_listPath = (*it); + if (m_listPath.empty()) continue; + // 检查字符串的最后一个字符是否为 '/' + vLog(LOG_DEBUG, "准备读取文件夹%s的内容\n", m_listPath.c_str()); + for (const auto& n : m_pAftp->GetFilesName(m_listPath)) { - fileName2Id_map.insert(fileName2Idmap::value_type(n, 0)); - if (m_pAftp->DownloadFile(remotefile.c_str(), localpath.c_str())) + std::string remotefile = m_listPath + n; + std::string localpath = m_localPath; + if (fileName2Id_map.find(n) == fileName2Id_map.end()) { - vLog(LOG_DEBUG, "下载成功!\n"); - push2minio(n); + fileName2Id_map.insert(fileName2Idmap::value_type(n, 0)); + if (m_pAftp->DownloadFile(remotefile.c_str(), localpath.c_str())) + { + vLog(LOG_DEBUG, "下载成功!\n"); + std::vector tokens = split(m_listPath, '/'); + push2minio(tokens[tokens.size() - 1], n); + } + } + else + { + vLog(LOG_WARN, "该文件:%s已经被下载。\n", n.c_str()); } } - else + if (last_count != fileName2Id_map.size()) { - vLog(LOG_WARN, "该文件:%s已经被下载。\n", n.c_str()); - return TRUE; + last_count = fileName2Id_map.size(); + saveMapToFile(); } } - if (last_count != fileName2Id_map.size()) - { - last_count = fileName2Id_map.size(); - saveMapToFile(); - } } return TRUE; } -BOOLEAN CFtp2MinioProcess::push2minio(std::string pathName) +BOOLEAN CFtp2MinioProcess::push2minio(std::string parentDir, std::string pathName) { //创建URL //minio::s3::BaseUrl base_url("http://192.168.109.187:9000"); @@ -324,7 +354,7 @@ BOOLEAN CFtp2MinioProcess::push2minio(std::string pathName) minio::s3::Client client(base_url, &provider); //std::string bucket_name = "test"; std::string bucket_name = std::string(m_nOptions.minio.bucket); - + // 检查test桶是否存在 bool exist; { @@ -358,10 +388,37 @@ BOOLEAN CFtp2MinioProcess::push2minio(std::string pathName) //上传到桶中的绝对路径 //char *pYMDhms = strchr((char *)pathName.c_str(), '.'); //args.object = "/A-001/Alarm/" + pathName; - args.object = std::string(m_nOptions.minio.object) + pathName; + //args.object = std::string(m_nOptions.minio.object) + pathName; + + auto date = extractDate(pathName, R"(\d{4}-\d{2}-\d{2})"); + std::string date_string; + if (date.has_value()) + { + //vLog(LOG_DEBUG, "fileName is: %s, Date is: %s\n", pathName.c_str(), date.value().c_str()); + date_string = replaceChar(date.value(), '-', '/'); + //vLog(LOG_DEBUG, "date_string is: %s\n", date_string.c_str()); + } + else + { + date = extractDate(pathName, R"(\d{4}\d{2}\d{2})"); + if (date.has_value()) + { + //vLog(LOG_DEBUG, "fileName is: %s, Date is: %s\n", pathName.c_str(), date.value().c_str()); + date_string = date.value().substr(0, 4) + std::string("/") + date.value().substr(4, 2) + std::string("/") + date.value().substr(6, 2) + std::string("/"); + //vLog(LOG_DEBUG, "date_string is: %s\n", date_string.c_str()); + } + else + { + vLog(LOG_DEBUG, "文件不存在时间格式\n"); + return TRUE; + } + } + + //return TRUE; + args.object = std::string(GetCurProcessName()) + std::string("/") + parentDir + std::string("/") + date_string + std::string("/") + pathName; //本地文件系统中的绝对路径 - args.filename = pathName;//"/das/minio-example/test.txt"; + args.filename = m_localPath + pathName; minio::s3::UploadObjectResponse resp = client.UploadObject(args); if (!resp) { diff --git a/das-dn/minio/ftp2minio.h b/das-dn/minio/ftp2minio.h index a04a117b..8c45ff0d 100644 --- a/das-dn/minio/ftp2minio.h +++ b/das-dn/minio/ftp2minio.h @@ -4,6 +4,7 @@ #include "process.h" #include #include +#include #include #include #include @@ -47,6 +48,7 @@ public: private: FtpManage* m_pAftp; + std::vector m_listPaths; //需要查询的目录,用";"隔开 std::string m_listPath; //需要查询的目录 std::string m_localPath; //文件保存到本地的目录 @@ -59,7 +61,17 @@ private: void saveMapToFile(void); void loadMapFromFile(void); - BOOLEAN push2minio(std::string); + std::string replaceChar(const std::string& str, char oldChar, char newChar) { + std::string result = str; // 复制原始字符串以避免修改原始数据 + for (size_t i = 0; i < result.size(); ++i) { + if (result[i] == oldChar) { + result[i] = newChar; // 替换字符 + } + } + return result; + } + std::optional extractDate(const std::string& filename, const std::string& regex); + BOOLEAN push2minio(std::string, std::string); }; #endif //_ISS_FTP2MINIO_PROCESS_H_ diff --git a/das-dn/third_party/AdsLib/AdsDevice.cpp b/das-dn/third_party/AdsLib/AdsDevice.cpp index 3d8baaac..4a593e0e 100644 --- a/das-dn/third_party/AdsLib/AdsDevice.cpp +++ b/das-dn/third_party/AdsLib/AdsDevice.cpp @@ -33,8 +33,9 @@ AdsDevice::AdsDevice(const std::string& ipV4, AmsNetId amsNetId, uint16_t port) m_LocalPort(new long { OpenLocalPort() }, { CloseLocalPort }), m_Connected(false) { - if(*m_NetId.get() == amsNetId) + if(*m_NetId.get() == amsNetId) { m_Connected = true; + } } long AdsDevice::DeleteNotificationHandle(uint32_t handle) const diff --git a/das-dn/third_party/AdsLib/Sockets.cpp b/das-dn/third_party/AdsLib/Sockets.cpp index 093ae267..5f3dd79b 100644 --- a/das-dn/third_party/AdsLib/Sockets.cpp +++ b/das-dn/third_party/AdsLib/Sockets.cpp @@ -116,6 +116,7 @@ Socket::Socket(const struct addrinfo* const host, const int type) m_DestAddr(SOCK_DGRAM == type ? reinterpret_cast(&m_SockAddress) : nullptr), m_DestAddrLen(0), m_LastError(0), + m_type(type), m_Connected(false) { for (auto rp = host; rp; rp = rp->ai_next) { @@ -130,7 +131,7 @@ Socket::Socket(const struct addrinfo* const host, const int type) closesocket(m_Socket); m_Socket = INVALID_SOCKET; continue; - }else{ + } else { m_Connected = true; m_HostAddr = *(reinterpret_cast(rp->ai_addr)); } @@ -200,8 +201,11 @@ bool Socket::IsConnectedTo(const struct addrinfo* const targetAddresses) const size_t Socket::read(uint8_t* buffer, size_t maxBytes, timeval* timeout) { - if(m_Connected == false || IsValid() == false) - return 0; + if (m_type == SOCK_STREAM) { + if(m_Connected == false || IsValid() == false) { + return 0; + } + } if (!Select(timeout)) { return 0; @@ -254,13 +258,14 @@ bool Socket::Select(timeval* timeout) /* and check if socket was correct */ if (1 != state) { - LOG_ERROR("Socket select something strange happen while waiting for socket in state: " << - state << " with error: " << std::strerror(m_LastError)); + LOG_ERROR("Socket select something strange happen while waiting for socket in state: " << state << " with error: " << std::strerror(m_LastError)); return false; } if(!FD_ISSET(m_Socket, &readSockets)) + { return false; + } return true; } @@ -271,6 +276,7 @@ size_t Socket::write(const Frame& frame) LOG_ERROR("Socket write frame length: " << frame.size() << " exceeds maximum length."); return 0; } + if (!IsValid()) return 0; const int bufferLength = static_cast(frame.size()); const char* const buffer = reinterpret_cast(frame.data()); diff --git a/das-dn/third_party/AdsLib/Sockets.h b/das-dn/third_party/AdsLib/Sockets.h index 9d26f4b2..54224905 100644 --- a/das-dn/third_party/AdsLib/Sockets.h +++ b/das-dn/third_party/AdsLib/Sockets.h @@ -58,6 +58,7 @@ protected: const sockaddr* const m_DestAddr; socklen_t m_DestAddrLen; sockaddr_in m_HostAddr; + int m_type; Socket(const struct addrinfo* host, int type); ~Socket(); diff --git a/das-dn/third_party/AdsLib/Standalone/AmsConnection.cpp b/das-dn/third_party/AdsLib/Standalone/AmsConnection.cpp index e155da0e..b54e3cb9 100644 --- a/das-dn/third_party/AdsLib/Standalone/AmsConnection.cpp +++ b/das-dn/third_party/AdsLib/Standalone/AmsConnection.cpp @@ -83,8 +83,11 @@ AmsConnection::AmsConnection(Router& __router, const struct addrinfo* const dest AmsConnection::~AmsConnection() { - socket.Shutdown(); - receiver.join(); + if (socket.IsConnected()) + { + socket.Shutdown(); + receiver.join(); + } } SharedDispatcher AmsConnection::CreateNotifyMapping(uint32_t hNotify, std::shared_ptr notification)