add sub modbus

This commit is contained in:
zhouhuang 2024-11-21 16:09:38 +08:00
parent 046527c76e
commit b469f8e3ab
5 changed files with 89 additions and 986 deletions

View File

@ -1,773 +0,0 @@
#include "bfftpfile2issmqtt.h"
#include "uuid/uuid.h"
#include <fstream>
#include <dirent.h>
#include <fnmatch.h>
static void on_connect_wrapper(struct mosquitto *mosq, void *obj, int reason_code)
{
if (obj == NULL) return;
CBFFTPFile2ISSMQTTProcess*mqtt = (CBFFTPFile2ISSMQTTProcess*)obj;
UNUSED(mosq);
mqtt->on_connect(reason_code);
}
static void on_disconnect_wrapper(struct mosquitto *mosq, void* obj, int reason_code)
{
if (obj == NULL) return;
CBFFTPFile2ISSMQTTProcess*mqtt = (CBFFTPFile2ISSMQTTProcess*)obj;
UNUSED(mosq);
mqtt->on_disconnect(reason_code);
}
static void on_publish_wrapper(struct mosquitto *mosq, void *obj, int mid)
{
if (obj == NULL) return;
CBFFTPFile2ISSMQTTProcess*mqtt = (CBFFTPFile2ISSMQTTProcess*)obj;
UNUSED(mosq);
mqtt->on_publish(mid);
}
static void on_subscribe_wrapper(struct mosquitto *mosq, void *obj, int mid, int qos_count, const int *granted_qos)
{
if (obj == NULL) return;
CBFFTPFile2ISSMQTTProcess*mqtt = (CBFFTPFile2ISSMQTTProcess*)obj;
UNUSED(mosq);
mqtt->on_subscribe(mid, qos_count, granted_qos);
}
static void on_message_wrapper(struct mosquitto *mosq, void *obj, const struct mosquitto_message *msg)
{
if (obj == NULL) return;
CBFFTPFile2ISSMQTTProcess*mqtt = (CBFFTPFile2ISSMQTTProcess*)obj;
UNUSED(mosq);
mqtt->on_message(msg);
}
static void on_log_wrapper(struct mosquitto *mosq, void *obj, int level, const char *str)
{
if (obj == NULL) return;
CBFFTPFile2ISSMQTTProcess*mqtt = (CBFFTPFile2ISSMQTTProcess*)obj;
UNUSED(mosq);
mqtt->on_log(level, str);
}
CBFFTPFile2ISSMQTTProcessItem::CBFFTPFile2ISSMQTTProcessItem()
{
m_bAdded = FALSE;
memset(nodeId, 0, sizeof(nodeId));
memset(deviceId, 0, sizeof(deviceId));
memset(manufacturerId, 0, sizeof(manufacturerId));
memset(model, 0, sizeof(model));
}
CBFFTPFile2ISSMQTTProcessItem::~CBFFTPFile2ISSMQTTProcessItem()
{
}
void CBFFTPFile2ISSMQTTProcessItem::Attach(int uid, int sock, BYTE* clint_id, WORD peer_port)
{
CProcessItem::Attach(uid, sock, *(DWORD *)clint_id, peer_port);
}
void CBFFTPFile2ISSMQTTProcessItem::Release(void)
{
CProcessItem::Release();
}
CBFFTPFile2ISSMQTTProcess::CBFFTPFile2ISSMQTTProcess()
{
connect_begin = 0;
m_mid = 0;
last_sec = 0;
connected = FALSE;
issmqttOption.connectInterval = 30; //链接等待
issmqttOption.keepAliveInterval = 60; //心跳链接
issmqttOption.connectTimeout = 30; //链接等待超时
issmqttOption.publishInterval = 60; //数据发送间隔
strncpy(issmqttOption.username, "8E88A55D8E3A4A6BB1196089191494DD", sizeof(issmqttOption.username));
strncpy(issmqttOption.password, "?tChs9!s6KA?[g:078H4&xCW", sizeof(issmqttOption.password));
strncpy(issmqttOption.client_id, "D3332045Poutg", sizeof(issmqttOption.client_id));
m_irns = NULL;
m_dataValues = NULL;
m_nCount = 0;
m_terminalIrn = 0;
m_dataCount = 0;
}
CBFFTPFile2ISSMQTTProcess::~CBFFTPFile2ISSMQTTProcess()
{
}
CProcessItem * CBFFTPFile2ISSMQTTProcess::CreateItem(int ord)
{
return(dynamic_cast<CProcessItem *>(new CBFFTPFile2ISSMQTTProcessItem));
}
void CBFFTPFile2ISSMQTTProcess::DestroyItem(int ord, BOOLEAN bDeleted)
{
CBFFTPFile2ISSMQTTProcessItem *pItem = (CBFFTPFile2ISSMQTTProcessItem *)GetItem(ord);
if (pItem != NULL && !bDeleted)
{
delete pItem;
return CProcess::DestroyItem(ord, TRUE);
}
return CProcess::DestroyItem(ord, bDeleted);
}
BOOLEAN CBFFTPFile2ISSMQTTProcess::OnPreCreate(int id)
{
if (!CProcess::OnPreCreate(id)) return FALSE;
if (!GetOption(&issmqttOption, sizeof(issmqttOption))) return FALSE;
#if 1
//读取units.sta静态文件信息
int uid = GetUnitID(0);
if (uid < 0 || uid >= UNIT_NUM) return FALSE;
m_deviceID = static_units[uid].deviceId;
#endif
//第0个为本机节点IRN
m_terminalIrn = nodes.m_node[0].irn;
snprintf(publish_topic, sizeof(publish_topic), "USP/v100/U/%lld", m_terminalIrn);
snprintf(subscribe_topic, sizeof(subscribe_topic), "USP/v100/D/%lld", m_terminalIrn);
snprintf(m_file_dir, sizeof(m_file_dir), "%s/datas", configpath);
vLog(LOG_DEBUG, "file dir is: %s, size is: %d.\n", m_file_dir, sizeof(struFileData));
m_dataCount = GetUnitYCCount(uid);
if (m_dataCount)
{
m_irns = new Json::Int64[m_dataCount];
m_dataValues = new float[m_dataCount];
}
if (m_irns)
{
for (int i = 0; i < m_dataCount; i++)
{
m_irns[i] = (Json::Int64)GetUnitYCIRNByPoint(uid, i);
}
}
if (m_dataValues)
{
memset(m_dataValues, 0, sizeof(float) * m_dataCount);
}
//snprintf(issmqttOption.client_id, sizeof(issmqttOption.client_id), "%s", "bfftpfile");
mosquitto_lib_init();
m_mosq = mosquitto_new(issmqttOption.client_id, true, this);
if (m_mosq == NULL)
{
vLog(LOG_ERROR, "[BFFTPFILE2ISSMQTT] mosquitto_new() Out of memory.\n");
return FALSE;
}
mosquitto_username_pw_set(m_mosq, issmqttOption.username, issmqttOption.password);
mosquitto_connect_callback_set(m_mosq, on_connect_wrapper);
mosquitto_disconnect_callback_set(m_mosq, on_disconnect_wrapper);
mosquitto_publish_callback_set(m_mosq, on_publish_wrapper);
mosquitto_subscribe_callback_set(m_mosq, on_subscribe_wrapper);
mosquitto_message_callback_set(m_mosq, on_message_wrapper);
mosquitto_log_callback_set(m_mosq, on_log_wrapper);
int rc;
vLog(LOG_DEBUG, "[BFFTPFILE2ISSMQTT] user: %s, pass: %s\n", issmqttOption.username, issmqttOption.password);
vLog(LOG_DEBUG, "[BFFTPFILE2ISSMQTT] client_id: %s, device_id: %s\n", issmqttOption.client_id, issmqttOption.device_id);
vLog(LOG_DEBUG, "[BFFTPFILE2ISSMQTT] host: %s:%d, keepAlive is: %d\n", issmqttOption.host, issmqttOption.target_port, issmqttOption.keepAliveInterval);
rc = mosquitto_connect_async(m_mosq, issmqttOption.host, issmqttOption.target_port, issmqttOption.keepAliveInterval);
if (rc != MOSQ_ERR_SUCCESS)
{
mosquitto_destroy(m_mosq);
vLog(LOG_ERROR, "[BFFTPFILE2ISSMQTT] mosquitto_connect() %d,%s\n", rc, mosquitto_strerror(rc));
return FALSE;
}
return TRUE;
}
void CBFFTPFile2ISSMQTTProcess::on_connect(int reason_code)
{
if (reason_code != MOSQ_ERR_SUCCESS)
{
vLog(LOG_DEBUG, "[BFFTPFILE2ISSMQTT] MQTT连接失败%s\n", mosquitto_connack_string(reason_code));
mosquitto_disconnect(m_mosq);
connected = FALSE;
return;
}
int rc;
rc = mosquitto_subscribe(m_mosq, NULL, subscribe_topic, QOS_LEVEL_0);
if (rc != MOSQ_ERR_SUCCESS)
{
vLog(LOG_ERROR, "MQTT订阅下行主题失败%s\n", strerror(errno));
mosquitto_disconnect(m_mosq);
connected = FALSE;
return;
}
vLog(LOG_DEBUG, "[BFFTPFILE2ISSMQTT] MQTT连接成功\n");
connect_begin = system32.timers;
//此处添加一个上电信息。
connected = TRUE;
}
void CBFFTPFile2ISSMQTTProcess::on_disconnect(int reason_code)
{
vLog(LOG_DEBUG, "[BFFTPFILE2ISSMQTT] on_Disconnect: %d, %s\n", reason_code, mosquitto_reason_string(reason_code));
connected = FALSE;
}
void CBFFTPFile2ISSMQTTProcess::on_publish(int mid)
{
//vLog(LOG_DEBUG, "[BFFTPFILE2ISSMQTT] Message with mid %d has been published.\n", mid);
}
void CBFFTPFile2ISSMQTTProcess::on_subscribe(int mid, int qos_count, const int *granted_qos)
{
int i;
bool have_subscription = false;
for (i = 0; i < qos_count; i++)
{
vLog(LOG_DEBUG, "[BFFTPFILE2ISSMQTT] on_subscribe: %d:granted qos = %d\n", i, granted_qos[i]);
if (granted_qos[i] <= 2)
{
have_subscription = true;
}
}
vLog(LOG_DEBUG, "[BFFTPFILE2ISSMQTT] MQTT订阅成功\n");
if (have_subscription == false)
{
vLog(LOG_DEBUG, "[BFFTPFILE2ISSMQTT] Error: All subscriptions rejected.\n");
mosquitto_disconnect(m_mosq);
}
}
void CBFFTPFile2ISSMQTTProcess::on_message(const struct mosquitto_message *msg)
{
Json::Value jsonRoot;
jsonRoot.clear();
char *buffer = new char[msg->payloadlen+1];
if (buffer == NULL) return;
memset(buffer, 0, msg->payloadlen+1);
strncpy(buffer, (char *)msg->payload, msg->payloadlen);
std::string err;
Json::CharReaderBuilder builder;
Json::CharReader* reader(builder.newCharReader());
if (!reader->parse(buffer, buffer + msg->payloadlen, &jsonRoot, &err))
{
delete buffer;
buffer = NULL;
return;
}
vLog(LOG_DEBUG, "[BFFTPFILE2ISSMQTT] topic: %s, msg: %s\n", msg->topic, buffer);
OnReceiveSubscribeTopic(jsonRoot);
delete buffer;
buffer = NULL;
}
void CBFFTPFile2ISSMQTTProcess::on_log(int level, const char* str)
{
if (level == MOSQ_LOG_ERR) vLog(LOG_ERROR, "[MQTT] %s\n", str);
else if (level == MOSQ_LOG_WARNING) vLog(LOG_WARN, "[MQTT] %s\n", str);
else if (level == MOSQ_LOG_NOTICE) vLog(LOG_INFO, "[MQTT] %s\n", str);
else if (level == MOSQ_LOG_INFO) vLog(LOG_INFO, "[MQTT] %s\n", str);
}
void CBFFTPFile2ISSMQTTProcess::Destroy(void)
{
mosquitto_lib_cleanup();
CProcess::Destroy();
}
BOOLEAN CBFFTPFile2ISSMQTTProcess::Run(void)
{
if (!CProcess::Run()) return FALSE;
FeedDog();
int rc;
rc = mosquitto_loop(m_mosq, -1, 1);
if (rc != MOSQ_ERR_SUCCESS)
{
connected = false;
//vLog(LOG_ERROR, "Error: 循环处理MQTT消息失败%d,%s\n", rc, mosquitto_strerror(rc));
switch (rc)
{
case MOSQ_ERR_NOMEM:
case MOSQ_ERR_PROTOCOL:
case MOSQ_ERR_INVAL:
case MOSQ_ERR_NOT_FOUND:
case MOSQ_ERR_TLS:
case MOSQ_ERR_PAYLOAD_SIZE:
case MOSQ_ERR_NOT_SUPPORTED:
case MOSQ_ERR_AUTH:
case MOSQ_ERR_ACL_DENIED:
case MOSQ_ERR_UNKNOWN:
case MOSQ_ERR_EAI:
case MOSQ_ERR_PROXY:
//return FALSE;
case MOSQ_ERR_ERRNO:
break;
}
if (errno == EPROTO)
{
return FALSE;
}
//重新连接
if (system32.timers >= (connect_begin + issmqttOption.connectTimeout))
{
connect_begin = system32.timers;
vLog(LOG_DEBUG, "[BFFTPFILE2ISSMQTT] start to connect broker.\n");
rc = mosquitto_reconnect_async(m_mosq);
}
}
return TRUE;
}
BOOLEAN CBFFTPFile2ISSMQTTProcess::OnTimer(void)
{
if (!CProcess::OnTimer()) return FALSE;
BOOLEAN sec_changed = FALSE;
m_nCount++;
if (last_sec != (time_t)system32.timers)
{
last_sec = system32.timers;
sec_changed = TRUE;
}
if (!connected)
{
return TRUE;
}
UnitFeedDog(GetCurUnitID());
//此处每5分钟更新一次数据
if (sec_changed)
{
int count = countFilesInDirectory(m_file_dir);
#if 0
if ((last_sec % issmqttOption.publishInterval) == 0)
{ //更新数据
publishRealData();
return TRUE;
}
#endif
if ((last_sec % 20) == 0)
{
publishMonHeartBeat();
return TRUE;
}
}
return TRUE;
}
BOOLEAN CBFFTPFile2ISSMQTTProcess::OnReceiveSubscribeTopic(Json::Value jsonRoot)
{
if (jsonRoot["command"].isNull()) return FALSE;
if (jsonRoot["payload"].isNull()) return FALSE;
std::string command = jsonRoot["command"].asString();
if (command == "CMD_CONTROL")
{
OnReceivedDeviceCommand(jsonRoot["payload"]);
}
#if 1
else if (command == "LINK_LOGMONITOR_ENABLE")
{
OnReceivedSystemAction(jsonRoot["payload"]);
}
#endif
return TRUE;
}
BOOLEAN CBFFTPFile2ISSMQTTProcess::OnReceivedDeviceCommand(Json::Value jsonRoot)
{
#if 0
if (jsonRoot["irn"].isNull()) return FALSE;
if (jsonRoot["operType"].isNull()) return FALSE;
if (jsonRoot["operValue"].isNull()) return FALSE;
int uid = GetCurUnitID();
Json::Int64 irn = jsonRoot["irn"].asInt64();
int operType = jsonRoot["operType"].asInt();
int operValue = jsonRoot["operValue"].asInt();
int point;
if (operType == CMD_CONTROL_OPERATION) //遥控
{
//根据irn来查找point
point = GetUnitYKPointByIRN(uid, irn);
if (point < 0)
{
vLog(LOG_ERROR, "未能找到对应的遥控点号,请检查并确认。\n");
return FALSE;
}
SetUnitYK(uid, point, (operValue & 0x03), YKS_SELREQ, YKR_IDLE);
vLog(LOG_WARN, "Unit(%d) yk(%d) %s state is YKS_SELREQ result is YKR_IDLE.\n", uid, point, ((operValue & 0x03) ? "CLOSE" : "TRIP"));
}
else if (operType == CMD_CONTROL_SETTING) //遥调
{
//根据irn来查找point
point = GetUnitYTPointByIRN(uid, irn);
if (point < 0)
{
vLog(LOG_ERROR, "未能找到对应的遥调点号,请检查并确认。\n");
return FALSE;
}
SetUnitYT(uid, point, operValue, YTS_EXEREQ, YTR_IDLE);
vLog(LOG_DEBUG, "[BFFTPFILE2ISSMQTT] Unit(%d) set point(%d) %d state is YTS_EXEREQ result is YTR_IDLE.\n", uid, point, operValue);
}
else
{
vLog(LOG_ERROR, "平台下发的<%d>命令错误。operType不是<0-遥控或1-遥调>,系统不支持的命令!\n", operType);
return FALSE;
}
#endif
return TRUE;
}
BOOLEAN CBFFTPFile2ISSMQTTProcess::OnReceivedSystemAction(Json::Value jsonRoot)
{
#if 0
if (jsonRoot["linkIrn"].isNull()) return FALSE;
{ //链路日志监控开启
QLONG linkIrn = -1;
if (jsonRoot["linkIrn"].isInt64()) linkIrn = jsonRoot["linkIrn"].asInt64();
struMonLinkLog.m_iStart_Time = system32.timers;
struMonLinkLog.m_iLinkId = GetProcessIDByIRN(linkIrn);
struMonLinkLog.m_bEnable = TRUE;
struMonLinkLog.m_iLinkIrn = linkIrn;
channelBuffer.enabled = TRUE;
channelBuffer.mon_port = struMonLinkLog.m_iLinkId;
vLog(LOG_DEBUG, "[BFFTPFILE2ISSMQTT] receive action LINK_LOGMONITOR_ENABLE, irn is:%lld, pid is: %d\n", struMonLinkLog.m_iLinkIrn, struMonLinkLog.m_iLinkId);
}
#endif
return TRUE;
}
BOOLEAN CBFFTPFile2ISSMQTTProcess::publishFrame(const char* command, Json::Value payload)
{
Json::StreamWriterBuilder builder;
builder["indentation"] = "";
builder["emitUTF8"] = true;
builder["precision"] = 3;
Json::Value jsonRoot;
jsonRoot["command"] = command;
uuid_t uuid;
char str[128];
uuid_generate_time(uuid);
uuid_unparse_upper(uuid, str);
jsonRoot["traceId"] = str;
QLONG pushTime = system32.timers;
pushTime *= 1000;
pushTime += system32.now.millisecond % 1000;
jsonRoot["mtime"] = (Json::Int64)pushTime;
jsonRoot["payload"] = payload;
std::string outputConfig = Json::writeString(builder, jsonRoot);
int rc;
//vLog(LOG_DEBUG, "[BFFTPFILE2ISSMQTT] send topic: %s, payload: %d, %lld\n", publish_topic, outputConfig.length(), pushTime);
rc = mosquitto_publish(m_mosq, NULL, publish_topic, outputConfig.length(), outputConfig.c_str(), QOS_LEVEL_0, false);
if (rc != MOSQ_ERR_SUCCESS)
{
vLog(LOG_ERROR, "publishing topic: %s is error<%d,%s>。\n", publish_topic, rc, mosquitto_strerror(rc));
return FALSE;
}
return TRUE;
}
BOOLEAN CBFFTPFile2ISSMQTTProcess::publishRealData(Json::Int64 dataTime)
{
int i;
int uid;
int count = 0;
Json::Value jsonRoot;
Json::Value jsonItem;
Json::Value jsonValue;
for (i = 0; i < m_dataCount; i++)
{
jsonValue["irn"] = m_irns[i];
jsonValue["dataValue"] = m_dataValues[i];
jsonValue["dataTime"] = dataTime;
jsonItem.append(jsonValue);
}
jsonRoot["ycs"] = jsonItem;
return publishFrame("REAL_DATA", jsonRoot);
}
BOOLEAN CBFFTPFile2ISSMQTTProcess::publishMonHeartBeat(void)
{
int i, uid;
uid = GetCurUnitID();
if (uid < 0 || uid >= UNIT_NUM) return FALSE;
Json::Value jsonRoot;
Json::Value jsonItem;
Json::Value jsonValue;
for (i = 0; i < PROCESSES_NUM; i++)
{
if (config.processes[i].state == TRUE)
{
jsonValue["linkIrn"] = (Json::Int64)GetProcessIRNByPid(i);
jsonValue["online"] = (config.processes[i].softdog >= PROCESS_WATCHDOG_TIME) ? false : true;
jsonItem.append(jsonValue);
}
}
if (jsonItem.size() <= 0) return FALSE;
jsonRoot["ttl"] = 30000;
jsonRoot["status"] = 0;
jsonRoot["links"] = jsonItem;
return publishFrame("HEARTBEAT", jsonRoot);
}
int CBFFTPFile2ISSMQTTProcess::countFilesInDirectory(const char* directory)
{
DIR *dir;
struct dirent *ent;
int count = 0;
// 打开目录
dir = opendir(directory);
if (dir == NULL) {
perror("无法打开目录");
return -1;
}
// 遍历目录
while ((ent = readdir(dir))) {
if (strcmp(ent->d_name, ".") == 0 || strcmp(ent->d_name, "..") == 0) {
continue;
}
// 统计文件数量
if (ent->d_type == DT_REG) {
count++;
char fileName[MAX_PATH];
struct timespec start, end;
double elapsed_time;
//vLog(LOG_DEBUG, "start...\n");
clock_gettime(CLOCK_MONOTONIC, &start);
snprintf(fileName, sizeof(fileName), "%s/%s", m_file_dir, ent->d_name);
char name[MAX_PATH];
snprintf(name, sizeof(name), "%s", ent->d_name);
int year, month, day, hour, minute, second;
sscanf(name, "%04d-%02d-%02d-%02d%02d%02d", &year, &month, &day, &hour, &minute, &second);
//vLog(LOG_DEBUG, "%04d-%02d-%02d %02d:%02d:%02d\n", year, month, day, hour, minute, second);
time_t ctime;
struct tm ctm;
ctm.tm_year = year - 1900;
ctm.tm_mon = month - 1;
ctm.tm_mday = day;
ctm.tm_hour = hour;
ctm.tm_min = minute;
ctm.tm_sec = second;
ctm.tm_isdst = 0;
ctime = mktime(&ctm);
Json::Int64 dataTime = (Json::Int64)ctime;
dataTime *= 1000;
FILE* pf = fopen(fileName, "rb");
if (pf == NULL) continue;
while (!feof(pf))
{
struFileData buffers;
int pos = fread((void *)&buffers, 1, sizeof(buffers), pf);
if (pos)
{
int len = 0;
m_dataValues[len++] = buffers.iTurbineOperationMode;
m_dataValues[len++] = buffers.iBPLevel;
m_dataValues[len++] = buffers.iYPLevel;
m_dataValues[len++] = buffers.bDQ_NacelleStartYawCCW;
m_dataValues[len++] = buffers.bDQ_NacelleStartYawCW;
m_dataValues[len++] = buffers.rGenPowerForProcess;
m_dataValues[len++] = buffers.rGenPowerForProcess_1sec;
m_dataValues[len++] = buffers.rGenPowerForProcess_30sec;
m_dataValues[len++] = buffers.rGenSpeedPDM;
m_dataValues[len++] = buffers.rGenSpeedPDM_1sec;
m_dataValues[len++] = buffers.rGenSpeedPDM_30sec;
m_dataValues[len++] = buffers.rPitchAngle;
m_dataValues[len++] = buffers.rPitchAngle_1sec;
m_dataValues[len++] = buffers.rPitchAngle_30sec;
m_dataValues[len++] = buffers.rWindSpeed;
m_dataValues[len++] = buffers.rWindSpeed_1sec;
m_dataValues[len++] = buffers.rWindSpeed_30sec;
m_dataValues[len++] = buffers.rCosPhi_KL3403;
m_dataValues[len++] = buffers.rReactivePower_KL3403;
m_dataValues[len++] = buffers.rFrequency_690V_KL3403;
m_dataValues[len++] = buffers.rUL1_690V_KL3403;
m_dataValues[len++] = buffers.rUL2_690V_KL3403;
m_dataValues[len++] = buffers.rUL3_690V_KL3403;
m_dataValues[len++] = buffers.rIL1_690V_KL3403;
m_dataValues[len++] = buffers.rIL2_690V_KL3403;
m_dataValues[len++] = buffers.rIL3_690V_KL3403;
m_dataValues[len++] = buffers.rActivePowerSetPointValue;
m_dataValues[len++] = buffers.rSetValueGenSpeed;
m_dataValues[len++] = buffers.rSetValuePitchAngle;
m_dataValues[len++] = buffers.rNacellePositionLtd;
m_dataValues[len++] = buffers.rCableTwistTotal;
m_dataValues[len++] = buffers.rNacellePositionTotal;
m_dataValues[len++] = buffers.rVaneDirection_1sec;
m_dataValues[len++] = buffers.rAvailabillityToday;
m_dataValues[len++] = buffers.rAvailabillityTotal;
m_dataValues[len++] = buffers.rCosPhiSetValue;
m_dataValues[len++] = buffers.iPAR_iKWhOverall;
m_dataValues[len++] = buffers.iPAR_iKWhThisDay;
m_dataValues[len++] = buffers.wOperationHoursOverall;
m_dataValues[len++] = buffers.wOperationHoursDay;
m_dataValues[len++] = buffers.SCW001;
m_dataValues[len++] = buffers.SCW002;
m_dataValues[len++] = buffers.SCW003;
m_dataValues[len++] = buffers.SCW004;
m_dataValues[len++] = buffers.SCW005;
m_dataValues[len++] = buffers.SCW006;
m_dataValues[len++] = buffers.SCW007;
m_dataValues[len++] = buffers.SCW008;
m_dataValues[len++] = buffers.SCW009;
m_dataValues[len++] = buffers.SCW010;
m_dataValues[len++] = buffers.SCW011;
m_dataValues[len++] = buffers.SCW012;
m_dataValues[len++] = buffers.SCW013;
m_dataValues[len++] = buffers.SCW014;
m_dataValues[len++] = buffers.SCW015;
m_dataValues[len++] = buffers.SCW016;
m_dataValues[len++] = buffers.SCW017;
m_dataValues[len++] = buffers.SCW018;
m_dataValues[len++] = buffers.SCW019;
m_dataValues[len++] = buffers.SCW020;
m_dataValues[len++] = buffers.SCW021;
m_dataValues[len++] = buffers.SCW022;
m_dataValues[len++] = buffers.SCW023;
m_dataValues[len++] = buffers.SCW024;
m_dataValues[len++] = buffers.SCW025;
m_dataValues[len++] = buffers.SCW026;
m_dataValues[len++] = buffers.SCW027;
m_dataValues[len++] = buffers.SCW028;
m_dataValues[len++] = buffers.SCW029;
m_dataValues[len++] = buffers.SCW030;
m_dataValues[len++] = buffers.SCW031;
m_dataValues[len++] = buffers.SCW032;
m_dataValues[len++] = buffers.SCW033;
m_dataValues[len++] = buffers.SCW034;
m_dataValues[len++] = buffers.SCW035;
m_dataValues[len++] = buffers.SCW036;
m_dataValues[len++] = buffers.SCW037;
m_dataValues[len++] = buffers.SCW038;
m_dataValues[len++] = buffers.SCW039;
m_dataValues[len++] = buffers.SCW040;
m_dataValues[len++] = buffers.SCW041;
m_dataValues[len++] = buffers.wWecRunContion;
m_dataValues[len++] = buffers.wFaultInformation;
m_dataValues[len++] = buffers.wTriggerSCAdress;
m_dataValues[len++] = buffers.rWindSpeed_IEC;
m_dataValues[len++] = buffers.rWindSpeed_IEC_30sec;
m_dataValues[len++] = buffers.rWindSpeed_IEC_10min;
m_dataValues[len++] = buffers.rRotorSpeedPDM;
m_dataValues[len++] = buffers.rCurrentVibrationY_PCH;
m_dataValues[len++] = buffers.rCurrentVibrationZ_PCH;
m_dataValues[len++] = buffers.rFilteredVibrationY_PCH;
m_dataValues[len++] = buffers.rFilteredVibrationZ_PCH;
m_dataValues[len++] = buffers.rStandardDensity;
m_dataValues[len++] = buffers.rAirDensity;
m_dataValues[len++] = buffers.rWindSpeed_Local;
m_dataValues[len++] = buffers.rDynamicpowerCalcValue;
m_dataValues[len++] = buffers.rPowerSetpoint_flag;
m_dataValues[len++] = buffers.rPitchAngleBlade1;
m_dataValues[len++] = buffers.rPitchAngleBlade2;
m_dataValues[len++] = buffers.rPitchAngleBlade3;
m_dataValues[len++] = buffers.iPMMAxis1_Health;
m_dataValues[len++] = buffers.iPMMAxis2_Health;
m_dataValues[len++] = buffers.iPMMAxis3_Health;
m_dataValues[len++] = buffers.rPMMAxis1InternalR;
m_dataValues[len++] = buffers.rPMMAxis2InternalR;
m_dataValues[len++] = buffers.rPMMAxis3InternalR;
m_dataValues[len++] = buffers.rPMMAxis1ActualBatteryVoltage;
m_dataValues[len++] = buffers.rPMMAxis2ActualBatteryVoltage;
m_dataValues[len++] = buffers.rPMMAxis3ActualBatteryVoltage;
publishRealData(dataTime);
dataTime += 10;
}
}
fclose(pf);
remove(fileName);
clock_gettime(CLOCK_MONOTONIC, &end);
elapsed_time = (end.tv_sec - start.tv_sec) + (end.tv_nsec - start.tv_nsec) / 1e9;
vLog(LOG_DEBUG, "Elapsed time: %.6f seconds\n", elapsed_time);
}
}
// 关闭目录
closedir(dir);
return count;
}
int CBFFTPFile2ISSMQTTProcess::countFilesByTypeInDirectory(const char* directory, const char* fileExtension)
{
DIR *dir;
struct dirent *ent;
int count = 0;
// 打开目录
dir = opendir(directory);
if (dir == NULL) {
perror("无法打开目录");
return -1;
}
// 遍历目录
while ((ent = readdir(dir))) {
if (strcmp(ent->d_name, ".") == 0 || strcmp(ent->d_name, "..") == 0) {
continue;
}
// 判断文件类型
if (ent->d_type == DT_REG) {
// 使用fnmatch函数匹配文件扩展名
if (fnmatch(fileExtension, ent->d_name, 0) == 0) {
count++;
}
}
}
// 关闭目录
closedir(dir);
return count;
}

View File

@ -1,194 +0,0 @@
#ifndef _BFFTPFILE_ISSMQTT_PROCESS_H_
#define _BFFTPFILE_ISSMQTT_PROCESS_H_
#include "process.h"
#include "mqtt/mosquitto.h"
#include "json/json.h"
#pragma pack(1)
typedef struct {
short iTurbineOperationMode;
short iBPLevel;
short iYPLevel;
BYTE bDQ_NacelleStartYawCCW;
BYTE bDQ_NacelleStartYawCW;
float rGenPowerForProcess;
float rGenPowerForProcess_1sec;
float rGenPowerForProcess_30sec;
float rGenSpeedPDM;
float rGenSpeedPDM_1sec;
float rGenSpeedPDM_30sec;
float rPitchAngle;
float rPitchAngle_1sec;
float rPitchAngle_30sec;
float rWindSpeed;
float rWindSpeed_1sec;
float rWindSpeed_30sec;
float rCosPhi_KL3403;
float rReactivePower_KL3403;
float rFrequency_690V_KL3403;
float rUL1_690V_KL3403;
float rUL2_690V_KL3403;
float rUL3_690V_KL3403;
float rIL1_690V_KL3403;
float rIL2_690V_KL3403;
float rIL3_690V_KL3403;
float rActivePowerSetPointValue;
float rSetValueGenSpeed;
float rSetValuePitchAngle;
float rNacellePositionLtd;
float rCableTwistTotal;
float rNacellePositionTotal;
float rVaneDirection_1sec;
float rAvailabillityToday;
float rAvailabillityTotal;
float rCosPhiSetValue;
LONG iPAR_iKWhOverall;
LONG iPAR_iKWhThisDay;
DWORD wOperationHoursOverall;
DWORD wOperationHoursDay;
WORD SCW001;
WORD SCW002;
WORD SCW003;
WORD SCW004;
WORD SCW005;
WORD SCW006;
WORD SCW007;
WORD SCW008;
WORD SCW009;
WORD SCW010;
WORD SCW011;
WORD SCW012;
WORD SCW013;
WORD SCW014;
WORD SCW015;
WORD SCW016;
WORD SCW017;
WORD SCW018;
WORD SCW019;
WORD SCW020;
WORD SCW021;
WORD SCW022;
WORD SCW023;
WORD SCW024;
WORD SCW025;
WORD SCW026;
WORD SCW027;
WORD SCW028;
WORD SCW029;
WORD SCW030;
WORD SCW031;
WORD SCW032;
WORD SCW033;
WORD SCW034;
WORD SCW035;
WORD SCW036;
WORD SCW037;
WORD SCW038;
WORD SCW039;
WORD SCW040;
WORD SCW041;
DWORD wWecRunContion;
DWORD wFaultInformation;
DWORD wTriggerSCAdress;
float rWindSpeed_IEC;
float rWindSpeed_IEC_30sec;
float rWindSpeed_IEC_10min;
float rRotorSpeedPDM;
float rCurrentVibrationY_PCH;
float rCurrentVibrationZ_PCH;
float rFilteredVibrationY_PCH;
float rFilteredVibrationZ_PCH;
float rStandardDensity;
float rAirDensity;
float rWindSpeed_Local;
float rDynamicpowerCalcValue;
float rPowerSetpoint_flag;
float rPitchAngleBlade1;
float rPitchAngleBlade2;
float rPitchAngleBlade3;
float iPMMAxis1_Health;
float iPMMAxis2_Health;
float iPMMAxis3_Health;
float rPMMAxis1InternalR;
float rPMMAxis2InternalR;
float rPMMAxis3InternalR;
float rPMMAxis1ActualBatteryVoltage;
float rPMMAxis2ActualBatteryVoltage;
float rPMMAxis3ActualBatteryVoltage;
} struFileData;
#pragma pack()
class CBFFTPFile2ISSMQTTProcessItem : public CProcessItem
{
public:
CBFFTPFile2ISSMQTTProcessItem();
virtual ~CBFFTPFile2ISSMQTTProcessItem();
virtual void Attach(int uid, int sock, BYTE* peer_addr, WORD peer_port);
virtual void Release(void);
public:
BOOLEAN m_bAdded; //设备已经添加
char nodeId[MAX_ID_LENGTH];
char deviceId[MAX_ID_LENGTH];
char manufacturerId[MAX_ID_LENGTH];
char model[MAX_ID_LENGTH];
};
class CBFFTPFile2ISSMQTTProcess : public CProcess
{
private:
time_t last_sec;
int m_mid;
LONG connect_begin;
DWORD m_nCount;
DWORD m_last_sec;
char m_file_dir[MAX_PATH];
int countFilesInDirectory(const char*);
int countFilesByTypeInDirectory(const char*, const char*);
public:
struct mosquitto* m_mosq;
struISSMQTTOption issmqttOption;
std::string m_deviceID;
char subscribe_topic[256];
char publish_topic[256];
private:
BOOLEAN connected;
Json::Int64 m_terminalIrn;
Json::Int64 *m_irns;
short m_dataCount;
float *m_dataValues;
public:
CBFFTPFile2ISSMQTTProcess();
virtual ~CBFFTPFile2ISSMQTTProcess();
BOOLEAN OnPreCreate(int id);
void Destroy(void);
BOOLEAN Run(void);
BOOLEAN OnTimer(void);
virtual CProcessItem *CreateItem(int ord);
virtual void DestroyItem(int ord, BOOLEAN bDeleted = FALSE);
BOOLEAN OnReceiveSubscribeTopic(Json::Value);
BOOLEAN OnReceivedDeviceCommand(Json::Value);
BOOLEAN OnReceivedSystemAction(Json::Value);
BOOLEAN publishFrame(const char*, Json::Value);
BOOLEAN publishRealData(Json::Int64);
BOOLEAN publishMonHeartBeat(void);
void on_connect(int);
void on_disconnect(int);
void on_publish(int);
void on_subscribe(int, int, const int*);
void on_message(const struct mosquitto_message*);
void on_log(int, const char*);
};
#endif //_BFFTPFILE_ISSMQTT_PROCESS_H_

View File

@ -2,6 +2,7 @@
#include "../hostiec104/host_iec104.h"
#include "../subiec104/sub_iec104.h"
#include "../submodbustcp/sub_modbus_tcp.h"
#include "../hostmodbusrtu/host_modbus_rtu.h"
#include "../hostmodbustcp/host_modbus_tcp.h"
#include "../rtustatusproc/rtustatus.h"
@ -354,6 +355,11 @@ void CChangeMaster::StartUp(void)
vLog(LOG_INFO, "协议<%d>创建为: iec104从协议.\n", i);
procs[i] = new CSubIEC104Process();
}
else if (PROTOCOL_SUB_MODBUS_TCP == config.processes[i].proto)
{
vLog(LOG_INFO, "协议<%d>创建为: modbustcp从协议.\n", i);
procs[i] = new CSubModbusTcpProcess();
}
else if (PROTOCOL_LOCAL_DEBUG == config.processes[i].proto)
{//Local debug
vLog(LOG_INFO, "协议<%d>创建为: 本地调试协议.\n", i);

View File

@ -1075,7 +1075,7 @@ BOOLEAN CRYDevice::processUartParam(const Json::Value jsonRoot, int ord)
return TRUE;
}
BOOLEAN CRYDevice::processModbustcpParam(const Json::Value jsonRoot, int pid)
BOOLEAN CRYDevice::processHostModbustcpParam(const Json::Value jsonRoot, int pid)
{
if (pid < 0 || pid >= PROCESSES_NUM) return FALSE;
@ -1104,6 +1104,35 @@ BOOLEAN CRYDevice::processModbustcpParam(const Json::Value jsonRoot, int pid)
return TRUE;
}
BOOLEAN CRYDevice::processSubModbustcpParam(const Json::Value jsonRoot, int pid)
{
if (pid < 0 || pid >= PROCESSES_NUM) return FALSE;
config_config.processes[pid].option.network.ignored_source = TRUE;
config_config.processes[pid].option.network.socket_type = SOCK_STREAM;
config_config.processes[pid].option.network.bind_addr = INADDR_ANY;
config_config.processes[pid].option.network.bind_port = 502;
config_config.processes[pid].option.network.target_addr = INADDR_ANY;
config_config.processes[pid].option.network.target_port = 0;
if (jsonRoot["bindAddr"].isInt()) {
config_config.processes[pid].option.network.bind_addr = jsonRoot["bindAddr"].asInt();
} else if (jsonRoot["bindAddr"].isString()) {
if (inet_pton(AF_INET, jsonRoot["bindAddr"].asCString(), &config_config.processes[pid].option.network.bind_addr) == 1) {
vLog(LOG_DEBUG, "IPv4 地址转换成功,网络字节序为: %u.\n", config_config.processes[pid].option.network.bind_addr);
} else {
vLog(LOG_ERROR, "inet_pton error(%d,%s).\n", errno, strerror(errno));
}
}
if (jsonRoot["bindPort"].isInt()) {
config_config.processes[pid].option.network.bind_port = jsonRoot["bindPort"].asInt();
} else if (jsonRoot["bindPort"].isString()) {
config_config.processes[pid].option.network.bind_port = atoi(jsonRoot["bindPort"].asCString());
}
return TRUE;
}
BOOLEAN CRYDevice::processRymodbustcpParam(const Json::Value jsonRoot, int pid)
{
if (pid < 0 || pid >= PROCESSES_NUM) return FALSE;
@ -1139,20 +1168,20 @@ BOOLEAN CRYDevice::processRymodbustcpParam(const Json::Value jsonRoot, int pid)
config_config.processes[pid].option.rymodbus.bHaveFTP = TRUE;
}
//用户名
if (jsonRoot["userName"].isString()) {
snprintf(config_config.processes[pid].option.rymodbus.ftp.user, sizeof(config_config.processes[pid].option.rymodbus.ftp.user), "%s", jsonRoot["userName"].asCString());
if (jsonRoot["ftpUser"].isString()) {
snprintf(config_config.processes[pid].option.rymodbus.ftp.user, sizeof(config_config.processes[pid].option.rymodbus.ftp.user), "%s", jsonRoot["ftpUser"].asCString());
} else { //默认存在允许ftp功能
snprintf(config_config.processes[pid].option.rymodbus.ftp.user, sizeof(config_config.processes[pid].option.rymodbus.ftp.user), "%s", "administrator");
}
//密码
if (jsonRoot["passWord"].isString()) {
snprintf(config_config.processes[pid].option.rymodbus.ftp.password, sizeof(config_config.processes[pid].option.rymodbus.ftp.password), "%s", jsonRoot["passWord"].asCString());
if (jsonRoot["ftpPassword"].isString()) {
snprintf(config_config.processes[pid].option.rymodbus.ftp.password, sizeof(config_config.processes[pid].option.rymodbus.ftp.password), "%s", jsonRoot["ftpPassword"].asCString());
} else { //默认存在允许ftp功能
snprintf(config_config.processes[pid].option.rymodbus.ftp.password, sizeof(config_config.processes[pid].option.rymodbus.ftp.password), "%s", "123456");
}
//远程路径
if (jsonRoot["remotePath"].isString()) {
snprintf(config_config.processes[pid].option.rymodbus.ftp.remotePath, sizeof(config_config.processes[pid].option.rymodbus.ftp.remotePath), "%s", jsonRoot["remotePath"].asCString());
if (jsonRoot["rootDir"].isString()) {
snprintf(config_config.processes[pid].option.rymodbus.ftp.remotePath, sizeof(config_config.processes[pid].option.rymodbus.ftp.remotePath), "%s", jsonRoot["rootDir"].asCString());
} else { //默认存在允许ftp功能
snprintf(config_config.processes[pid].option.rymodbus.ftp.remotePath, sizeof(config_config.processes[pid].option.rymodbus.ftp.remotePath), "%s", "Hard Disk2/data/rtdatalog");
}
@ -1361,13 +1390,13 @@ BOOLEAN CRYDevice::processSubIEC104ProcessParam(const Json::Value jsonRoot, int
#define POINT_TYPE_YK 3
#define POINT_TYPE_YT 4
BOOLEAN CRYDevice::processModbusPointParam(const Json::Value jsonRoot, int uid, int point, int type)
BOOLEAN CRYDevice::processHostModbusPointParam(const Json::Value jsonRoot, int uid, int point, int type)
{
if (uid < 0 || uid >= UNIT_NUM) return FALSE;
if (point < 0) return FALSE;
//vLog(LOG_DEBUG, "process unit(%d), point(%d), and type(%d)\n", uid, point, type);
//vLog(LOG_DEBUG, "yx count is: %d, and yc count is: %d\n", config_config.units[uid].yxcount, config_config.units[uid].yccount);
vLog(LOG_DEBUG, "%s", jsonRoot.toStyledString().c_str());
//vLog(LOG_DEBUG, "%s", jsonRoot.toStyledString().c_str());
switch (type)
{
case POINT_TYPE_YX:
@ -1755,6 +1784,34 @@ BOOLEAN CRYDevice::processModbusPointParam(const Json::Value jsonRoot, int uid,
return TRUE;
}
BOOLEAN CRYDevice::processHostModbusPointParam(const Json::Value jsonRoot, int uid, int point, int type)
{
if (uid < 0 || uid >= UNIT_NUM) return FALSE;
if (point < 0) return FALSE;
vLog(LOG_DEBUG, "%s", jsonRoot.toStyledString().c_str());
switch (type)
{
case POINT_TYPE_YX:
if (point >= config_config.units[uid].yxcount) return FALSE;
break;
case POINT_TYPE_YC:
if (point >= config_config.units[uid].yccount) return FALSE;
break;
case POINT_TYPE_YM:
if (point >= config_config.units[uid].ymcount) return FALSE;
break;
case POINT_TYPE_YK:
if (point >= config_config.units[uid].ykcount) return FALSE;
break;
case POINT_TYPE_YT:
if (point >= config_config.units[uid].ytcount) return FALSE;
break;
default: break;
}
return TRUE;
}
bool CRYDevice::dealConfigFile(const Json::Value jsonRoot)
{
bool result = false;
@ -1822,7 +1879,7 @@ bool CRYDevice::dealConfigFile(const Json::Value jsonRoot)
config_config.processes[i].order = uartId++;
break;
case PROTOCOL_HOST_MODBUS_TCP:
processModbustcpParam(params, i);
processHostModbustcpParam(params, i);
break;
case PROTOCOL_HOST_BF_MODBUSTCP:
processRymodbustcpParam(params, i);
@ -1836,6 +1893,9 @@ bool CRYDevice::dealConfigFile(const Json::Value jsonRoot)
case PROTOCOL_SUB_IEC104:
processSubIEC104ProcessParam(params, i);
break;
case PROTOCOL_SUB_MODBUS_TCP:
processSubModbustcpParam(params, i);
break;
default:
break;
}
@ -2033,10 +2093,13 @@ bool CRYDevice::dealConfigFile(const Json::Value jsonRoot)
case PROTOCOL_HOST_MODBUS_TCP:
case PROTOCOL_HOST_MODBUS_RTU_TCP:
case PROTOCOL_HOST_BF_MODBUSTCP:
processModbusPointParam(param, uid, k, POINT_TYPE_YC);
processHostModbusPointParam(param, uid, k, POINT_TYPE_YC);
break;
case PROTOCOL_HOST_IEC104:
break;
case PROTOCOL_SUB_MODBUS_TCP:
processSubModbusPointParam(param, uid, k, POINT_TYPE_YC);
break;
}
}
if (config_config.units[uid].type == MASTER_UNIT) {
@ -2068,7 +2131,7 @@ bool CRYDevice::dealConfigFile(const Json::Value jsonRoot)
case PROTOCOL_HOST_MODBUS_TCP:
case PROTOCOL_HOST_MODBUS_RTU_TCP:
case PROTOCOL_HOST_BF_MODBUSTCP:
processModbusPointParam(param, uid, k, POINT_TYPE_YM);
processHostModbusPointParam(param, uid, k, POINT_TYPE_YM);
break;
case PROTOCOL_HOST_IEC104:
break;
@ -2101,7 +2164,7 @@ bool CRYDevice::dealConfigFile(const Json::Value jsonRoot)
case PROTOCOL_HOST_MODBUS_TCP:
case PROTOCOL_HOST_MODBUS_RTU_TCP:
case PROTOCOL_HOST_BF_MODBUSTCP:
processModbusPointParam(param, uid, k, POINT_TYPE_YK);
processHostModbusPointParam(param, uid, k, POINT_TYPE_YK);
break;
case PROTOCOL_HOST_IEC104:
break;
@ -2129,7 +2192,7 @@ bool CRYDevice::dealConfigFile(const Json::Value jsonRoot)
case PROTOCOL_HOST_MODBUS_TCP:
case PROTOCOL_HOST_MODBUS_RTU_TCP:
case PROTOCOL_HOST_BF_MODBUSTCP:
processModbusPointParam(param, uid, k, POINT_TYPE_YT);
processHostModbusPointParam(param, uid, k, POINT_TYPE_YT);
break;
case PROTOCOL_HOST_IEC104:
break;
@ -2162,7 +2225,7 @@ bool CRYDevice::dealConfigFile(const Json::Value jsonRoot)
case PROTOCOL_HOST_MODBUS_TCP:
case PROTOCOL_HOST_MODBUS_RTU_TCP:
case PROTOCOL_HOST_BF_MODBUSTCP:
processModbusPointParam(param, uid, k, POINT_TYPE_YX);
processHostModbusPointParam(param, uid, k, POINT_TYPE_YX);
break;
case PROTOCOL_HOST_IEC104:
break;
@ -2259,8 +2322,7 @@ void CRYDevice::on_message(const char *msg, const int size)
}
void CRYDevice::heart_beat(int status)
{
//发送心跳报文
{ //发送心跳报文
Json::Value payload;
payload["ttl"] = 30000;
payload["status"] = status;

View File

@ -126,12 +126,14 @@ private:
int MakeYTFrame(int);
bool OnReceivedDeviceCommand(const Json::Value);
BOOLEAN processUartParam(const Json::Value, int);
BOOLEAN processModbustcpParam(const Json::Value, int);
BOOLEAN processHostModbustcpParam(const Json::Value, int);
BOOLEAN processSubModbustcpParam(const Json::Value, int);
BOOLEAN processRymodbustcpParam(const Json::Value, int);
BOOLEAN processRyFTP2MinioParam(const Json::Value, int);
BOOLEAN processHostIEC104ProcessParam(const Json::Value, int);
BOOLEAN processSubIEC104ProcessParam(const Json::Value, int);
BOOLEAN processModbusPointParam(const Json::Value, int, int, int);
BOOLEAN processHostModbusPointParam(const Json::Value, int, int, int);
BOOLEAN processSubModbusPointParam(const Json::Value, int, int, int);
bool dealConfigFile(const Json::Value);
bool OnReceivedSystemAction(const std::string, const std::string, const Json::Value);
void on_message(const char*, const int);