774 lines
26 KiB
C++
774 lines
26 KiB
C++
#include "bfftpfile2issmqtt.h"
|
||
#include "uuid/uuid.h"
|
||
#include <fstream>
|
||
#include <dirent.h>
|
||
#include <fnmatch.h>
|
||
|
||
static void on_connect_wrapper(struct mosquitto *mosq, void *obj, int reason_code)
|
||
{
|
||
if (obj == NULL) return;
|
||
CBFFTPFile2ISSMQTTProcess*mqtt = (CBFFTPFile2ISSMQTTProcess*)obj;
|
||
|
||
UNUSED(mosq);
|
||
|
||
mqtt->on_connect(reason_code);
|
||
}
|
||
|
||
static void on_disconnect_wrapper(struct mosquitto *mosq, void* obj, int reason_code)
|
||
{
|
||
if (obj == NULL) return;
|
||
CBFFTPFile2ISSMQTTProcess*mqtt = (CBFFTPFile2ISSMQTTProcess*)obj;
|
||
|
||
UNUSED(mosq);
|
||
|
||
mqtt->on_disconnect(reason_code);
|
||
}
|
||
|
||
static void on_publish_wrapper(struct mosquitto *mosq, void *obj, int mid)
|
||
{
|
||
if (obj == NULL) return;
|
||
CBFFTPFile2ISSMQTTProcess*mqtt = (CBFFTPFile2ISSMQTTProcess*)obj;
|
||
|
||
UNUSED(mosq);
|
||
|
||
mqtt->on_publish(mid);
|
||
}
|
||
|
||
static void on_subscribe_wrapper(struct mosquitto *mosq, void *obj, int mid, int qos_count, const int *granted_qos)
|
||
{
|
||
if (obj == NULL) return;
|
||
CBFFTPFile2ISSMQTTProcess*mqtt = (CBFFTPFile2ISSMQTTProcess*)obj;
|
||
|
||
UNUSED(mosq);
|
||
|
||
mqtt->on_subscribe(mid, qos_count, granted_qos);
|
||
}
|
||
|
||
static void on_message_wrapper(struct mosquitto *mosq, void *obj, const struct mosquitto_message *msg)
|
||
{
|
||
if (obj == NULL) return;
|
||
CBFFTPFile2ISSMQTTProcess*mqtt = (CBFFTPFile2ISSMQTTProcess*)obj;
|
||
|
||
UNUSED(mosq);
|
||
|
||
mqtt->on_message(msg);
|
||
}
|
||
|
||
static void on_log_wrapper(struct mosquitto *mosq, void *obj, int level, const char *str)
|
||
{
|
||
if (obj == NULL) return;
|
||
CBFFTPFile2ISSMQTTProcess*mqtt = (CBFFTPFile2ISSMQTTProcess*)obj;
|
||
|
||
UNUSED(mosq);
|
||
|
||
mqtt->on_log(level, str);
|
||
}
|
||
|
||
CBFFTPFile2ISSMQTTProcessItem::CBFFTPFile2ISSMQTTProcessItem()
|
||
{
|
||
m_bAdded = FALSE;
|
||
memset(nodeId, 0, sizeof(nodeId));
|
||
memset(deviceId, 0, sizeof(deviceId));
|
||
memset(manufacturerId, 0, sizeof(manufacturerId));
|
||
memset(model, 0, sizeof(model));
|
||
}
|
||
|
||
CBFFTPFile2ISSMQTTProcessItem::~CBFFTPFile2ISSMQTTProcessItem()
|
||
{
|
||
}
|
||
|
||
void CBFFTPFile2ISSMQTTProcessItem::Attach(int uid, int sock, BYTE* clint_id, WORD peer_port)
|
||
{
|
||
CProcessItem::Attach(uid, sock, *(DWORD *)clint_id, peer_port);
|
||
}
|
||
|
||
void CBFFTPFile2ISSMQTTProcessItem::Release(void)
|
||
{
|
||
CProcessItem::Release();
|
||
}
|
||
|
||
CBFFTPFile2ISSMQTTProcess::CBFFTPFile2ISSMQTTProcess()
|
||
{
|
||
connect_begin = 0;
|
||
m_mid = 0;
|
||
last_sec = 0;
|
||
connected = FALSE;
|
||
issmqttOption.connectInterval = 30; //链接等待
|
||
issmqttOption.keepAliveInterval = 60; //心跳链接
|
||
issmqttOption.connectTimeout = 30; //链接等待超时
|
||
issmqttOption.publishInterval = 60; //数据发送间隔
|
||
|
||
strncpy(issmqttOption.username, "8E88A55D8E3A4A6BB1196089191494DD", sizeof(issmqttOption.username));
|
||
strncpy(issmqttOption.password, "?tChs9!s6KA?[g:078H4&xCW", sizeof(issmqttOption.password));
|
||
strncpy(issmqttOption.client_id, "D3332045Poutg", sizeof(issmqttOption.client_id));
|
||
|
||
m_irns = NULL;
|
||
m_dataValues = NULL;
|
||
m_nCount = 0;
|
||
m_terminalIrn = 0;
|
||
m_dataCount = 0;
|
||
}
|
||
|
||
CBFFTPFile2ISSMQTTProcess::~CBFFTPFile2ISSMQTTProcess()
|
||
{
|
||
}
|
||
|
||
CProcessItem * CBFFTPFile2ISSMQTTProcess::CreateItem(int ord)
|
||
{
|
||
return(dynamic_cast<CProcessItem *>(new CBFFTPFile2ISSMQTTProcessItem));
|
||
}
|
||
|
||
void CBFFTPFile2ISSMQTTProcess::DestroyItem(int ord, BOOLEAN bDeleted)
|
||
{
|
||
CBFFTPFile2ISSMQTTProcessItem *pItem = (CBFFTPFile2ISSMQTTProcessItem *)GetItem(ord);
|
||
if (pItem != NULL && !bDeleted)
|
||
{
|
||
delete pItem;
|
||
return CProcess::DestroyItem(ord, TRUE);
|
||
}
|
||
|
||
return CProcess::DestroyItem(ord, bDeleted);
|
||
}
|
||
|
||
BOOLEAN CBFFTPFile2ISSMQTTProcess::OnPreCreate(int id)
|
||
{
|
||
if (!CProcess::OnPreCreate(id)) return FALSE;
|
||
|
||
if (!GetOption(&issmqttOption, sizeof(issmqttOption))) return FALSE;
|
||
#if 1
|
||
//读取units.sta静态文件信息
|
||
int uid = GetUnitID(0);
|
||
if (uid < 0 || uid >= UNIT_NUM) return FALSE;
|
||
m_deviceID = static_units[uid].deviceId;
|
||
#endif
|
||
//第0个为本机节点IRN
|
||
m_terminalIrn = nodes.m_node[0].irn;
|
||
snprintf(publish_topic, sizeof(publish_topic), "USP/v100/U/%lld", m_terminalIrn);
|
||
snprintf(subscribe_topic, sizeof(subscribe_topic), "USP/v100/D/%lld", m_terminalIrn);
|
||
|
||
snprintf(m_file_dir, sizeof(m_file_dir), "%s/datas", configpath);
|
||
vLog(LOG_DEBUG, "file dir is: %s, size is: %d.\n", m_file_dir, sizeof(struFileData));
|
||
|
||
m_dataCount = GetUnitYCCount(uid);
|
||
if (m_dataCount)
|
||
{
|
||
m_irns = new Json::Int64[m_dataCount];
|
||
m_dataValues = new float[m_dataCount];
|
||
}
|
||
if (m_irns)
|
||
{
|
||
for (int i = 0; i < m_dataCount; i++)
|
||
{
|
||
m_irns[i] = (Json::Int64)GetUnitYCIRNByPoint(uid, i);
|
||
}
|
||
}
|
||
if (m_dataValues)
|
||
{
|
||
memset(m_dataValues, 0, sizeof(float) * m_dataCount);
|
||
}
|
||
//snprintf(issmqttOption.client_id, sizeof(issmqttOption.client_id), "%s", "bfftpfile");
|
||
|
||
mosquitto_lib_init();
|
||
m_mosq = mosquitto_new(issmqttOption.client_id, true, this);
|
||
if (m_mosq == NULL)
|
||
{
|
||
vLog(LOG_ERROR, "[BFFTPFILE2ISSMQTT] mosquitto_new() Out of memory.\n");
|
||
return FALSE;
|
||
}
|
||
|
||
mosquitto_username_pw_set(m_mosq, issmqttOption.username, issmqttOption.password);
|
||
mosquitto_connect_callback_set(m_mosq, on_connect_wrapper);
|
||
mosquitto_disconnect_callback_set(m_mosq, on_disconnect_wrapper);
|
||
mosquitto_publish_callback_set(m_mosq, on_publish_wrapper);
|
||
mosquitto_subscribe_callback_set(m_mosq, on_subscribe_wrapper);
|
||
mosquitto_message_callback_set(m_mosq, on_message_wrapper);
|
||
mosquitto_log_callback_set(m_mosq, on_log_wrapper);
|
||
|
||
int rc;
|
||
vLog(LOG_DEBUG, "[BFFTPFILE2ISSMQTT] user: %s, pass: %s\n", issmqttOption.username, issmqttOption.password);
|
||
vLog(LOG_DEBUG, "[BFFTPFILE2ISSMQTT] client_id: %s, device_id: %s\n", issmqttOption.client_id, issmqttOption.device_id);
|
||
vLog(LOG_DEBUG, "[BFFTPFILE2ISSMQTT] host: %s:%d, keepAlive is: %d\n", issmqttOption.host, issmqttOption.target_port, issmqttOption.keepAliveInterval);
|
||
rc = mosquitto_connect_async(m_mosq, issmqttOption.host, issmqttOption.target_port, issmqttOption.keepAliveInterval);
|
||
if (rc != MOSQ_ERR_SUCCESS)
|
||
{
|
||
mosquitto_destroy(m_mosq);
|
||
vLog(LOG_ERROR, "[BFFTPFILE2ISSMQTT] mosquitto_connect() %d,%s\n", rc, mosquitto_strerror(rc));
|
||
return FALSE;
|
||
}
|
||
|
||
return TRUE;
|
||
}
|
||
|
||
void CBFFTPFile2ISSMQTTProcess::on_connect(int reason_code)
|
||
{
|
||
if (reason_code != MOSQ_ERR_SUCCESS)
|
||
{
|
||
vLog(LOG_DEBUG, "[BFFTPFILE2ISSMQTT] MQTT连接失败:%s\n", mosquitto_connack_string(reason_code));
|
||
mosquitto_disconnect(m_mosq);
|
||
connected = FALSE;
|
||
return;
|
||
}
|
||
int rc;
|
||
rc = mosquitto_subscribe(m_mosq, NULL, subscribe_topic, QOS_LEVEL_0);
|
||
if (rc != MOSQ_ERR_SUCCESS)
|
||
{
|
||
vLog(LOG_ERROR, "MQTT订阅下行主题失败:%s\n", strerror(errno));
|
||
mosquitto_disconnect(m_mosq);
|
||
connected = FALSE;
|
||
return;
|
||
}
|
||
vLog(LOG_DEBUG, "[BFFTPFILE2ISSMQTT] MQTT连接成功\n");
|
||
connect_begin = system32.timers;
|
||
//此处添加一个上电信息。
|
||
connected = TRUE;
|
||
}
|
||
|
||
void CBFFTPFile2ISSMQTTProcess::on_disconnect(int reason_code)
|
||
{
|
||
vLog(LOG_DEBUG, "[BFFTPFILE2ISSMQTT] on_Disconnect: %d, %s\n", reason_code, mosquitto_reason_string(reason_code));
|
||
connected = FALSE;
|
||
}
|
||
|
||
void CBFFTPFile2ISSMQTTProcess::on_publish(int mid)
|
||
{
|
||
//vLog(LOG_DEBUG, "[BFFTPFILE2ISSMQTT] Message with mid %d has been published.\n", mid);
|
||
}
|
||
|
||
void CBFFTPFile2ISSMQTTProcess::on_subscribe(int mid, int qos_count, const int *granted_qos)
|
||
{
|
||
int i;
|
||
bool have_subscription = false;
|
||
for (i = 0; i < qos_count; i++)
|
||
{
|
||
vLog(LOG_DEBUG, "[BFFTPFILE2ISSMQTT] on_subscribe: %d:granted qos = %d\n", i, granted_qos[i]);
|
||
if (granted_qos[i] <= 2)
|
||
{
|
||
have_subscription = true;
|
||
}
|
||
}
|
||
vLog(LOG_DEBUG, "[BFFTPFILE2ISSMQTT] MQTT订阅成功\n");
|
||
if (have_subscription == false)
|
||
{
|
||
vLog(LOG_DEBUG, "[BFFTPFILE2ISSMQTT] Error: All subscriptions rejected.\n");
|
||
mosquitto_disconnect(m_mosq);
|
||
}
|
||
}
|
||
|
||
void CBFFTPFile2ISSMQTTProcess::on_message(const struct mosquitto_message *msg)
|
||
{
|
||
Json::Value jsonRoot;
|
||
jsonRoot.clear();
|
||
char *buffer = new char[msg->payloadlen+1];
|
||
if (buffer == NULL) return;
|
||
memset(buffer, 0, msg->payloadlen+1);
|
||
strncpy(buffer, (char *)msg->payload, msg->payloadlen);
|
||
std::string err;
|
||
Json::CharReaderBuilder builder;
|
||
Json::CharReader* reader(builder.newCharReader());
|
||
if (!reader->parse(buffer, buffer + msg->payloadlen, &jsonRoot, &err))
|
||
{
|
||
delete buffer;
|
||
buffer = NULL;
|
||
return;
|
||
}
|
||
vLog(LOG_DEBUG, "[BFFTPFILE2ISSMQTT] topic: %s, msg: %s\n", msg->topic, buffer);
|
||
|
||
OnReceiveSubscribeTopic(jsonRoot);
|
||
delete buffer;
|
||
buffer = NULL;
|
||
}
|
||
|
||
void CBFFTPFile2ISSMQTTProcess::on_log(int level, const char* str)
|
||
{
|
||
if (level == MOSQ_LOG_ERR) vLog(LOG_ERROR, "[MQTT] %s\n", str);
|
||
else if (level == MOSQ_LOG_WARNING) vLog(LOG_WARN, "[MQTT] %s\n", str);
|
||
else if (level == MOSQ_LOG_NOTICE) vLog(LOG_INFO, "[MQTT] %s\n", str);
|
||
else if (level == MOSQ_LOG_INFO) vLog(LOG_INFO, "[MQTT] %s\n", str);
|
||
}
|
||
|
||
void CBFFTPFile2ISSMQTTProcess::Destroy(void)
|
||
{
|
||
mosquitto_lib_cleanup();
|
||
CProcess::Destroy();
|
||
}
|
||
|
||
BOOLEAN CBFFTPFile2ISSMQTTProcess::Run(void)
|
||
{
|
||
if (!CProcess::Run()) return FALSE;
|
||
FeedDog();
|
||
|
||
int rc;
|
||
|
||
rc = mosquitto_loop(m_mosq, -1, 1);
|
||
if (rc != MOSQ_ERR_SUCCESS)
|
||
{
|
||
connected = false;
|
||
//vLog(LOG_ERROR, "Error: 循环处理MQTT消息失败!%d,%s\n", rc, mosquitto_strerror(rc));
|
||
switch (rc)
|
||
{
|
||
case MOSQ_ERR_NOMEM:
|
||
case MOSQ_ERR_PROTOCOL:
|
||
case MOSQ_ERR_INVAL:
|
||
case MOSQ_ERR_NOT_FOUND:
|
||
case MOSQ_ERR_TLS:
|
||
case MOSQ_ERR_PAYLOAD_SIZE:
|
||
case MOSQ_ERR_NOT_SUPPORTED:
|
||
case MOSQ_ERR_AUTH:
|
||
case MOSQ_ERR_ACL_DENIED:
|
||
case MOSQ_ERR_UNKNOWN:
|
||
case MOSQ_ERR_EAI:
|
||
case MOSQ_ERR_PROXY:
|
||
//return FALSE;
|
||
case MOSQ_ERR_ERRNO:
|
||
break;
|
||
}
|
||
if (errno == EPROTO)
|
||
{
|
||
return FALSE;
|
||
}
|
||
//重新连接
|
||
if (system32.timers >= (connect_begin + issmqttOption.connectTimeout))
|
||
{
|
||
connect_begin = system32.timers;
|
||
vLog(LOG_DEBUG, "[BFFTPFILE2ISSMQTT] start to connect broker.\n");
|
||
rc = mosquitto_reconnect_async(m_mosq);
|
||
}
|
||
}
|
||
|
||
return TRUE;
|
||
}
|
||
|
||
|
||
BOOLEAN CBFFTPFile2ISSMQTTProcess::OnTimer(void)
|
||
{
|
||
if (!CProcess::OnTimer()) return FALSE;
|
||
|
||
BOOLEAN sec_changed = FALSE;
|
||
|
||
m_nCount++;
|
||
if (last_sec != (time_t)system32.timers)
|
||
{
|
||
last_sec = system32.timers;
|
||
sec_changed = TRUE;
|
||
}
|
||
|
||
if (!connected)
|
||
{
|
||
return TRUE;
|
||
}
|
||
|
||
UnitFeedDog(GetCurUnitID());
|
||
//此处每5分钟更新一次数据
|
||
if (sec_changed)
|
||
{
|
||
int count = countFilesInDirectory(m_file_dir);
|
||
#if 0
|
||
if ((last_sec % issmqttOption.publishInterval) == 0)
|
||
{ //更新数据
|
||
publishRealData();
|
||
return TRUE;
|
||
}
|
||
#endif
|
||
if ((last_sec % 20) == 0)
|
||
{
|
||
publishMonHeartBeat();
|
||
return TRUE;
|
||
}
|
||
}
|
||
|
||
return TRUE;
|
||
}
|
||
|
||
BOOLEAN CBFFTPFile2ISSMQTTProcess::OnReceiveSubscribeTopic(Json::Value jsonRoot)
|
||
{
|
||
if (jsonRoot["command"].isNull()) return FALSE;
|
||
if (jsonRoot["payload"].isNull()) return FALSE;
|
||
|
||
std::string command = jsonRoot["command"].asString();
|
||
|
||
if (command == "CMD_CONTROL")
|
||
{
|
||
OnReceivedDeviceCommand(jsonRoot["payload"]);
|
||
}
|
||
#if 1
|
||
else if (command == "LINK_LOGMONITOR_ENABLE")
|
||
{
|
||
OnReceivedSystemAction(jsonRoot["payload"]);
|
||
}
|
||
#endif
|
||
|
||
|
||
return TRUE;
|
||
}
|
||
|
||
BOOLEAN CBFFTPFile2ISSMQTTProcess::OnReceivedDeviceCommand(Json::Value jsonRoot)
|
||
{
|
||
#if 0
|
||
if (jsonRoot["irn"].isNull()) return FALSE;
|
||
if (jsonRoot["operType"].isNull()) return FALSE;
|
||
if (jsonRoot["operValue"].isNull()) return FALSE;
|
||
|
||
int uid = GetCurUnitID();
|
||
|
||
Json::Int64 irn = jsonRoot["irn"].asInt64();
|
||
int operType = jsonRoot["operType"].asInt();
|
||
int operValue = jsonRoot["operValue"].asInt();
|
||
int point;
|
||
|
||
if (operType == CMD_CONTROL_OPERATION) //遥控
|
||
{
|
||
//根据irn来查找point
|
||
point = GetUnitYKPointByIRN(uid, irn);
|
||
if (point < 0)
|
||
{
|
||
vLog(LOG_ERROR, "未能找到对应的遥控点号,请检查并确认。\n");
|
||
return FALSE;
|
||
}
|
||
SetUnitYK(uid, point, (operValue & 0x03), YKS_SELREQ, YKR_IDLE);
|
||
vLog(LOG_WARN, "Unit(%d) yk(%d) %s state is YKS_SELREQ result is YKR_IDLE.\n", uid, point, ((operValue & 0x03) ? "CLOSE" : "TRIP"));
|
||
}
|
||
else if (operType == CMD_CONTROL_SETTING) //遥调
|
||
{
|
||
//根据irn来查找point
|
||
point = GetUnitYTPointByIRN(uid, irn);
|
||
if (point < 0)
|
||
{
|
||
vLog(LOG_ERROR, "未能找到对应的遥调点号,请检查并确认。\n");
|
||
return FALSE;
|
||
}
|
||
SetUnitYT(uid, point, operValue, YTS_EXEREQ, YTR_IDLE);
|
||
vLog(LOG_DEBUG, "[BFFTPFILE2ISSMQTT] Unit(%d) set point(%d) %d state is YTS_EXEREQ result is YTR_IDLE.\n", uid, point, operValue);
|
||
}
|
||
else
|
||
{
|
||
vLog(LOG_ERROR, "平台下发的<%d>命令错误。operType不是<0-遥控或1-遥调>,系统不支持的命令!\n", operType);
|
||
return FALSE;
|
||
}
|
||
#endif
|
||
return TRUE;
|
||
}
|
||
|
||
BOOLEAN CBFFTPFile2ISSMQTTProcess::OnReceivedSystemAction(Json::Value jsonRoot)
|
||
{
|
||
#if 0
|
||
if (jsonRoot["linkIrn"].isNull()) return FALSE;
|
||
|
||
{ //链路日志监控开启
|
||
QLONG linkIrn = -1;
|
||
if (jsonRoot["linkIrn"].isInt64()) linkIrn = jsonRoot["linkIrn"].asInt64();
|
||
struMonLinkLog.m_iStart_Time = system32.timers;
|
||
struMonLinkLog.m_iLinkId = GetProcessIDByIRN(linkIrn);
|
||
|
||
struMonLinkLog.m_bEnable = TRUE;
|
||
struMonLinkLog.m_iLinkIrn = linkIrn;
|
||
channelBuffer.enabled = TRUE;
|
||
channelBuffer.mon_port = struMonLinkLog.m_iLinkId;
|
||
vLog(LOG_DEBUG, "[BFFTPFILE2ISSMQTT] receive action LINK_LOGMONITOR_ENABLE, irn is:%lld, pid is: %d\n", struMonLinkLog.m_iLinkIrn, struMonLinkLog.m_iLinkId);
|
||
}
|
||
#endif
|
||
return TRUE;
|
||
}
|
||
|
||
BOOLEAN CBFFTPFile2ISSMQTTProcess::publishFrame(const char* command, Json::Value payload)
|
||
{
|
||
Json::StreamWriterBuilder builder;
|
||
builder["indentation"] = "";
|
||
builder["emitUTF8"] = true;
|
||
builder["precision"] = 3;
|
||
|
||
Json::Value jsonRoot;
|
||
jsonRoot["command"] = command;
|
||
uuid_t uuid;
|
||
char str[128];
|
||
uuid_generate_time(uuid);
|
||
uuid_unparse_upper(uuid, str);
|
||
jsonRoot["traceId"] = str;
|
||
QLONG pushTime = system32.timers;
|
||
pushTime *= 1000;
|
||
pushTime += system32.now.millisecond % 1000;
|
||
jsonRoot["mtime"] = (Json::Int64)pushTime;
|
||
jsonRoot["payload"] = payload;
|
||
|
||
std::string outputConfig = Json::writeString(builder, jsonRoot);
|
||
|
||
int rc;
|
||
//vLog(LOG_DEBUG, "[BFFTPFILE2ISSMQTT] send topic: %s, payload: %d, %lld\n", publish_topic, outputConfig.length(), pushTime);
|
||
rc = mosquitto_publish(m_mosq, NULL, publish_topic, outputConfig.length(), outputConfig.c_str(), QOS_LEVEL_0, false);
|
||
if (rc != MOSQ_ERR_SUCCESS)
|
||
{
|
||
vLog(LOG_ERROR, "publishing topic: %s is error<%d,%s>。\n", publish_topic, rc, mosquitto_strerror(rc));
|
||
return FALSE;
|
||
}
|
||
|
||
return TRUE;
|
||
}
|
||
|
||
BOOLEAN CBFFTPFile2ISSMQTTProcess::publishRealData(Json::Int64 dataTime)
|
||
{
|
||
int i;
|
||
int uid;
|
||
int count = 0;
|
||
|
||
Json::Value jsonRoot;
|
||
Json::Value jsonItem;
|
||
Json::Value jsonValue;
|
||
|
||
for (i = 0; i < m_dataCount; i++)
|
||
{
|
||
jsonValue["irn"] = m_irns[i];
|
||
jsonValue["dataValue"] = m_dataValues[i];
|
||
jsonValue["dataTime"] = dataTime;
|
||
jsonItem.append(jsonValue);
|
||
}
|
||
jsonRoot["ycs"] = jsonItem;
|
||
|
||
return publishFrame("REAL_DATA", jsonRoot);
|
||
}
|
||
|
||
BOOLEAN CBFFTPFile2ISSMQTTProcess::publishMonHeartBeat(void)
|
||
{
|
||
int i, uid;
|
||
|
||
uid = GetCurUnitID();
|
||
if (uid < 0 || uid >= UNIT_NUM) return FALSE;
|
||
|
||
Json::Value jsonRoot;
|
||
Json::Value jsonItem;
|
||
Json::Value jsonValue;
|
||
|
||
for (i = 0; i < PROCESSES_NUM; i++)
|
||
{
|
||
if (config.processes[i].state == TRUE)
|
||
{
|
||
jsonValue["linkIrn"] = (Json::Int64)GetProcessIRNByPid(i);
|
||
jsonValue["online"] = (config.processes[i].softdog >= PROCESS_WATCHDOG_TIME) ? false : true;
|
||
jsonItem.append(jsonValue);
|
||
}
|
||
}
|
||
|
||
if (jsonItem.size() <= 0) return FALSE;
|
||
|
||
jsonRoot["ttl"] = 30000;
|
||
jsonRoot["status"] = 0;
|
||
jsonRoot["links"] = jsonItem;
|
||
|
||
return publishFrame("HEARTBEAT", jsonRoot);
|
||
}
|
||
|
||
int CBFFTPFile2ISSMQTTProcess::countFilesInDirectory(const char* directory)
|
||
{
|
||
DIR *dir;
|
||
struct dirent *ent;
|
||
int count = 0;
|
||
|
||
// 打开目录
|
||
dir = opendir(directory);
|
||
if (dir == NULL) {
|
||
perror("无法打开目录");
|
||
return -1;
|
||
}
|
||
|
||
// 遍历目录
|
||
while ((ent = readdir(dir))) {
|
||
if (strcmp(ent->d_name, ".") == 0 || strcmp(ent->d_name, "..") == 0) {
|
||
continue;
|
||
}
|
||
|
||
// 统计文件数量
|
||
if (ent->d_type == DT_REG) {
|
||
count++;
|
||
char fileName[MAX_PATH];
|
||
|
||
struct timespec start, end;
|
||
double elapsed_time;
|
||
//vLog(LOG_DEBUG, "start...\n");
|
||
clock_gettime(CLOCK_MONOTONIC, &start);
|
||
|
||
snprintf(fileName, sizeof(fileName), "%s/%s", m_file_dir, ent->d_name);
|
||
char name[MAX_PATH];
|
||
snprintf(name, sizeof(name), "%s", ent->d_name);
|
||
int year, month, day, hour, minute, second;
|
||
sscanf(name, "%04d-%02d-%02d-%02d%02d%02d", &year, &month, &day, &hour, &minute, &second);
|
||
//vLog(LOG_DEBUG, "%04d-%02d-%02d %02d:%02d:%02d\n", year, month, day, hour, minute, second);
|
||
time_t ctime;
|
||
struct tm ctm;
|
||
ctm.tm_year = year - 1900;
|
||
ctm.tm_mon = month - 1;
|
||
ctm.tm_mday = day;
|
||
ctm.tm_hour = hour;
|
||
ctm.tm_min = minute;
|
||
ctm.tm_sec = second;
|
||
ctm.tm_isdst = 0;
|
||
ctime = mktime(&ctm);
|
||
Json::Int64 dataTime = (Json::Int64)ctime;
|
||
dataTime *= 1000;
|
||
FILE* pf = fopen(fileName, "rb");
|
||
if (pf == NULL) continue;
|
||
while (!feof(pf))
|
||
{
|
||
struFileData buffers;
|
||
int pos = fread((void *)&buffers, 1, sizeof(buffers), pf);
|
||
if (pos)
|
||
{
|
||
int len = 0;
|
||
m_dataValues[len++] = buffers.iTurbineOperationMode;
|
||
m_dataValues[len++] = buffers.iBPLevel;
|
||
m_dataValues[len++] = buffers.iYPLevel;
|
||
m_dataValues[len++] = buffers.bDQ_NacelleStartYawCCW;
|
||
m_dataValues[len++] = buffers.bDQ_NacelleStartYawCW;
|
||
m_dataValues[len++] = buffers.rGenPowerForProcess;
|
||
m_dataValues[len++] = buffers.rGenPowerForProcess_1sec;
|
||
m_dataValues[len++] = buffers.rGenPowerForProcess_30sec;
|
||
m_dataValues[len++] = buffers.rGenSpeedPDM;
|
||
m_dataValues[len++] = buffers.rGenSpeedPDM_1sec;
|
||
m_dataValues[len++] = buffers.rGenSpeedPDM_30sec;
|
||
m_dataValues[len++] = buffers.rPitchAngle;
|
||
m_dataValues[len++] = buffers.rPitchAngle_1sec;
|
||
m_dataValues[len++] = buffers.rPitchAngle_30sec;
|
||
m_dataValues[len++] = buffers.rWindSpeed;
|
||
m_dataValues[len++] = buffers.rWindSpeed_1sec;
|
||
m_dataValues[len++] = buffers.rWindSpeed_30sec;
|
||
m_dataValues[len++] = buffers.rCosPhi_KL3403;
|
||
m_dataValues[len++] = buffers.rReactivePower_KL3403;
|
||
m_dataValues[len++] = buffers.rFrequency_690V_KL3403;
|
||
m_dataValues[len++] = buffers.rUL1_690V_KL3403;
|
||
m_dataValues[len++] = buffers.rUL2_690V_KL3403;
|
||
m_dataValues[len++] = buffers.rUL3_690V_KL3403;
|
||
m_dataValues[len++] = buffers.rIL1_690V_KL3403;
|
||
m_dataValues[len++] = buffers.rIL2_690V_KL3403;
|
||
m_dataValues[len++] = buffers.rIL3_690V_KL3403;
|
||
m_dataValues[len++] = buffers.rActivePowerSetPointValue;
|
||
m_dataValues[len++] = buffers.rSetValueGenSpeed;
|
||
m_dataValues[len++] = buffers.rSetValuePitchAngle;
|
||
m_dataValues[len++] = buffers.rNacellePositionLtd;
|
||
m_dataValues[len++] = buffers.rCableTwistTotal;
|
||
m_dataValues[len++] = buffers.rNacellePositionTotal;
|
||
m_dataValues[len++] = buffers.rVaneDirection_1sec;
|
||
m_dataValues[len++] = buffers.rAvailabillityToday;
|
||
m_dataValues[len++] = buffers.rAvailabillityTotal;
|
||
m_dataValues[len++] = buffers.rCosPhiSetValue;
|
||
m_dataValues[len++] = buffers.iPAR_iKWhOverall;
|
||
m_dataValues[len++] = buffers.iPAR_iKWhThisDay;
|
||
m_dataValues[len++] = buffers.wOperationHoursOverall;
|
||
m_dataValues[len++] = buffers.wOperationHoursDay;
|
||
m_dataValues[len++] = buffers.SCW001;
|
||
m_dataValues[len++] = buffers.SCW002;
|
||
m_dataValues[len++] = buffers.SCW003;
|
||
m_dataValues[len++] = buffers.SCW004;
|
||
m_dataValues[len++] = buffers.SCW005;
|
||
m_dataValues[len++] = buffers.SCW006;
|
||
m_dataValues[len++] = buffers.SCW007;
|
||
m_dataValues[len++] = buffers.SCW008;
|
||
m_dataValues[len++] = buffers.SCW009;
|
||
m_dataValues[len++] = buffers.SCW010;
|
||
m_dataValues[len++] = buffers.SCW011;
|
||
m_dataValues[len++] = buffers.SCW012;
|
||
m_dataValues[len++] = buffers.SCW013;
|
||
m_dataValues[len++] = buffers.SCW014;
|
||
m_dataValues[len++] = buffers.SCW015;
|
||
m_dataValues[len++] = buffers.SCW016;
|
||
m_dataValues[len++] = buffers.SCW017;
|
||
m_dataValues[len++] = buffers.SCW018;
|
||
m_dataValues[len++] = buffers.SCW019;
|
||
m_dataValues[len++] = buffers.SCW020;
|
||
m_dataValues[len++] = buffers.SCW021;
|
||
m_dataValues[len++] = buffers.SCW022;
|
||
m_dataValues[len++] = buffers.SCW023;
|
||
m_dataValues[len++] = buffers.SCW024;
|
||
m_dataValues[len++] = buffers.SCW025;
|
||
m_dataValues[len++] = buffers.SCW026;
|
||
m_dataValues[len++] = buffers.SCW027;
|
||
m_dataValues[len++] = buffers.SCW028;
|
||
m_dataValues[len++] = buffers.SCW029;
|
||
m_dataValues[len++] = buffers.SCW030;
|
||
m_dataValues[len++] = buffers.SCW031;
|
||
m_dataValues[len++] = buffers.SCW032;
|
||
m_dataValues[len++] = buffers.SCW033;
|
||
m_dataValues[len++] = buffers.SCW034;
|
||
m_dataValues[len++] = buffers.SCW035;
|
||
m_dataValues[len++] = buffers.SCW036;
|
||
m_dataValues[len++] = buffers.SCW037;
|
||
m_dataValues[len++] = buffers.SCW038;
|
||
m_dataValues[len++] = buffers.SCW039;
|
||
m_dataValues[len++] = buffers.SCW040;
|
||
m_dataValues[len++] = buffers.SCW041;
|
||
m_dataValues[len++] = buffers.wWecRunContion;
|
||
m_dataValues[len++] = buffers.wFaultInformation;
|
||
m_dataValues[len++] = buffers.wTriggerSCAdress;
|
||
m_dataValues[len++] = buffers.rWindSpeed_IEC;
|
||
m_dataValues[len++] = buffers.rWindSpeed_IEC_30sec;
|
||
m_dataValues[len++] = buffers.rWindSpeed_IEC_10min;
|
||
m_dataValues[len++] = buffers.rRotorSpeedPDM;
|
||
m_dataValues[len++] = buffers.rCurrentVibrationY_PCH;
|
||
m_dataValues[len++] = buffers.rCurrentVibrationZ_PCH;
|
||
m_dataValues[len++] = buffers.rFilteredVibrationY_PCH;
|
||
m_dataValues[len++] = buffers.rFilteredVibrationZ_PCH;
|
||
m_dataValues[len++] = buffers.rStandardDensity;
|
||
m_dataValues[len++] = buffers.rAirDensity;
|
||
m_dataValues[len++] = buffers.rWindSpeed_Local;
|
||
m_dataValues[len++] = buffers.rDynamicpowerCalcValue;
|
||
m_dataValues[len++] = buffers.rPowerSetpoint_flag;
|
||
m_dataValues[len++] = buffers.rPitchAngleBlade1;
|
||
m_dataValues[len++] = buffers.rPitchAngleBlade2;
|
||
m_dataValues[len++] = buffers.rPitchAngleBlade3;
|
||
m_dataValues[len++] = buffers.iPMMAxis1_Health;
|
||
m_dataValues[len++] = buffers.iPMMAxis2_Health;
|
||
m_dataValues[len++] = buffers.iPMMAxis3_Health;
|
||
m_dataValues[len++] = buffers.rPMMAxis1InternalR;
|
||
m_dataValues[len++] = buffers.rPMMAxis2InternalR;
|
||
m_dataValues[len++] = buffers.rPMMAxis3InternalR;
|
||
m_dataValues[len++] = buffers.rPMMAxis1ActualBatteryVoltage;
|
||
m_dataValues[len++] = buffers.rPMMAxis2ActualBatteryVoltage;
|
||
m_dataValues[len++] = buffers.rPMMAxis3ActualBatteryVoltage;
|
||
publishRealData(dataTime);
|
||
dataTime += 10;
|
||
}
|
||
}
|
||
fclose(pf);
|
||
remove(fileName);
|
||
clock_gettime(CLOCK_MONOTONIC, &end);
|
||
elapsed_time = (end.tv_sec - start.tv_sec) + (end.tv_nsec - start.tv_nsec) / 1e9;
|
||
vLog(LOG_DEBUG, "Elapsed time: %.6f seconds\n", elapsed_time);
|
||
}
|
||
}
|
||
|
||
// 关闭目录
|
||
closedir(dir);
|
||
|
||
return count;
|
||
}
|
||
|
||
int CBFFTPFile2ISSMQTTProcess::countFilesByTypeInDirectory(const char* directory, const char* fileExtension)
|
||
{
|
||
DIR *dir;
|
||
struct dirent *ent;
|
||
int count = 0;
|
||
|
||
// 打开目录
|
||
dir = opendir(directory);
|
||
if (dir == NULL) {
|
||
perror("无法打开目录");
|
||
return -1;
|
||
}
|
||
|
||
// 遍历目录
|
||
while ((ent = readdir(dir))) {
|
||
if (strcmp(ent->d_name, ".") == 0 || strcmp(ent->d_name, "..") == 0) {
|
||
continue;
|
||
}
|
||
|
||
// 判断文件类型
|
||
if (ent->d_type == DT_REG) {
|
||
// 使用fnmatch函数匹配文件扩展名
|
||
if (fnmatch(fileExtension, ent->d_name, 0) == 0) {
|
||
count++;
|
||
}
|
||
}
|
||
}
|
||
|
||
// 关闭目录
|
||
closedir(dir);
|
||
|
||
return count;
|
||
}
|