update hostmodbustcp读取历史文件

This commit is contained in:
zhouhuang 2024-10-31 14:32:23 +08:00
parent 14fb35383f
commit b62a4e6dcc
4 changed files with 114 additions and 79 deletions

View File

@ -223,3 +223,4 @@ endif()
#add_link_options (-static) #add_link_options (-static)
add_executable (application ${APP_SRCS}) add_executable (application ${APP_SRCS})
target_link_libraries (application ${APP_LIBS}) target_link_libraries (application ${APP_LIBS})

View File

@ -2015,7 +2015,7 @@ bool CRYDevice::publishdeviceEventData(void)
if (root.size()) { if (root.size()) {
//return publish_sensor_data("", "deviceEvent", root); //return publish_sensor_data("", "deviceEvent", root);
vLog(LOG_DEBUG, "%s", root.toStyledString().c_str()); //vLog(LOG_DEBUG, "%s", root.toStyledString().c_str());
} }
return FALSE; return FALSE;

View File

@ -403,15 +403,7 @@ static bool publishhistoryLowSpeedData(const noPollConn* conn, const int uid, co
{ {
if (uid < 0 || uid >= UNIT_NUM) return false; if (uid < 0 || uid >= UNIT_NUM) return false;
Json::Value root; Json::Value root;
#if 0
Json::Value values;
int count = GetUnitYCCount(uid);
if (count) {
for (int i = 0; i < count; i++) {
values[(const char *)config.units[uid].ycs[i].name] = GetUnitYCRealFromValue(uid, i, data[i]);
}
}
#endif
if (values.size()) { if (values.size()) {
root["dataTime"] = (Json::Int64)(dt / 1000 * 1000); //取整 root["dataTime"] = (Json::Int64)(dt / 1000 * 1000); //取整
root["deviceId"] = static_units[uid].deviceId; root["deviceId"] = static_units[uid].deviceId;
@ -446,7 +438,6 @@ static size_t write_callback(void *ptr, size_t size, size_t nmemb, struct memory
} }
#else #else
struct CustomProgress struct CustomProgress
{ {
curl_off_t lastruntime; curl_off_t lastruntime;
@ -537,15 +528,6 @@ static int ftpget(const char* remote, const char* local, const char* user, const
curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void *)chunk); curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void *)chunk);
curl_easy_setopt(curl, CURLOPT_FTP_USE_EPSV, 0L); // Optional: depending on your FTP server curl_easy_setopt(curl, CURLOPT_FTP_USE_EPSV, 0L); // Optional: depending on your FTP server
ret = curl_easy_perform(curl); ret = curl_easy_perform(curl);
#endif
//vLog(LOG_DEBUG, "curl easy perform return value is: %d, and OK is: %d.\n", ret, CURLE_OK);
#if 0
int curl_state = 0;
if (ret == CURLE_OK) curl_state = 1;
else {
//vLog(LOG_ERROR, "%d,%s\n", ret, curl_easy_strerror(ret));
curl_state = 0;
}
#endif #endif
curl_easy_cleanup(curl); curl_easy_cleanup(curl);
curl_global_cleanup(); curl_global_cleanup();
@ -560,12 +542,8 @@ static void* ryftp_process(void* param)
CHostModbusTcpProcess* mbt = (CHostModbusTcpProcess *)param; CHostModbusTcpProcess* mbt = (CHostModbusTcpProcess *)param;
//获取此协议配置里面的ftp信息 //获取此协议配置里面的ftp信息
// char remote_filename[64];
// char remote_dirent[64];
char remote[256]; char remote[256];
char name[256]; char name[256];
// char local_filename[64];
// char local_dirent[128];
//默认参数,或是通过协议配置获取 //默认参数,或是通过协议配置获取
char user[128] = "administrator"; char user[128] = "administrator";
@ -587,10 +565,6 @@ static void* ryftp_process(void* param)
DWORD target_addr = mbt->target_addr; DWORD target_addr = mbt->target_addr;
memset(ipaddress, '\0', sizeof(ipaddress)); memset(ipaddress, '\0', sizeof(ipaddress));
inet_ntop(AF_INET, &target_addr, ipaddress, 16); inet_ntop(AF_INET, &target_addr, ipaddress, 16);
#if 0
struct timespec start, end;
double elapsed_time = 0;
#endif
for (int i = 0; i < sizeof(m_datalen_mbaddr) / sizeof(m_datalen_mbaddr[0]); i++) { for (int i = 0; i < sizeof(m_datalen_mbaddr) / sizeof(m_datalen_mbaddr[0]); i++) {
m_datalen2mbaddr_map.insert(datalen2mbaddrmap::value_type(m_datalen_mbaddr[i].address, i + 1)); m_datalen2mbaddr_map.insert(datalen2mbaddrmap::value_type(m_datalen_mbaddr[i].address, i + 1));
@ -1112,29 +1086,38 @@ static void* ryftp_process(void* param)
//根据实际配置表将 //根据实际配置表将
WORD ftpget_retry_count = 0; WORD ftpget_retry_count = 0;
BOOLEAN bReadCurrentFile = TRUE; //读取当前文件标识
while (TRUE) { while (TRUE) {
sleep(1); //每秒执行一次 sleep(1); //每秒执行一次
if (!mbt->m_bFtpRun) break;
//ftp获取文件 //ftp获取文件
if (mbt->m_iv == 1) { if (mbt->m_iv == 1) {
//文件目录无效 //文件目录无效
continue; continue;
} }
if (bReadCurrentFile)
{ //读取当前文件
snprintf(name, sizeof(name), "%s/%d", pathName, mbt->m_currentFileNo); snprintf(name, sizeof(name), "%s/%d", pathName, mbt->m_currentFileNo);
snprintf(remote, sizeof(remote), "ftp://%s/Hard%20Disk2/data/rtdatalog/%d/%d", ipaddress, mbt->m_currentDirNo, mbt->m_currentFileNo); snprintf(remote, sizeof(remote), "ftp://%s/Hard%20Disk2/data/rtdatalog/%d/%d", ipaddress, mbt->m_currentDirNo, mbt->m_currentFileNo);
#if 0 }
clock_gettime(CLOCK_MONOTONIC, &start); else
#endif { //读取未读的文件
snprintf(name, sizeof(name), "%s/%d", pathName, mbt->m_lastReadFileNo);
snprintf(remote, sizeof(remote), "ftp://%s/Hard%20Disk2/data/rtdatalog/%d/%d", ipaddress, mbt->m_lastReadDirNo, mbt->m_lastReadFileNo);
}
struct memory chunk = {0}; // For storing the downloaded data struct memory chunk = {0}; // For storing the downloaded data
int result = ftpget(remote, name, user, password, 3, &chunk); int result = ftpget(remote, name, user, password, 3, &chunk);
if (result == CURLE_OK) { if (result == CURLE_OK) {
//成功,处理文件 //成功,处理文件
vLog(LOG_DEBUG, "get %s to local %s, with name: %s, and password: %s okay.\n", remote, name, user, password); vLog(LOG_DEBUG, "get %s to local %s, with name: %s, and password: %s okay.\n", remote, name, user, password);
ftpget_retry_count = 0; ftpget_retry_count = 0;
struRYDeviceData *data = (struRYDeviceData *)chunk.response; struRYDeviceData *data = (struRYDeviceData *)chunk.response;
unionCP56Time st; unionCP56Time st;
int uid = mbt->GetCurUnitID(); int uid = mbt->GetCurUnitID();
for (int i = 0; i < 250; i++, data++) { for (int i = 0; i < 250; i++, data++) { //每个文件有250个数据块
QLONG unix_time = filetime_to_unix(data->localtime); QLONG unix_time = filetime_to_unix(data->localtime);
DWORD localtime = ((unix_time / 1000) - 28800L); DWORD localtime = ((unix_time / 1000) - 28800L);
@ -1151,14 +1134,6 @@ static void* ryftp_process(void* param)
int order = it->first; int order = it->first;
int point = it->second; int point = it->second;
if (GetYCValue(point, fields, (const char *)data, dvalue)) { if (GetYCValue(point, fields, (const char *)data, dvalue)) {
//fprintf(stderr, "get<%d> value is: %d.\n", point, dvalue);
#if 0
vLog(LOG_DEBUG, "datatime is: %04d-%02d-%02d %02d:%02d:%02d.%03d, value1 is: %d, value2 is: %d.\n",
st.year + 2000, st.month, st.dayofmonth, st.hour, st.minute, st.millisecond / 1000,
unix_time % 1000,
data->iGenSpeed,
data->iGenPower);
#endif
if (GetUnitYCIsFloat(uid, order)) { if (GetUnitYCIsFloat(uid, order)) {
highspeedvalues[(const char *)config.units[uid].ycs[order].name] = GetUnitYCRealFromValue(uid, order, dvalue); highspeedvalues[(const char *)config.units[uid].ycs[order].name] = GetUnitYCRealFromValue(uid, order, dvalue);
} else { } else {
@ -1202,25 +1177,29 @@ static void* ryftp_process(void* param)
publishhistoryLowSpeedData(g_conn, uid, unix_time, lowspeedvalues); publishhistoryLowSpeedData(g_conn, uid, unix_time, lowspeedvalues);
} }
} }
if (chunk.response) free(chunk.response); if (chunk.response) free(chunk.response);
#if 0
clock_gettime(CLOCK_MONOTONIC, &end); if (bReadCurrentFile) { //读取的是当前文件
elapsed_time = (end.tv_sec - start.tv_sec) + (end.tv_nsec - start.tv_nsec) / 1e9;
vLog(LOG_DEBUG, "Elapsed time: %.6f seconds\n", elapsed_time);
#endif
mbt->m_lastFileNo = mbt->m_currentFileNo; mbt->m_lastFileNo = mbt->m_currentFileNo;
mbt->m_currentFileNo++; mbt->m_currentFileNo++;
if ((mbt->m_currentFileNo - mbt->m_lastStartFileNo) % 1000 == 0) { if ((mbt->m_currentFileNo - mbt->m_currentDirStartFileNo) % 1000 == 0) {
//一个文件夹最多存放1000个文件, mbt->m_currentFileNo = 0; //一个文件夹最多存放1000个文件, mbt->m_currentFileNo = 0;
mbt->m_currentDirNo++; mbt->m_currentDirNo++;
mbt->m_lastStartFileNo = mbt->m_currentFileNo; mbt->m_currentDirStartFileNo = mbt->m_currentFileNo;
#if 0
if (mbt->m_currentDirNo >= 56) { //7天数据大约有56个文件夹
mbt->m_currentDirNo = 0;
} }
#endif } else { //读取的是未读历史文件
//保存文件信息 mbt->m_lastReadFileNo++;
if ((mbt->m_lastReadFileNo - mbt->m_lastReadDirStartFileNo) % 1000 == 0) {
//一个文件夹最多存放1000个文件, mbt->m_currentFileNo = 0;
mbt->m_lastReadDirNo++;
} }
if (mbt->m_curStartDirNo <= mbt->m_lastReadDirNo && mbt->m_curStartFileNo <= mbt->m_lastReadFileNo) {
vLog(LOG_DEBUG, "已读取完成所有未读的文件。\n");
mbt->m_bHaveUnReadFile = FALSE;
}
}
bReadCurrentFile = TRUE;
} else if (result == CURLE_REMOTE_FILE_NOT_FOUND) { } else if (result == CURLE_REMOTE_FILE_NOT_FOUND) {
//文件不存在尝试60次1分钟正常情况下PLC10s生成一个文件 //文件不存在尝试60次1分钟正常情况下PLC10s生成一个文件
ftpget_retry_count++; ftpget_retry_count++;
@ -1230,6 +1209,10 @@ static void* ryftp_process(void* param)
mbt->m_currentDirNo = -1; mbt->m_currentDirNo = -1;
mbt->m_currentFileNo = -1; mbt->m_currentFileNo = -1;
} }
//此处空闲读取未读的文件
if (mbt->m_bHaveUnReadFile) {
bReadCurrentFile = FALSE;
}
} }
} }
@ -1348,11 +1331,28 @@ CHostModbusTcpProcess::CHostModbusTcpProcess()
m_currentFileNo = -1; //当前文件编号 m_currentFileNo = -1; //当前文件编号
m_lastDirNo = -1; //上一目录编号 m_lastDirNo = -1; //上一目录编号
m_lastFileNo = -1; //上一文件编号 m_lastFileNo = -1; //上一文件编号
m_bFtpRun = FALSE;
#endif #endif
} }
CHostModbusTcpProcess::~CHostModbusTcpProcess() CHostModbusTcpProcess::~CHostModbusTcpProcess()
{ {
vLog(LOG_DEBUG, "here.....\n");
#ifdef HAVE_FTP_PROCESS
m_bFtpRun = FALSE;
vLog(LOG_DEBUG, "保存本协议读取到的那个目录和文件\n");
char fileName[260];
snprintf(fileName, sizeof(fileName), "hostmodbustcp_%d.mem", GetCurID());
FILE *pf = fopen(fileName, "w+");
if (pf)
{
fwrite(&m_currentDirNo, sizeof(LONG), 1, pf);
fwrite(&m_currentFileNo, sizeof(LONG), 1, pf);
fwrite(&m_currentDirStartFileNo, sizeof(LONG), 1, pf);
fclose(pf);
}
#endif
} }
CNetProcessItem *CHostModbusTcpProcess::CreateItem(int ord) CNetProcessItem *CHostModbusTcpProcess::CreateItem(int ord)
@ -1511,17 +1511,7 @@ void CHostModbusTcpProcess::calc2(void)
if (yccount) ycparam = new STRUCT_PARAM[yccount]; if (yccount) ycparam = new STRUCT_PARAM[yccount];
if (ymcount) ymparam = new STRUCT_PARAM[ymcount]; if (ymcount) ymparam = new STRUCT_PARAM[ymcount];
if (yxcount) yxparam = new STRUCT_PARAM[yxcount]; if (yxcount) yxparam = new STRUCT_PARAM[yxcount];
#if 0
//插入一帧读取信息的报文
ycframes[0].FrameType = MODBUSP_READ_ID;
ycframes[0].FuncCode = MODBUSP_READ_ID_FUNCCODE;
ycframes[0].RegBegin = MODBUSP_READ_ID_REGISTER_ADDRESS;
ycframes[0].RegCount = MODBUSP_READ_ID_REGISTER_LENGTH;
j = 1;
#else
j = 0;
#endif
if (ycparam) if (ycparam)
{ {
memset(ycparam, 0, sizeof(STRUCT_PARAM) * yccount); memset(ycparam, 0, sizeof(STRUCT_PARAM) * yccount);
@ -1538,7 +1528,7 @@ void CHostModbusTcpProcess::calc2(void)
{ {
WORD addr = MAKEWORD(ycparam[n].param[1], ycparam[n].param[2]); WORD addr = MAKEWORD(ycparam[n].param[1], ycparam[n].param[2]);
if (addr >= 65535) continue; if (addr >= 65535) continue;
for (; j < MODBUS_RTU_AUTOMATIC_FRAME; j++) for (j = 0; j < MODBUS_RTU_AUTOMATIC_FRAME; j++)
{ {
if (ycframes[j].FuncCode == 0) if (ycframes[j].FuncCode == 0)
{ {
@ -1683,8 +1673,26 @@ BOOLEAN CHostModbusTcpProcess::OnPreCreate(int id)
#ifdef HAVE_FTP_PROCESS #ifdef HAVE_FTP_PROCESS
vLog(LOG_DEBUG, "file size is: %d\n", sizeof(struRYDeviceData) * 250 / 1024); vLog(LOG_DEBUG, "file size is: %d\n", sizeof(struRYDeviceData) * 250 / 1024);
//读取文件
m_lastReadDirNo = 1;
m_lastReadFileNo = 1;
m_lastReadDirStartFileNo = 1;
m_bHaveUnReadFile = TRUE;
char fileName[260];
snprintf(fileName, sizeof(fileName), "hostmodbustcp_%d.mem", GetCurID());
FILE *pf = fopen(fileName, "rb");
if (pf)
{
fread(&m_lastReadDirNo, sizeof(LONG), 1, pf);
fread(&m_lastReadFileNo, sizeof(LONG), 1, pf);
fread(&m_lastReadDirStartFileNo, sizeof(LONG), 1, pf);
fclose(pf);
}
//启动后创建ftp线程 //启动后创建ftp线程
if (m_pid <= 0) { if (m_pid <= 0) {
m_bFtpRun = TRUE;
vLog(LOG_DEBUG, "create a ftp thread.\n"); vLog(LOG_DEBUG, "create a ftp thread.\n");
pthread_attr_t attr; pthread_attr_t attr;
@ -1702,6 +1710,7 @@ BOOLEAN CHostModbusTcpProcess::OnPreCreate(int id)
pthread_attr_destroy(&attr); pthread_attr_destroy(&attr);
} }
#endif #endif
return TRUE; return TRUE;
@ -2306,10 +2315,27 @@ BOOLEAN CHostModbusTcpProcess::OnReceiveIDData(CHostModbusTcpProcessItem *pItem,
m_currentDirNo = (DWORD)((pBuf[2] << 24) | (pBuf[3] << 16) | (pBuf[0] << 8) | pBuf[1]); pBuf += 4; m_currentDirNo = (DWORD)((pBuf[2] << 24) | (pBuf[3] << 16) | (pBuf[0] << 8) | pBuf[1]); pBuf += 4;
//当前文件夹下最后新文件 //当前文件夹下最后新文件
m_currentFileNo = (DWORD)((pBuf[2] << 24) | (pBuf[3] << 16) | (pBuf[0] << 8) | pBuf[1]); pBuf += 4; m_currentFileNo = (DWORD)((pBuf[2] << 24) | (pBuf[3] << 16) | (pBuf[0] << 8) | pBuf[1]); pBuf += 4;
//当前文件夹下第一个文件 //当前目录文件开始编号
m_lastStartFileNo = (DWORD)((pBuf[2] << 24) | (pBuf[3] << 16) | (pBuf[0] << 8) | pBuf[1]); pBuf += 4; m_currentDirStartFileNo = (DWORD)((pBuf[2] << 24) | (pBuf[3] << 16) | (pBuf[0] << 8) | pBuf[1]); pBuf += 4;
vLog(LOG_DEBUG, "最新文件夹编号: %ld, 最新文件名编号: %ld: 最新文件夹中第一个文件的编号: %ld\n", m_currentDirNo, m_currentFileNo, m_lastStartFileNo);
m_curStartDirNo = m_currentDirNo; //当前开始目录
m_curStartFileNo = m_currentFileNo; //当前开始文件
//判断是否有要读取的文件
if (m_curStartDirNo <= m_lastReadDirNo && m_curStartFileNo <= m_lastReadFileNo)
{
vLog(LOG_DEBUG, "不存在未读的文件。\n");
m_bHaveUnReadFile = FALSE;
}
else
{
m_bHaveUnReadFile = TRUE;
int dir_count = m_curStartDirNo - m_lastReadDirNo + 1;
int file_count = m_curStartFileNo - m_lastReadFileNo;
vLog(LOG_DEBUG, "总共有%d个目录的%d个文件未读取。\n", dir_count, file_count);
}
/*文件从保存的最后读取的文件目录的文件一直督导当前开始目录*/
vLog(LOG_DEBUG, "最新文件夹编号: %ld, 最新文件名编号: %ld: 最新文件夹中第一个文件的编号: %ld\n", m_currentDirNo, m_currentFileNo, m_currentDirStartFileNo);
return TRUE; return TRUE;
} }

View File

@ -233,7 +233,15 @@ public:
LONG m_currentFileNo; //当前文件编号 LONG m_currentFileNo; //当前文件编号
LONG m_lastDirNo; //上一目录编号 LONG m_lastDirNo; //上一目录编号
LONG m_lastFileNo; //上一文件编号 LONG m_lastFileNo; //上一文件编号
LONG m_lastStartFileNo; //文件开始编号 LONG m_currentDirStartFileNo; //当前目录文件开始编号
LONG m_lastReadDirNo; //最后获取的目录编号
LONG m_lastReadFileNo; //最后获取的文件编号
LONG m_lastReadDirStartFileNo; //最后获取的目录文件开始编号
LONG m_curStartDirNo; //当前获取的目录编号
LONG m_curStartFileNo; //当前获取的文件编号
BOOLEAN m_bHaveUnReadFile; //存在未读的文件
BOOLEAN m_bFtpRun;
//ftp参数信息 //ftp参数信息
char m_user[128]; char m_user[128];