map/das-dn/cmg/main.cpp
2024-08-09 08:50:19 +08:00

2459 lines
90 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/*
* File: main.cpp
* Author: zhouhuang
*
* Created on October 10, 2015, 3:17 PM
*/
#include <cstdlib>
#include <signal.h>
#include <getopt.h>
#include "changemaster.h"
#include "process.h"
#if 1
#include <fstream>
#include <string>
#include <map>
#include <unordered_map>
#include <libgen.h>
#include <dirent.h>
#include <fnmatch.h>
#include <signal.h>
#if 1
#include <nopoll.h>
#include <nopoll_decl.h>
#include "KompexSQLitePrerequisites.h"
#include "KompexSQLiteDatabase.h"
#include "KompexSQLiteStatement.h"
#include "KompexSQLiteException.h"
#include "KompexSQLiteStreamRedirection.h"
#include "KompexSQLiteBlob.h"
using namespace Kompex;
#else
#include <libwebsockets.h>
#endif
#include <json.h>
#if 0
#include <uuid.h>
#endif
#include <md5.h>
#include <base64.h>
#include <mbedtls/aes.h>
#include <mbedtls/error.h>
#include "soe.h"
#include "yklog.h"
#include "ytlog.h"
#include "public.h"
// Exclusive upper bound on the number of fragments websocket_msg_join() accepts.
#define MAX_MSG_COUNT 4096
// NOTE(review): not referenced in the visible part of this file; presumably
// requests a reload of the data-acquisition configuration - confirm at its users.
bool g_dataAcquisitionReload = false;
// Trace id echoed back as "cmdId" in deviceControlResp messages (see MakeYKFrame/MakeYTFrame).
std::string g_traceId;
// Read cursors into the soe / yxbw / ycbw queues; each is advanced by one per
// record fetched in GetUnitSOE / GetUnitYXBW / GetUnitYCBW.
LONG m_soeload = 0;
LONG m_yxbwload = 0;
LONG m_ycbwload = 0;
typedef std::vector<std::string> stringvector;
stringvector m_gLinkIDs; // link names and IDs
//typedef std::unordered_map<std::string, int> name2intmap;
// Unit index plus address string, stored per device name (see name2unitmap).
typedef struct {
short uid;
std::string address;
} struct_uidaddress;
typedef std::unordered_map<std::string, struct_uidaddress> name2unitmap;
//name2intmap unitname2uid_map;
// Values of struct_service_item::type as tested in OnReceivedDeviceCommand():
// 0 selects the remote-control (YK) path, 1 the setpoint (YT) path.
#define CMD_CONTROL_OPERATION 0
#define CMD_CONTROL_SETTING 1
// One controllable service of a device: the unit index, the point index inside
// that unit and the command type (CMD_CONTROL_*). `order` is not read in the
// visible part of this file.
typedef struct {
int uid;
int point;
int order;
int type;
} struct_service_item;
typedef std::unordered_map<std::string, struct_service_item> name2servicemap;
typedef std::unordered_map<std::string, name2servicemap> unitname2servicemap;
// Lookup table used by OnReceivedDeviceCommand(): deviceId -> serviceName -> service item.
unitname2servicemap unitname2service_map;
// Named JSON attribute.
typedef struct {
std::string name;
Json::Value value;
} struct_attr;
typedef std::vector<struct_attr> attrvector;
// Kompex SQLite database handle (not used in the visible part of this file).
SQLiteDatabase m_database;
#if 0
struct {
Json::Int64 m_iLinkIrn;
time_t m_iStartTime;
} struMonLinkLog;
#endif
#if 1
// Snowflake-style 64-bit id layout, from the low bits upwards:
//   [sequence : SEQUENCE_BITS][worker id : WORKER_ID_BITS]
//   [data center id : DATA_CENTER_ID_BITS][milliseconds elapsed since EPOCH]
#define WORKER_ID_BITS 5
#define DATA_CENTER_ID_BITS 5
#define SEQUENCE_BITS 12
#define MAX_WORKER_ID ((1 << WORKER_ID_BITS) - 1)
#define MAX_DATA_CENTER_ID ((1 << DATA_CENTER_ID_BITS) - 1)
#define SEQUENCE_MASK ((1 << SEQUENCE_BITS) - 1)
#define EPOCH 1640995200000 // 2022-01-01 00:00:00 UTC, in milliseconds

// Generator state. worker_id and data_center_id are expected to fit in their
// bit fields (<= MAX_WORKER_ID / <= MAX_DATA_CENTER_ID); this is not enforced here.
typedef struct {
    uint64_t worker_id;
    uint64_t data_center_id;
    uint64_t sequence;       // per-millisecond counter, wraps at SEQUENCE_MASK
    uint64_t last_timestamp; // millisecond timestamp of the last id handed out
} Snowflake;

/**
 * Current wall-clock time in milliseconds since the Unix epoch.
 *
 * The seconds value is widened to 64 bits before scaling so the product
 * cannot overflow on targets where time_t/long is 32 bits wide.
 */
uint64_t current_time() {
    struct timeval tv;
    gettimeofday(&tv, NULL);
    return static_cast<uint64_t>(tv.tv_sec) * 1000ULL
         + static_cast<uint64_t>(tv.tv_usec) / 1000ULL;
}

/**
 * Produce the next id from generator `sf`.
 *
 * @param sf generator state; updated in place. Not synchronised - callers
 *           sharing one generator between threads must serialise access.
 * @return the new id, or 0 when the wall clock is behind the timestamp of the
 *         previously issued id (clock stepped backwards).
 */
uint64_t snowflake_next_id(Snowflake *sf) {
    uint64_t timestamp = current_time();
    if (timestamp < sf->last_timestamp) {
        return 0;
    }
    if (sf->last_timestamp == timestamp) {
        sf->sequence = (sf->sequence + 1) & SEQUENCE_MASK;
        if (sf->sequence == 0) {
            // All sequence numbers of this millisecond are used up: wait for
            // the next millisecond AND adopt it as the id's timestamp.
            // Keeping the stale timestamp here would re-issue the id already
            // handed out with sequence 0 for that millisecond.
            do {
                timestamp = current_time();
            } while (timestamp <= sf->last_timestamp);
        }
    } else {
        sf->sequence = 0; // new millisecond: restart the sequence
    }
    sf->last_timestamp = timestamp;
    return ((timestamp - EPOCH) << (WORKER_ID_BITS + DATA_CENTER_ID_BITS + SEQUENCE_BITS)) |
           (sf->data_center_id << (WORKER_ID_BITS + SEQUENCE_BITS)) |
           (sf->worker_id << SEQUENCE_BITS) |
           sf->sequence;
}

// Process-wide generator: worker 1, data center 1.
Snowflake g_sf = {1, 1, 0, 0};
#endif
/**
 * Split `s` on every occurrence of `delimiter`.
 *
 * Empty fields between two delimiters (and before a leading delimiter) are
 * kept; an empty field after a trailing delimiter is not produced, and an
 * empty input yields an empty vector.
 */
std::vector<std::string> split(const std::string &s, char delimiter)
{
    std::vector<std::string> fields;
    std::string::size_type start = 0;
    while (start < s.size()) {
        const std::string::size_type stop = s.find(delimiter, start);
        if (stop == std::string::npos) {
            // Last field: everything up to the end of the input.
            fields.push_back(s.substr(start));
            break;
        }
        fields.push_back(s.substr(start, stop - start));
        start = stop + 1;
    }
    return fields;
}
// Configuration images assembled in this file. configInitializeMemory() resets
// them to defaults and the configWrite*CFG() functions dump them to files under
// `configpath`. They are separate objects from the `config`, `database`,
// `system32` and `static_units` globals that the GetUnit*/SetUnit* accessors read.
struSystem config_system32;
struConfig config_config;
struDatabase config_database;
struNodeOption config_nodes;
struUnitStatic config_static_units[UNIT_NUM];
/**
 * Reset every config_* image to its default state.
 *
 * Frees the point tables owned by config_database and by each unit of
 * config_config, zero-fills all images, allocates fresh zeroed database
 * tables and finally applies the non-zero defaults.
 *
 * @return true on success, false if one of the database tables is NULL after
 *         allocation.
 */
bool configInitializeMemory(void)
{
    // Release tables from a previous run (delete[] on a null pointer is a no-op).
    delete [] config_database.yxs;
    delete [] config_database.ycs;
    delete [] config_database.yms;
    delete [] config_database.yks;
    delete [] config_database.yts;
    for (int u = 0; u < UNIT_NUM; u++) {
        auto& unit = config_config.units[u];
        delete [] unit.yxs;
        delete [] unit.ycs;
        delete [] unit.yms;
        delete [] unit.yks;
        delete [] unit.yts;
    }

    // Wipe all images; this also clears the pointers that were just freed.
    memset(&config_system32, 0, sizeof(config_system32));
    memset(&config_config, 0, sizeof(config_config));
    memset(&config_database, 0, sizeof(config_database));
    memset(&config_nodes, 0, sizeof(config_nodes));
    memset(config_static_units, 0, sizeof(config_static_units));

    // Fresh, zeroed database tables.
    config_database.yxs = new struYX[DATABASE_YX_NUM];
    if (!config_database.yxs) return false;
    memset(config_database.yxs, 0, sizeof(struYX) * DATABASE_YX_NUM);

    config_database.ycs = new struYC[DATABASE_YC_NUM];
    if (!config_database.ycs) return false;
    memset(config_database.ycs, 0, sizeof(struYC) * DATABASE_YC_NUM);

    config_database.yms = new struYM[DATABASE_YM_NUM];
    if (!config_database.yms) return false;
    memset(config_database.yms, 0, sizeof(struYM) * DATABASE_YM_NUM);

    config_database.yks = new struYK[DATABASE_YK_NUM];
    if (!config_database.yks) return false;
    memset(config_database.yks, 0, sizeof(struYK) * DATABASE_YK_NUM);

    config_database.yts = new struYT[DATABASE_YT_NUM];
    if (!config_database.yts) return false;
    memset(config_database.yts, 0, sizeof(struYT) * DATABASE_YT_NUM);

    // System defaults.
    strcpy(config_system32.yk_pass, "12345");
    snprintf(config_system32.version, sizeof(config_system32.version), "%s", "0");
    config_system32.yk_keep = 30;
    config_system32.interval_time = 2;
    config_system32.log_enabled = FALSE;
    config_system32.zjd_log_bind_addr = INADDR_ANY;
    config_system32.zjd_log_bind_port = 0;

    // Serial port defaults.
    for (int p = 0; p < HARDWARE_PORTS_NUM; p++) {
        auto& port = config_config.hardware.ports[p];
        port.baud = 9600;
        port.data = 8;
        port.parity = 0;
        port.stop = 0;
        port.timeout = 1000; //1s
    }

    // Processes start with the soft watchdog off and every unit slot set to -1.
    for (int p = 0; p < PROCESSES_NUM; p++) {
        auto& process = config_config.processes[p];
        process.softdog = 0;
        for (int slot = 0; slot < PROCESS_UNIT_NUM; slot++) {
            process.units[slot] = -1;
        }
    }

    for (int u = 0; u < UNIT_NUM; u++) {
        config_config.units[u].softdog = 0;
    }
    return true;
}
/**
 * Dump config_system32 as a raw binary image to <configpath>/FILE_SYSTEM_CONFIG,
 * replacing any existing file.
 *
 * @return true only if the file could be opened, the complete record was
 *         written and the stream was closed without error.
 */
bool configWriteSystemCFG(void)
{
    char pathName[512];
    snprintf(pathName, sizeof(pathName), "%s/%s", configpath, FILE_SYSTEM_CONFIG);

    FILE* pf = fopen(pathName, "wb+");
    if (pf == NULL) return false;

    // A short write (disk full, I/O error) must not be reported as success.
    const bool written = (fwrite(&config_system32, sizeof(config_system32), 1, pf) == 1);
    // fclose() flushes buffered data, so its result is part of the outcome.
    const bool closed = (fclose(pf) == 0);
    return written && closed;
}
/**
 * Dump config_nodes as a raw binary image to <configpath>/FILE_NODE_CONFIG,
 * replacing any existing file.
 *
 * @return true only if the file could be opened, the complete record was
 *         written and the stream was closed without error.
 */
bool configWriteNodeCFG(void)
{
    char pathName[512];
    snprintf(pathName, sizeof(pathName), "%s/%s", configpath, FILE_NODE_CONFIG);

    FILE *pf = fopen(pathName, "wb+");
    if (pf == NULL) return false;

    // A short write (disk full, I/O error) must not be reported as success.
    const bool written = (fwrite(&config_nodes, sizeof(config_nodes), 1, pf) == 1);
    // fclose() flushes buffered data, so its result is part of the outcome.
    const bool closed = (fclose(pf) == 0);
    return written && closed;
}
/**
 * Dump config_config.hardware as a raw binary image to
 * <configpath>/FILE_HARDWARE_CONFIG, replacing any existing file.
 *
 * @return true only if the file could be opened, the complete record was
 *         written and the stream was closed without error.
 */
bool configWriteHardwareCFG(void)
{
    char pathName[512];
    snprintf(pathName, sizeof(pathName), "%s/%s", configpath, FILE_HARDWARE_CONFIG);

    FILE* pf = fopen(pathName, "wb+");
    if (pf == NULL) return false;

    // A short write (disk full, I/O error) must not be reported as success.
    const bool written = (fwrite(&config_config.hardware, sizeof(config_config.hardware), 1, pf) == 1);
    // fclose() flushes buffered data, so its result is part of the outcome.
    const bool closed = (fclose(pf) == 0);
    return written && closed;
}
/**
 * Dump config_config.processes as a raw binary image to
 * <configpath>/FILE_PROCESS_CONFIG, replacing any existing file.
 *
 * @return true only if the file could be opened, the complete record was
 *         written and the stream was closed without error.
 */
bool configWriteProcessCFG(void)
{
    char pathName[512];
    snprintf(pathName, sizeof(pathName), "%s/%s", configpath, FILE_PROCESS_CONFIG);

    FILE* pf = fopen(pathName, "wb+");
    if (pf == NULL) return false;

    // A short write (disk full, I/O error) must not be reported as success.
    const bool written = (fwrite(&config_config.processes, sizeof(config_config.processes), 1, pf) == 1);
    // fclose() flushes buffered data, so its result is part of the outcome.
    const bool closed = (fclose(pf) == 0);
    return written && closed;
}
// Writes `bytes` bytes starting at `data` into a freshly truncated binary file.
// Returns true only when open, write and close all succeeded.
static bool configWriteUnitPointFile(const char* filename, const void* data, size_t bytes)
{
    FILE* pfPoint = fopen(filename, "wb+");
    if (pfPoint == NULL) return false;
    const bool written = (fwrite(data, bytes, 1, pfPoint) == 1);
    const bool closed = (fclose(pfPoint) == 0);
    return written && closed;
}

/**
 * Dump all unit records of config_config to <configpath>/FILE_UNIT_CONFIG and,
 * for every unit, each non-empty point table (yx/yc/ym/yk/yt) to its own file
 * whose name is built from the matching FILE_UNIT_*_CONFIG format, configpath
 * and the unit index.
 *
 * Writing stops at the first unit record that cannot be written. A point table
 * that cannot be written does not stop the remaining work but is reported
 * through the return value.
 *
 * @return true only if the unit file and every point file were written
 *         completely; false otherwise (previously failures after a successful
 *         open were reported as success).
 */
bool configWriteUnitCFG(void)
{
    char filename[512];
    char pathName[512];
    snprintf(pathName, sizeof(pathName), "%s/%s", configpath, FILE_UNIT_CONFIG);

    FILE *pf = fopen(pathName, "wb+");
    if (pf == NULL) return false;

    bool ok = true;
    for (int i = 0; i < UNIT_NUM; i++) {
        const auto& unit = config_config.units[i];

        // NOTE(review): the record is dumped as-is, including its table
        // pointers; readers presumably re-link the tables from the point files.
        if (fwrite(&unit, sizeof(unit), 1, pf) != 1) {
            ok = false;
            break;
        }
        if (unit.yxcount > 0 && unit.yxs != NULL) {
            snprintf(filename, sizeof(filename), FILE_UNIT_YX_CONFIG, configpath, i);
            if (!configWriteUnitPointFile(filename, unit.yxs, sizeof(struUnitYX) * unit.yxcount)) ok = false;
        }
        if (unit.yccount > 0 && unit.ycs != NULL) {
            snprintf(filename, sizeof(filename), FILE_UNIT_YC_CONFIG, configpath, i);
            if (!configWriteUnitPointFile(filename, unit.ycs, sizeof(struUnitYC) * unit.yccount)) ok = false;
        }
        if (unit.ymcount > 0 && unit.yms != NULL) {
            snprintf(filename, sizeof(filename), FILE_UNIT_YM_CONFIG, configpath, i);
            if (!configWriteUnitPointFile(filename, unit.yms, sizeof(struUnitYM) * unit.ymcount)) ok = false;
        }
        if (unit.ykcount > 0 && unit.yks != NULL) {
            snprintf(filename, sizeof(filename), FILE_UNIT_YK_CONFIG, configpath, i);
            if (!configWriteUnitPointFile(filename, unit.yks, sizeof(struUnitYK) * unit.ykcount)) ok = false;
        }
        if (unit.ytcount > 0 && unit.yts != NULL) {
            snprintf(filename, sizeof(filename), FILE_UNIT_YT_CONFIG, configpath, i);
            if (!configWriteUnitPointFile(filename, unit.yts, sizeof(struUnitYT) * unit.ytcount)) ok = false;
        }
    }
    // fclose() flushes buffered unit records, so its result counts as well.
    if (fclose(pf) != 0) ok = false;
    return ok;
}
/**
 * Dump config_static_units as a raw binary image to
 * <configpath>/FILE_UNIT_STATIC, replacing any existing file.
 *
 * The file is opened in binary mode ("wb+") like every other configuration
 * image written by this file; the data is a raw struct array, not text.
 *
 * @return true only if the file could be opened, the complete array was
 *         written and the stream was closed without error.
 */
bool configWriteStaticUnitCFG(void)
{
    char pathName[512];
    snprintf(pathName, sizeof(pathName), "%s/%s", configpath, FILE_UNIT_STATIC);

    FILE *pf = fopen(pathName, "wb+");
    if (pf == NULL) return false;

    // A short write (disk full, I/O error) must not be reported as success.
    const bool written = (fwrite(config_static_units, sizeof(config_static_units), 1, pf) == 1);
    // fclose() flushes buffered data, so its result is part of the outcome.
    const bool closed = (fclose(pf) == 0);
    return written && closed;
}
/**
 * Dump the five config_database tables (yxs, ycs, yms, yks, yts - in that
 * order, each with its full DATABASE_*_NUM capacity) back to back into
 * <configpath>/FILE_DATABASE_CONFIG, replacing any existing file.
 *
 * @return true only if all tables are allocated, the file could be opened,
 *         every table was written completely and the stream was closed
 *         without error.
 */
bool configWriteDatabaseCFG(void)
{
    // The tables are allocated by configInitializeMemory(); refuse to hand a
    // NULL buffer to fwrite() if that has not happened.
    if (config_database.yxs == NULL || config_database.ycs == NULL ||
        config_database.yms == NULL || config_database.yks == NULL ||
        config_database.yts == NULL) {
        return false;
    }

    char pathName[512];
    snprintf(pathName, sizeof(pathName), "%s/%s", configpath, FILE_DATABASE_CONFIG);

    FILE *pf = fopen(pathName, "wb+");
    if (pf == NULL) return false;

    // Stop at the first failed write; the && chain short-circuits.
    const bool written =
        fwrite(config_database.yxs, sizeof(struYX) * DATABASE_YX_NUM, 1, pf) == 1 &&
        fwrite(config_database.ycs, sizeof(struYC) * DATABASE_YC_NUM, 1, pf) == 1 &&
        fwrite(config_database.yms, sizeof(struYM) * DATABASE_YM_NUM, 1, pf) == 1 &&
        fwrite(config_database.yks, sizeof(struYK) * DATABASE_YK_NUM, 1, pf) == 1 &&
        fwrite(config_database.yts, sizeof(struYT) * DATABASE_YT_NUM, 1, pf) == 1;
    // fclose() flushes buffered data, so its result is part of the outcome.
    const bool closed = (fclose(pf) == 0);
    return written && closed;
}
#if 1
/**
 * Concatenate the payloads of `msg_count` noPoll message fragments into one
 * newly allocated, zero-terminated buffer.
 *
 * @param msg          array of `msg_count` fragment pointers
 * @param msg_count    number of fragments; must be in 1 .. MAX_MSG_COUNT-1
 * @param buffer_size  receives the number of payload bytes in the result
 *                     (terminator excluded); set to 0 whenever NULL is returned
 * @return buffer allocated with new[] that the caller must release with
 *         delete[], or NULL on invalid arguments, a NULL fragment or a
 *         fragment reporting a negative payload size.
 */
BYTE *websocket_msg_join(noPollMsg **msg, int msg_count, int *buffer_size)
{
    if (buffer_size == NULL) return NULL;
    *buffer_size = 0;
    if (msg == NULL) return NULL;
    if (msg_count <= 0 || msg_count >= MAX_MSG_COUNT) return NULL;

    // First pass: validate every fragment (not only the first) and size the buffer.
    int total = 0;
    for (int i = 0; i < msg_count; i++) {
        if (msg[i] == NULL) return NULL;
        const int size = nopoll_msg_get_payload_size(msg[i]);
        if (size < 0) return NULL;
        total += size;
    }

    BYTE* buffer = new BYTE[total + 1];

    // Second pass: copy the payloads back to back.
    int len = 0;
    for (int i = 0; i < msg_count; i++) {
        const int size = nopoll_msg_get_payload_size(msg[i]);
        const void* payload = nopoll_msg_get_payload(msg[i]);
        if (size > 0 && payload != NULL && len + size <= total) {
            memcpy(&buffer[len], payload, size);
            len += size;
        }
    }
    buffer[len] = 0; // terminator so the result can be used as a C string
    *buffer_size = len;
    /* return joined message */
    return buffer;
}
/**
 * Send `buffer_len` bytes of `buffer` as one text message on `conn`.
 *
 * The active (#if 1) branch sends the text and then flushes pending writes,
 * passing the send result on to nopoll_conn_flush_writes().
 *
 * @return 0 when the flush reports exactly `buffer_len` bytes, -1 otherwise.
 */
int websocket_write(noPollConn* conn, const char * buffer, int buffer_len)
{
#if 1
int result;
result = nopoll_conn_send_text(conn, (const char *)buffer, buffer_len);
/* ensure we have written all bytes but limit operation to 2 seconds
 * (2000000 is taken to be microseconds - TODO confirm against the noPoll docs) */
result = nopoll_conn_flush_writes(conn, 2000000, result);
return (result == buffer_len) ? 0 : -1;
#else
// Disabled alternative kept for reference. NOTE(review): it compares against
// `length`, which is not declared in this function, so it would not compile
// if re-enabled.
// FIRST PART: normal send operation
int tries = 0;
int bytes_written;
// do write operation and check
bytes_written = nopoll_conn_send_text(conn, buffer, buffer_len);
if (bytes_written == length)
{ // operation completed, just return bytes written
return 0;
}
// SECOND PART: retry in the case of failure
// some failure found, check errno
while (tries < 5 && errno == NOPOLL_EWOULDBLOCK && nopoll_conn_pending_write_bytes(conn) > 0)
{ // ok, unable to write all data but that data is waiting to be flushed
// you can return here and then make your application to retry again or
// try it right now, but with a little pause before continue
nopoll_sleep(10000); // short pause before retrying (argument presumably microseconds, i.e. 10 ms - TODO confirm)
// flush and check if write operation completed
if (nopoll_conn_complete_pending_write(conn) == 0) return 0;
// limit loop
tries++;
}
// failure, return error code reported by the first call or the last retry
return -1;
#endif
}
#else
#endif
/**
 * Wrap `payload` in the command envelope and send it over the websocket.
 *
 * Envelope fields: "cmd" (command), "cmdId" (traceId, or a freshly generated
 * snowflake id when traceId is empty), "time" (current time in ms, second
 * resolution) and "data" (payload). The JSON is written without indentation.
 *
 * @param conn     connection to write to
 * @param traceId  id to echo as "cmdId"; empty to generate a new one
 * @param command  value of the "cmd" field
 * @param payload  value of the "data" field
 * @return true if the whole message was written, false otherwise.
 */
bool publish_sensor_data(noPollConn* conn, const std::string traceId, const char* command, const Json::Value payload)
{
    Json::StreamWriterBuilder builder;
    builder["indentation"] = "";
    builder["emitUTF8"] = true;

    Json::Value jsonRoot;
    jsonRoot["cmd"] = command;
    if (traceId.empty()) {
        // snowflake_next_id() returns an unsigned 64-bit value: print it with
        // a matching conversion ("%lld" with uint64_t is a format mismatch).
        // It yields 0 if the system clock stepped backwards.
        char str[128];
        snprintf(str, sizeof(str), "%llu", static_cast<unsigned long long>(snowflake_next_id(&g_sf)));
        jsonRoot["cmdId"] = str;
    } else {
        jsonRoot["cmdId"] = traceId;
    }

    Json::Int64 mtime = (Json::Int64)time(NULL);
    mtime *= 1000; // seconds -> milliseconds
    jsonRoot["time"] = mtime;
    jsonRoot["data"] = payload;

    const std::string outputConfig = Json::writeString(builder, jsonRoot);
    // length() is a size_t; convert explicitly so the argument matches "%d"
    // and the following "%s" reads the right vararg.
    const int outputLength = static_cast<int>(outputConfig.length());
    vLog(LOG_DEBUG, "send cmd: %s, payload: %d\n%s\n", command, outputLength, outputConfig.c_str());

    const int rc = websocket_write(conn, outputConfig.c_str(), outputLength);
    if (rc != 0) {
        vLog(LOG_DEBUG, "websocket write is error<%d>,insert into database.\n", rc);
        // TODO: insert into database - the message is currently dropped here.
        return false;
    }
    return true;
}
// Number of YX points configured for unit `uid`; 0 when `uid` is out of range.
static int GetUnitYXCount(int uid)
{
    const bool validUnit = (uid >= 0 && uid < UNIT_NUM);
    return validUnit ? config.units[uid].yxcount : 0;
}
// Number of YC points configured for unit `uid`; 0 when `uid` is out of range.
static int GetUnitYCCount(int uid)
{
    const bool validUnit = (uid >= 0 && uid < UNIT_NUM);
    return validUnit ? config.units[uid].yccount : 0;
}
// Number of YM points configured for unit `uid`; 0 when `uid` is out of range.
static int GetUnitYMCount(int uid)
{
    const bool validUnit = (uid >= 0 && uid < UNIT_NUM);
    return validUnit ? config.units[uid].ymcount : 0;
}
static float GetUnitYCReal(int uid, int order)
{
int udb;
long value;
float coef;
float base;
struUnit* pUnit;
struUnitYC* pYC;
if (uid < 0 || uid >= UNIT_NUM) return 0;
pUnit = &config.units[uid];
if ((pUnit->state & 0x01) != TRUE) return 0;
if (order < 0 || order >= pUnit->yccount) return 0;
pYC = &pUnit->ycs[order];
udb = pYC->order;
if (udb < 0 || udb >= DATABASE_YC_NUM) {
value = pYC->value;
coef = 1.0f;
base = 0.0f;
} else {
value = database.ycs[udb].value;
coef = pYC->coef;
base = pYC->base;
pYC->value = value;
pYC->update_time = database.ycs[udb].update_time;
pYC->qds = database.ycs[udb].qds;
}
return (float)((float)value * coef + base);
}
static float GetUnitYCRealFromValue(int uid, int order, long value)
{
int udb;
float coef;
float base;
struUnit* pUnit;
struUnitYC* pYC;
if (uid < 0 || uid >= UNIT_NUM) return 0;
pUnit = &config.units[uid];
if ((pUnit->state & 0x01) != TRUE) return 0;
if (order < 0 || order >= pUnit->yccount) return 0;
pYC = &pUnit->ycs[order];
udb = pYC->order;
if (udb < 0 || udb >= DATABASE_YC_NUM) {
coef = 1.0f;
base = 0.0f;
} else {
coef = pYC->coef;
base = pYC->base;
}
return (float)(value * coef + base);
}
static float GetUnitYMReal(int uid, int order)
{
int udb;
long value;
float coef;
float base;
struUnit* pUnit;
struUnitYM* pYM;
if (uid < 0 || uid >= UNIT_NUM) return 0;
pUnit = &config.units[uid];
if ((pUnit->state & 0x01) != TRUE) return 0;
if (order < 0 || order >= pUnit->ymcount) return 0;
pYM = &pUnit->yms[order];
udb = pYM->order;
if (udb < 0 || udb >= DATABASE_YM_NUM) {
value = pYM->value;
coef = 1.0f;
base = 0.0f;
} else {
value = (long)database.yms[udb].value;
pYM->update_time = database.yms[udb].update_time;
coef = pYM->coef;
base = pYM->base;
pYM->value = value;
}
return (float)(value * coef + base);
}
/**
 * Current value of YX point `point` of unit `uid`.
 *
 * When the point is mapped to a valid database slot, the value and update
 * time are copied from the database into the unit's point first; otherwise
 * the point's own stored value is returned.
 *
 * @return the value, or 0 if the unit/point index is out of range or bit 0 of
 *         the unit state is not set.
 */
static BYTE GetUnitYX(int uid, int point)
{
    if (uid < 0 || uid >= UNIT_NUM) return 0;

    struUnit* unit = &config.units[uid];
    if ((unit->state & 0x01) != TRUE) return 0;
    if (point < 0 || point >= unit->yxcount) return 0;

    struUnitYX* yx = &unit->yxs[point];
    const int dbIndex = yx->order;

    BOOLEAN current;
    if (dbIndex >= 0 && dbIndex < DATABASE_YX_NUM) {
        current = database.yxs[dbIndex].value;
        yx->value = current;
        yx->update_time = database.yxs[dbIndex].update_time;
    } else {
        current = yx->value;
    }
    return current;
}
/**
 * Fetch the record at the m_yxbwload cursor from the yxbw queue.
 *
 * On success the cursor is advanced by one and uid/value/qds/type/st hold the
 * record's data.
 *
 * @return the record's point index, or -1 when yxbw.GetYXBW() reports no record.
 */
int GetUnitYXBW(int& uid, BOOLEAN& value, BYTE& qds, int& type, unionCP56Time& st)
{
    int order;
    int point;
    if (!yxbw.GetYXBW(m_yxbwload, st, order, value, qds, uid, point, type)) {
        return -1;
    }
    m_yxbwload++;
    return point;
}
/**
 * Fetch the record at the m_soeload cursor from the soe queue.
 *
 * On success the cursor is advanced by one and uid/value/qds/st hold the
 * record's data.
 *
 * @return the record's point index, or -1 when soe.GetSOE() reports no record.
 */
int GetUnitSOE(int& uid, BOOLEAN& value, BYTE& qds, unionCP56Time& st)
{
    int order;
    int point;
    if (!soe.GetSOE(m_soeload, st, order, value, qds, uid, point)) {
        return -1;
    }
    m_soeload++;
    return point;
}
/**
 * Fetch the record at the m_ycbwload cursor from the ycbw queue.
 *
 * On success the cursor is advanced by one and uid/value/qds/type/st hold the
 * record's data.
 *
 * @return the record's point index, or -1 when ycbw.GetYCBW() reports no record.
 */
int GetUnitYCBW(int& uid, LONG& value, BYTE& qds, int& type, unionCP56Time& st)
{
    int order;
    int point;
    if (!ycbw.GetYCBW(m_ycbwload, st, order, value, qds, uid, point, type)) {
        return -1;
    }
    m_ycbwload++;
    return point;
}
/**
 * Scan the YK (remote control) points of unit `uid` for an operation owned by
 * this unit that has something to report, and advance its state.
 *
 * For every point with a valid database mapping, order/value/act/result are
 * overwritten with that point's index and its state as read BEFORE any reset
 * below - this happens even for points owned by another unit, so the outputs
 * are only meaningful when TRUE is returned.
 *
 * @return TRUE when a point owned by `uid` produced a reportable event
 *         (outputs describe it); FALSE otherwise.
 */
BOOLEAN GetUnitYK(int uid, int& order, BYTE& value, BYTE& act, BYTE& result)
{
    if (uid < 0 || uid >= UNIT_NUM) return FALSE;

    struUnit* unit = &config.units[uid];
    if ((unit->state & 0x01) != TRUE) return FALSE;

    // Return a database entry to the idle, unowned state.
    auto release = [](struYK* entry) {
        entry->state = YKS_IDLE;
        entry->result = YKR_IDLE;
        entry->op_unit = -1;
    };

    for (int idx = 0; idx < unit->ykcount; idx++) {
        const int dbIndex = unit->yks[idx].order;
        if (dbIndex < 0 || dbIndex >= DATABASE_YK_NUM) continue;

        struYK* entry = &database.yks[dbIndex];
        order = idx;
        value = entry->value;
        act = (BYTE)entry->state;
        result = (BYTE)entry->result;
        if (entry->op_unit != uid) continue;

        switch (act) {
        case YKS_SELED:
            if (result == YKR_SUCC) {
                // Select confirmed: mark it as reported and keep ownership.
                entry->result = YKR_OPER;
                yklog.PushYKLog(system32.now, dbIndex, value, YKT_SELRET, YKS_PROC, uid);
                return TRUE;
            }
            if (result == YKR_FAIL) {
                release(entry);
                return TRUE;
            }
            if (result == YKR_IDLE) {
                entry->state = YKS_IDLE;
                entry->op_unit = -1;
            }
            break;

        case YKS_EXEED:
            // `result` still holds the outcome read before this reset.
            release(entry);
            if (result == YKR_SUCC || result == YKR_FAIL) {
                return TRUE;
            }
            break;

        case YKS_ABRED:
            release(entry);
            return TRUE;

        case YKS_EXEING:
        case YKS_SELREQ:
        case YKS_SELING:
            // In-flight request: only a YKR_OVER result ends it.
            if (result == YKR_OVER) {
                release(entry);
                return TRUE;
            }
            break;

        case YKS_ABRREQ:
        case YKS_EXEREQ:
        case YKS_IDLE:
            break;

        default:
            // Unknown state: recover by releasing the point.
            release(entry);
            break;
        }
    }
    return FALSE;
}
void SetUnitYK(int uid, int order, BYTE value, BYTE act, BYTE result)
{
int udb;
struUnit* pUnit;
struYK* pYK;
if (uid < 0 || uid >= UNIT_NUM) return;
pUnit = &config.units[uid];
if ((pUnit->state & 0x01) != TRUE) return;
if (order < 0 || order >= pUnit->ykcount) return;
udb = pUnit->yks[order].order;
if (udb < 0 || udb >= DATABASE_YK_NUM) return;
pYK = &database.yks[udb];
switch (act) {
case YKS_SELREQ:
if (pYK->state == YKS_IDLE /*&& pYK->op_unit == -1*/) {
pYK->state = act;
pYK->result = YKR_IDLE;
pYK->value = value;
pYK->op_time = system32.timers;
pYK->op_unit = uid;
yklog.PushYKLog(system32.now, udb, value, YKT_SELREQ, YKS_PROC, uid);
}
break;
case YKS_ABRREQ:
if (pYK->op_unit != uid) break;
if (pYK->value != value) break;
if (pYK->state != YKS_IDLE && pYK->state != YKS_EXEED) {
pYK->state = act;
pYK->result = YKR_IDLE;
yklog.PushYKLog(system32.now, udb, value, YKT_ABRREQ, YKS_PROC, uid);
}
break;
case YKS_EXEREQ:
if (pYK->op_unit != uid) break;
if (pYK->value != value) break;
if (pYK->state == YKS_SELED && pYK->result == YKR_OPER) {
pYK->state = act;
pYK->result = YKR_IDLE;
yklog.PushYKLog(system32.now, udb, value, YKT_EXEREQ, YKS_PROC, uid);
}
break;
}
}
/**
 * Scan the YT (setpoint) points of unit `uid` for an operation owned by this
 * unit that has something to report, and advance its state.
 *
 * For every point with a valid database mapping, order/value/act/result are
 * overwritten with that point's index and its state as read BEFORE any reset
 * below - this happens even for points owned by another unit, so the outputs
 * are only meaningful when TRUE is returned.
 *
 * @return TRUE when a point owned by `uid` produced a reportable event
 *         (outputs describe it); FALSE otherwise.
 */
BOOLEAN GetUnitYT(int uid, int& order, DWORD& value, BYTE& act, BYTE& result)
{
    if (uid < 0 || uid >= UNIT_NUM) return FALSE;

    struUnit* unit = &config.units[uid];
    if ((unit->state & 0x01) != TRUE) return FALSE;

    // Return a database entry to the idle, unowned state.
    auto release = [](struYT* entry) {
        entry->state = YTS_IDLE;
        entry->result = YTR_IDLE;
        entry->op_unit = -1;
    };

    for (int idx = 0; idx < unit->ytcount; idx++) {
        const int dbIndex = unit->yts[idx].order;
        if (dbIndex < 0 || dbIndex >= DATABASE_YT_NUM) continue;

        struYT* entry = &database.yts[dbIndex];
        order = idx;
        value = (DWORD)entry->value;
        act = (BYTE)entry->state;
        result = (BYTE)entry->result;
        if (entry->op_unit != uid) continue;

        switch (act) {
        case YTS_SELED:
            if (result == YTR_SUCC) {
                // Select confirmed: mark it as reported and keep ownership.
                entry->result = YTR_OPER;
                ytlog.PushYTLog(system32.now, dbIndex, value, YTT_SELRET, YTS_PROC, uid);
                return TRUE;
            }
            if (result == YTR_FAIL) {
                release(entry);
                return TRUE;
            }
            if (result == YTR_IDLE) {
                entry->state = YTS_IDLE;
                entry->op_unit = -1;
            }
            break;

        case YTS_EXEED:
            // `result` still holds the outcome read before this reset.
            release(entry);
            if (result == YTR_SUCC || result == YTR_FAIL) {
                return TRUE;
            }
            break;

        case YTS_ABRED:
            release(entry);
            return TRUE;

        case YTS_EXEING:
        case YTS_SELREQ:
        case YTS_SELING:
            // In-flight request: only a YTR_OVER result ends it.
            if (result == YTR_OVER) {
                release(entry);
                return TRUE;
            }
            break;

        case YTS_ABRREQ:
        case YTS_EXEREQ:
        case YTS_IDLE:
            break;

        default:
            // Unknown state: recover by releasing the point.
            release(entry);
            break;
        }
    }
    return FALSE;
}
void SetUnitYT(int uid, int order, DWORD value, BYTE act, BYTE result)
{
int udb;
struUnit* pUnit;
struYT* pYT;
if (uid < 0 || uid >= UNIT_NUM) return;
pUnit = &config.units[uid];
if ((pUnit->state & 0x01) != TRUE) return;
if (order < 0 || order >= pUnit->ytcount) return;
udb = pUnit->yts[order].order;
if (udb < 0 || udb >= DATABASE_YT_NUM) return;
pYT = &database.yts[udb];
switch (act) {
case YTS_SELREQ:
if (pYT->state == YTS_IDLE /*&& pYT->op_unit == -1*/) {
pYT->state = act;
pYT->result = YTR_IDLE;
pYT->value = value;
pYT->op_time = system32.timers;
pYT->op_unit = uid;
ytlog.PushYTLog(system32.now, udb, value, YTT_SELREQ, YTS_PROC, uid);
}
break;
case YTS_ABRREQ:
if (pYT->op_unit != uid) break;
if (pYT->value != (long)value) break;
if (pYT->state != YTS_IDLE && pYT->state != YTS_EXEED) {
pYT->state = act;
pYT->result = YTR_IDLE;
ytlog.PushYTLog(system32.now, udb, value, YTT_ABRREQ, YTS_PROC, uid);
}
break;
case YTS_EXEREQ:
if ((pYT->state == YTS_SELED || pYT->state == YTS_IDLE)) {
pYT->state = act;
pYT->result = YTR_IDLE;
pYT->value = value;
pYT->op_time = system32.timers;
pYT->op_unit = uid;
ytlog.PushYTLog(system32.now, udb, value, YTT_EXEREQ, YTS_PROC, uid);
}
break;
}
}
/**
 * Drive one pending YK (remote control) event of unit `uid`.
 *
 * A confirmed select is followed up with an execute request and nothing is
 * sent yet. A failed or timed-out select is followed up with an abort request
 * and answered with "result": false. Every other event is answered with
 * "result": true. Answers are "deviceControlResp" messages carrying g_traceId.
 *
 * @return -1 if `uid` is out of range or GetUnitYK() reports no event,
 *         1 once an event has been handled.
 */
int MakeYKFrame(noPollConn* conn, int uid)
{
    int order;
    BYTE value, action, result;

    if (uid < 0 || uid >= UNIT_NUM) return -1;
    if (!GetUnitYK(uid, order, value, action, result)) return -1;
    vLog(LOG_WARN, "Unit(%d) yk(%d) %s state is %s result is %s\n", uid, order, (value ? "CLOSE" : "TRIP"), val_to_str(action, yk_state, "STATE=%d"), val_to_str(result, yk_result, "RESULT=%d"));

    // Prepare the confirmation message.
    Json::Value jsonRoot;
    jsonRoot["deviceId"] = static_units[uid].deviceId;
    jsonRoot["serviceName"] = (const char *)config.units[uid].yks[order].name;
    jsonRoot["opValue"] = value;

    // A failed select, or a select that ended with YKR_OVER, is handled as an abort.
    const bool selectFailed = (YKS_SELED == action && YKR_FAIL == result);
    const bool selectOver = ((YKS_SELREQ == action || YKS_SELING == action) && YKR_OVER == result);
    if (selectFailed || selectOver) {
        action = YKS_ABRED;
    }

    if (YKS_SELED == action) {
        // Select succeeded: go on to execute, the answer follows later.
        SetUnitYK(uid, order, value, YKS_EXEREQ, YKR_IDLE);
        vLog(LOG_WARN, "Unit(%d) yk(%d) %s state is YKS_EXEREQ result is YKR_IDLE.\n", uid, order, (value ? "CLOSE" : "TRIP"));
        return 1;
    }

    const bool aborted = (YKS_ABRED == action);
    if (aborted) {
        SetUnitYK(uid, order, value, YKS_ABRREQ, YKR_IDLE);
        vLog(LOG_WARN, "Unit(%d) yk(%d) %s state is YKS_ABRREQ result is YKR_IDLE.\n", uid, order, (value ? "CLOSE" : "TRIP"));
    }
    jsonRoot["result"] = !aborted;

    publish_sensor_data(conn, g_traceId, "deviceControlResp", jsonRoot);
    return 1;
}
/**
 * Drive one pending YT (setpoint) event of unit `uid`.
 *
 * A confirmed select is followed up with an execute request and nothing is
 * sent yet. A failed or timed-out select is followed up with an abort request
 * and answered with "result": false. Every other event is answered with
 * "result": true. Answers are "deviceControlResp" messages carrying g_traceId.
 *
 * @return -1 if `uid` is out of range or GetUnitYT() reports no event,
 *         1 once an event has been handled.
 */
int MakeYTFrame(noPollConn* conn, int uid)
{
    int order;
    BYTE action, result;
    DWORD value;

    if (uid < 0 || uid >= UNIT_NUM) return -1;
    if (!GetUnitYT(uid, order, value, action, result)) return -1;
    vLog(LOG_DEBUG, "Unit(%d) set point(%d) %d state is %s result is %s\n", uid, order, value, val_to_str(action, yt_state, "STATE=%d"), val_to_str(result, yt_result, "RESULT=%d"));

    // Prepare the confirmation message.
    Json::Value jsonRoot;
    jsonRoot["deviceId"] = static_units[uid].deviceId;
    jsonRoot["serviceName"] = (const char *)config.units[uid].yts[order].name;
    jsonRoot["opValue"] = value;

    // A failed select, or a select that ended with YTR_OVER, is handled as an abort.
    const bool selectFailed = (YTS_SELED == action && YTR_FAIL == result);
    const bool selectOver = ((YTS_SELREQ == action || YTS_SELING == action) && YTR_OVER == result);
    if (selectFailed || selectOver) {
        action = YTS_ABRED;
    }

    if (YTS_SELED == action) {
        // Select succeeded: go on to execute, the answer follows later.
        SetUnitYT(uid, order, value, YTS_EXEREQ, YTR_IDLE);
        vLog(LOG_DEBUG, "Unit(%d) set point(%d) %d state is YTS_EXEREQ result is YTR_IDLE.\n", uid, order, value);
        return 1;
    }

    const bool aborted = (YTS_ABRED == action);
    if (aborted) {
        SetUnitYT(uid, order, value, YTS_ABRREQ, YTR_IDLE);
        vLog(LOG_DEBUG, "Unit(%d) set point(%d) %d state is YTS_ABRREQ result is YTR_IDLE.\n", uid, order, value);
    }
    jsonRoot["result"] = !aborted;

    publish_sensor_data(conn, g_traceId, "deviceControlResp", jsonRoot);
    return 1;
}
/**
 * Handle a device control command received from the platform.
 *
 * Expected fields: "deviceId", "serviceName" and "opValue". The pair
 * (deviceId, serviceName) is resolved through unitname2service_map to a unit,
 * a point and a command type:
 *  - CMD_CONTROL_OPERATION: remote control (YK); the low two bits of opValue
 *    are submitted as a select request.
 *  - CMD_CONTROL_SETTING: setpoint (YT); opValue is submitted as a direct
 *    execute request, carried as the raw 32-bit pattern of a float or an int.
 *
 * @return true if the command was accepted and forwarded, false if a field is
 *         missing, the device/service is unknown, the point index is
 *         negative, the setpoint value is not numeric or the type is unknown.
 */
bool OnReceivedDeviceCommand(const Json::Value jsonRoot)
{
    if (jsonRoot["deviceId"].isNull()) return false;
    if (jsonRoot["serviceName"].isNull()) return false;
    if (jsonRoot["opValue"].isNull()) return false;

    const std::string deviceId = jsonRoot["deviceId"].asString();
    const std::string serviceName = jsonRoot["serviceName"].asString();

    // Look the service up through iterators: operator[] would copy the whole
    // per-device map (and the item) on every command.
    const unitname2servicemap::const_iterator unitIt = unitname2service_map.find(deviceId);
    if (unitIt == unitname2service_map.end()) {
        vLog(LOG_WARN, "下发了一个不存在services的控制设备id<%s>。\n", deviceId.c_str());
        return false;
    }
    const name2servicemap& services = unitIt->second;
    const name2servicemap::const_iterator serviceIt = services.find(serviceName);
    if (serviceIt == services.end()) {
        vLog(LOG_WARN, "下发了一个不存在的service名称<%s>。\n", serviceName.c_str());
        return false;
    }

    const struct_service_item& item = serviceIt->second;
    const int operType = item.type;
    const int uid = item.uid;
    const int point = item.point;

    if (operType == CMD_CONTROL_OPERATION) { // remote control (YK)
        if (point < 0) {
            vLog(LOG_ERROR, "未能找到对应的遥控点号,请检查并确认。\n");
            return false;
        }
        const int operValue = jsonRoot["opValue"].asInt();
        SetUnitYK(uid, point, (operValue & 0x03), YKS_SELREQ, YKR_IDLE);
        vLog(LOG_WARN, "Unit(%d) yk(%d) %s state is YKS_SELREQ result is YKR_IDLE.\n", uid, point, ((operValue & 0x03) ? "CLOSE" : "TRIP"));
    } else if (operType == CMD_CONTROL_SETTING) { // setpoint (YT)
        union {
            float fval;
            DWORD dval;
        } operValue;
        operValue.dval = 0;
        if (point < 0) {
            vLog(LOG_ERROR, "未能找到对应的遥调点号,请检查并确认。\n");
            return false;
        }
        // NOTE(review): with current jsoncpp, isDouble() is also true for
        // integral JSON numbers, so whole numbers are sent as float bit
        // patterns too - confirm this is what the devices expect.
        const Json::Value& opValue = jsonRoot["opValue"];
        if (opValue.isDouble()) {
            operValue.fval = opValue.asFloat();
        } else if (opValue.isInt()) {
            operValue.dval = opValue.asInt();
        } else {
            // A string/bool/object value used to leave the union
            // uninitialised and send garbage to the device.
            vLog(LOG_ERROR, "Unit(%d) set point(%d): opValue is not numeric, command rejected.\n", uid, point);
            return false;
        }
        SetUnitYT(uid, point, operValue.dval, YTS_EXEREQ, YTR_IDLE);
        vLog(LOG_DEBUG, "Unit(%d) set point(%d) %d state is YTS_EXEREQ result is YTR_IDLE.\n", uid, point, operValue.dval);
    } else {
        vLog(LOG_ERROR, "平台下发的<%d>命令错误。operType不是<0-遥控或1-遥调>,系统不支持的命令!\n", operType);
        return false;
    }
    return true;
}
/**
 * Configure serial port `ord` of config_config from a JSON object.
 *
 * The port is enabled and preset to "ttyS<ord>", 9600 baud, 8 data bits,
 * parity 0, stop 0 and a 1000 timeout; the optional fields "name", "baud",
 * "data", "parity", "stop" and "timeout" then override the presets. Numeric
 * fields may be given as JSON integers or as decimal strings. For "stop",
 * 1 is stored as 0 and 2 as 2; other values leave the preset.
 *
 * @return FALSE if `ord` is out of range, TRUE otherwise.
 */
static BOOLEAN processUartParam(const Json::Value jsonRoot, int ord)
{
    if (ord < 0 || ord >= HARDWARE_PORTS_NUM) return FALSE;

    auto& port = config_config.hardware.ports[ord];

    // Presets.
    port.state = TRUE;
    snprintf(port.name, sizeof(port.name), "ttyS%d", ord);
    port.baud = 9600;
    port.data = 8;
    port.parity = 0;
    port.stop = 0;
    port.timeout = 1000;

    // Reads field `key` as an integer (JSON int or decimal string).
    // Returns false and leaves `out` alone when the field has another type.
    auto readInt = [&jsonRoot](const char* key, int& out) -> bool {
        if (jsonRoot[key].isInt()) {
            out = jsonRoot[key].asInt();
            return true;
        }
        if (jsonRoot[key].isString()) {
            out = atoi(jsonRoot[key].asCString());
            return true;
        }
        return false;
    };

    if (jsonRoot["name"].isString()) {
        snprintf(port.name, sizeof(port.name), "%s", jsonRoot["name"].asCString());
    }

    int number = 0;
    if (readInt("baud", number)) port.baud = number;
    if (readInt("data", number)) port.data = number;
    if (readInt("parity", number)) port.parity = number;

    // Stop bits use their own mapping instead of the plain integer value.
    if (jsonRoot["stop"].isInt()) {
        const int bits = jsonRoot["stop"].asInt();
        if (bits == 1) port.stop = 0;
        else if (bits == 2) port.stop = 2;
    } else if (jsonRoot["stop"].isString()) {
        const std::string bits = jsonRoot["stop"].asString();
        if (bits == "1") port.stop = 0;
        else if (bits == "2") port.stop = 2;
    }

    if (readInt("timeout", number)) port.timeout = number;
    return TRUE;
}
/**
 * Configure the network options of process `pid` of config_config from a
 * JSON object.
 *
 * Presets: source checking ignored, SOCK_STREAM, bind address INADDR_ANY,
 * bind port 0, target address INADDR_ANY, target port 502. The optional
 * fields "host" (integer, or dotted IPv4 string converted by inet_pton) and
 * "port" (integer or decimal string) override the target. A host string that
 * cannot be converted is logged and the preset address is kept.
 *
 * @return FALSE if `pid` is out of range, TRUE otherwise.
 */
static BOOLEAN processNetworkParam(const Json::Value jsonRoot, int pid)
{
    if (pid < 0 || pid >= PROCESSES_NUM) return FALSE;

    auto& net = config_config.processes[pid].option.network;
    net.ignored_source = TRUE;
    net.socket_type = SOCK_STREAM;
    net.bind_addr = INADDR_ANY;
    net.bind_port = 0;
    net.target_addr = INADDR_ANY;
    net.target_port = 502;

    if (jsonRoot["host"].isInt()) {
        // NOTE(review): stored as given, whereas the string form below is
        // stored in network byte order - confirm the expected byte order.
        net.target_addr = jsonRoot["host"].asInt();
    } else if (jsonRoot["host"].isString()) {
        if (inet_pton(AF_INET, jsonRoot["host"].asCString(), &net.target_addr) == 1) {
            vLog(LOG_DEBUG, "IPv4 地址转换成功,网络字节序为: %u.\n", net.target_addr);
        } else {
            vLog(LOG_ERROR, "inet_pton error(%d,%s).\n", errno, strerror(errno));
        }
    }

    if (jsonRoot["port"].isInt()) {
        net.target_port = jsonRoot["port"].asInt();
    } else if (jsonRoot["port"].isString()) {
        net.target_port = atoi(jsonRoot["port"].asCString());
    }

    // The function used to fall off the end here without returning a value,
    // which is undefined behaviour for a non-void function.
    return TRUE;
}
/**
 * Configure the IEC 104 options of process `pid` of config_config from a
 * JSON object.
 *
 * Presets: source checking ignored, SOCK_STREAM, target address INADDR_ANY,
 * target port 2404, ASDU address size 2, COT size 2, information address
 * size 3, t0=30, t1=15, t2=10, t3=20, k=12, w=8. The optional fields "host"
 * (integer, or dotted IPv4 string converted by inet_pton), "port", "t0",
 * "t1", "t2" and "t3" (integers or decimal strings) override the presets.
 *
 * @return FALSE if `pid` is out of range, TRUE otherwise.
 */
static BOOLEAN processHostIEC104ProcessParam(const Json::Value jsonRoot, int pid)
{
    if (pid < 0 || pid >= PROCESSES_NUM) return FALSE;

    auto& iec104 = config_config.processes[pid].option.iec104;
    iec104.net.ignored_source = TRUE;
    iec104.net.socket_type = SOCK_STREAM;
    iec104.net.target_addr = INADDR_ANY;
    iec104.net.target_port = 2404;
    iec104.asdu_addr_size = 2;
    iec104.cot_size = 2;
    iec104.info_addr_size = 3;
    iec104.t0 = 30;
    iec104.t1 = 15;
    iec104.t2 = 10;
    iec104.t3 = 20;
    iec104.k = 12;
    iec104.w = 8;

    // Reads field `key` as an integer (JSON int or decimal string).
    // Returns false and leaves `out` alone when the field has another type.
    auto readInt = [&jsonRoot](const char* key, int& out) -> bool {
        if (jsonRoot[key].isInt()) {
            out = jsonRoot[key].asInt();
            return true;
        }
        if (jsonRoot[key].isString()) {
            out = atoi(jsonRoot[key].asCString());
            return true;
        }
        return false;
    };

    if (jsonRoot["host"].isInt()) {
        // NOTE(review): stored as given, whereas the string form below is
        // stored in network byte order - confirm the expected byte order.
        iec104.net.target_addr = jsonRoot["host"].asInt();
    } else if (jsonRoot["host"].isString()) {
        if (inet_pton(AF_INET, jsonRoot["host"].asCString(), &iec104.net.target_addr) == 1) {
            vLog(LOG_DEBUG, "IPv4 地址转换成功,网络字节序为: %u.\n", iec104.net.target_addr);
        } else {
            vLog(LOG_ERROR, "inet_pton error(%d,%s).\n", errno, strerror(errno));
        }
    }

    int number = 0;
    if (readInt("port", number)) iec104.net.target_port = number;

    // Each timer goes to its own field. Previously "t1", "t2" and "t3" were
    // all written into t0, so t1..t3 could never be configured and t0 ended
    // up holding whichever of them came last.
    if (readInt("t0", number)) iec104.t0 = number;
    if (readInt("t1", number)) iec104.t1 = number;
    if (readInt("t2", number)) iec104.t2 = number;
    if (readInt("t3", number)) iec104.t3 = number;
    return TRUE;
}
static bool dealConfigFile(const Json::Value jsonRoot)
{
bool result = false;
do {
FILE* pf = fopen("aaa.json", "w+");
if (pf) {
fwrite(jsonRoot.toStyledString().c_str(), jsonRoot.toStyledString().length(), 1, pf);
fclose(pf);
}
if (!configInitializeMemory()) {
vLog(LOG_ERROR, "Fail initialize memory!\n");
break;
}
if (!jsonRoot["nodeId"].isString()) {
vLog(LOG_ERROR, "配置文件格式错误缺少节点ID的配置信息请检查。\n");
break;
}
if (jsonRoot["version"].isNull()) {
vLog(LOG_WARN, "配置文件格式错误,缺少版本信息,请检查。\n");
}
if (jsonRoot["version"].isInt()) {
snprintf(config_system32.version, sizeof(config_system32.version), "%d", jsonRoot["version"].asInt());
} else if (jsonRoot["version"].isString()) {
snprintf(config_system32.version, sizeof(config_system32.version), "%d", jsonRoot["version"].asCString());
}
name2unitmap deviceIDs;
deviceIDs.clear();
//更新节点配置
config_nodes.m_node[0].m_netnode_no = 0;
config_nodes.m_node[0].m_tcitype = 1;
config_nodes.m_node[0].m_target_addr = INADDR_LOOPBACK;
config_nodes.m_node[0].m_target_port = 15000;
snprintf(config_nodes.m_node[0].m_machine_name, sizeof(config_nodes.m_node[0].m_machine_name), "%s", jsonRoot["nodeId"].asCString());
//更新设备列表
const Json::Value equipments = jsonRoot["equipments"];
{
long yxorder = 0, ycorder = 0, ymorder = 0, ykorder = 0, ytorder = 0;
char dbyxname[512];
snprintf(dbyxname, sizeof(dbyxname), "%s/%s", configpath, FILE_DATABASE_YX_STATIC);
FILE *pfdbyxname = fopen(dbyxname, "wb+");
char dbycname[512];
snprintf(dbycname, sizeof(dbycname), "%s/%s", configpath, FILE_DATABASE_YC_STATIC);
FILE *pfdbycname = fopen(dbycname, "wb+");
char dbymname[512];
snprintf(dbymname, sizeof(dbymname), "%s/%s", configpath, FILE_DATABASE_YM_STATIC);
FILE *pfdbymname = fopen(dbymname, "wb+");
char dbykname[512];
snprintf(dbykname, sizeof(dbykname), "%s/%s", configpath, FILE_DATABASE_YK_STATIC);
FILE *pfdbykname = fopen(dbykname, "wb+");
char dbytname[512];
snprintf(dbytname, sizeof(dbytname), "%s/%s", configpath, FILE_DATABASE_YT_STATIC);
FILE *pfdbytname = fopen(dbytname, "wb+");
int count = equipments.size();
for (int i = 0; i < count; i++) {
const Json::Value equipment = equipments[i];
int uid = i;
snprintf(config_static_units[uid].name, sizeof(config_static_units[uid].name), "device_%d", uid);
snprintf(config_static_units[uid].model, sizeof(config_static_units[uid].model), "model_%d", uid);
snprintf(config_static_units[uid].manufacturerId, sizeof(config_static_units[uid].manufacturerId), "%s", "iss");
std::string id = "";
std::string address = "";
if (equipment["id"].isString()) {
id = equipment["id"].asString();
snprintf(config_static_units[uid].deviceId, sizeof(config_static_units[uid].deviceId), "%s", id.c_str());
} else {
snprintf(config_static_units[uid].deviceId, sizeof(config_static_units[uid].deviceId), "iss_%d", uid);
}
if (equipment["addr"].isString()) {
address = equipment["addr"].asString();
}
struct_uidaddress uidAddress;
uidAddress.uid = uid;
uidAddress.address = address;
if (deviceIDs.find(id) == deviceIDs.end()) {
deviceIDs.insert(name2unitmap::value_type(id, uidAddress));
} else {
vLog(LOG_DEBUG, "found two devices have the same id.\n");
}
attrvector yxs, ycs, yms, yks, yts;
const Json::Value attrs = equipment["attrs"];
if (!attrs.isNull()) {
int size = attrs.size();
for (int j = 0; j < size; j++) {
const Json::Value attr = attrs[j];
std::string type = "";
if (attr["type"].isString()) type = attr["type"].asString();
struct_attr name_param;
name_param.name = "";
if (attr["name"].isString()) name_param.name = attr["name"].asString();
if (attr["params"].isObject()) name_param.value = attr["params"];
if (type == "yc") ycs.push_back(name_param);
else if (type == "yx") yxs.push_back(name_param);
else if (type == "ym") yms.push_back(name_param);
}
}
const Json::Value services = equipment["services"];
if (!services.isNull()) {
int size = services.size();
for (int j = 0; j < size; j++) {
const Json::Value service = services[j];
std::string type = "";
if (service["type"].isString()) type = service["type"].asString();
struct_attr name_param;
name_param.name = "";
if (service["name"].isString()) name_param.name = service["name"].asString();
if (service["params"].isObject()) name_param.value = service["params"];
if (type == "yk") yks.push_back(name_param);
else if (type == "yt") yts.push_back(name_param);
}
}
config_config.units[uid].value = SPI_ON;
config_config.units[uid].softdog = UNIT_WATCHDOG_TIME;
config_config.units[uid].state = TRUE;
config_config.units[uid].type = MASTER_UNIT;
config_config.units[uid].yxcount = yxs.size();
config_config.units[uid].yccount = ycs.size();
config_config.units[uid].ymcount = yms.size();
config_config.units[uid].ykcount = yks.size();
config_config.units[uid].ytcount = yts.size();
if (config_config.units[uid].yccount > 0) {
config_config.units[uid].ycs = new struUnitYC[config_config.units[uid].yccount];
if (NULL != config_config.units[uid].ycs) {
memset(config_config.units[uid].ycs, 0, sizeof(struUnitYC) * config_config.units[uid].yccount);
for (int k = 0; k < config_config.units[uid].yccount; k++) {
snprintf(config_config.units[uid].ycs[k].name, sizeof(config_config.units[uid].ycs[k].name), "%s", ycs[k].name.c_str());
config_config.units[uid].ycs[k].order = ycorder++;
config_config.units[uid].ycs[k].value = 0;
config_config.units[uid].ycs[k].qds = 0x80; //默认为无效
config_config.units[uid].ycs[k].factor = 1;
config_config.units[uid].ycs[k].change_pos = 2;
config_config.units[uid].ycs[k].coef = 1.0f;
config_config.units[uid].ycs[k].base = 0;
config_config.units[uid].ycs[k].upBound = 9999999.999f;
config_config.units[uid].ycs[k].lowBound = -9999999.999f;
config_config.units[uid].ycs[k].limit1Enable = FALSE;
config_config.units[uid].ycs[k].limit1Low = 0;
config_config.units[uid].ycs[k].limit1High = 0.0f;
config_config.units[uid].ycs[k].limit2Enable = FALSE;
config_config.units[uid].ycs[k].limit2Low = 0;
config_config.units[uid].ycs[k].limit2High = 0.0f;
Json::Value param = ycs[k].value;
if (!param.isNull()) {
if (param["coef"].isDouble()) config_config.units[uid].ycs[k].coef = param["coef"].asFloat();
if (param["base"].isDouble()) config_config.units[uid].ycs[k].base = param["base"].asFloat();
if (param["upBound"].isDouble()) config_config.units[uid].ycs[k].upBound = param["upBound"].asFloat();
if (param["lowBound"].isDouble()) config_config.units[uid].ycs[k].lowBound = param["lowBound"].asFloat();
if (param["limit1Enable"].isInt()) config_config.units[uid].ycs[k].limit1Enable = param["limit1Enable"].asInt();
if (param["limit2Enable"].isInt()) config_config.units[uid].ycs[k].limit2Enable = param["limit2Enable"].asInt();
if (param["limit1Low"].isDouble()) config_config.units[uid].ycs[k].limit1Low = param["limit1Low"].asFloat();
if (param["limit2Low"].isDouble()) config_config.units[uid].ycs[k].limit2Low = param["limit2Low"].asFloat();
if (param["limit1High"].isDouble()) config_config.units[uid].ycs[k].limit1High = param["limit1High"].asFloat();
if (param["limit2High"].isDouble()) config_config.units[uid].ycs[k].limit2High = param["limit2High"].asFloat();
}
if (config_config.units[uid].type == MASTER_UNIT) {
config_database.ycs[config_config.units[uid].ycs[k].order].coef = config_config.units[uid].ycs[k].coef;
config_database.ycs[config_config.units[uid].ycs[k].order].base = config_config.units[uid].ycs[k].base;
}
if (pfdbycname) {
fseek(pfdbycname, sizeof(struYCStatic) * config_config.units[uid].ycs[k].order, SEEK_SET);
fwrite(ycs[k].name.c_str(), (MAX_NAME_SIZE << 2), 1, pfdbycname);
}
}
}
}
if (config_config.units[uid].ymcount > 0) {
config_config.units[uid].yms = new struUnitYM[config_config.units[uid].ymcount];
if (NULL != config_config.units[uid].yms) {
memset(config_config.units[uid].yms, 0, sizeof(struUnitYM) * config_config.units[uid].ymcount);
for (int k = 0; k < config_config.units[uid].ymcount; k++) {
snprintf(config_config.units[uid].yms[k].name, sizeof(config_config.units[uid].yms[k].name), "%s", yms[k].name.c_str());
config_config.units[uid].yms[k].order = ymorder++;
config_config.units[uid].yms[k].value = 0;
config_config.units[uid].yms[k].coef = 1.0f;
config_config.units[uid].yms[k].base = 0;
if (config_config.units[uid].type == MASTER_UNIT) {
config_database.yms[config_config.units[uid].yms[k].order].coef = config_config.units[uid].yms[k].coef;
config_database.yms[config_config.units[uid].yms[k].order].base = config_config.units[uid].yms[k].base;
}
if (pfdbymname) {
fseek(pfdbymname, sizeof(struYMStatic) * config_config.units[uid].yms[k].order, SEEK_SET);
fwrite(yms[k].name.c_str(), (MAX_NAME_SIZE << 2), 1, pfdbymname);
}
}
}
}
if (config_config.units[uid].ykcount > 0) {
config_config.units[uid].yks = new struUnitYK[config_config.units[uid].ykcount];
if (NULL != config_config.units[uid].yks) {
memset(config_config.units[uid].yks, 0, sizeof(struUnitYK) * config_config.units[uid].ykcount);
for (int k = 0; k < config_config.units[uid].ykcount; k++) {
snprintf(config_config.units[uid].yks[k].name, sizeof(config_config.units[uid].yks[k].name), "%s", yks[k].name.c_str());
config_config.units[uid].yks[k].order = ykorder++;
if (pfdbykname) {
fseek(pfdbykname, sizeof(struYKStatic) * config_config.units[uid].yks[k].order, SEEK_SET);
fwrite(yks[k].name.c_str(), (MAX_NAME_SIZE << 2), 1, pfdbykname);
}
}
}
}
if (config_config.units[uid].ytcount > 0) {
config_config.units[uid].yts = new struUnitYT[config_config.units[uid].ytcount];
if (NULL != config_config.units[uid].yts) {
memset(config_config.units[uid].yts, 0, sizeof(struUnitYT) * config_config.units[uid].ytcount);
for (int k = 0; k < config_config.units[uid].ytcount; k++) {
snprintf(config_config.units[uid].yts[k].name, sizeof(config_config.units[uid].yts[k].name), "%s", yts[k].name.c_str());
config_config.units[uid].yts[k].order = ytorder++;
if (pfdbytname) {
fseek(pfdbytname, sizeof(struYTStatic) * config_config.units[uid].yts[k].order, SEEK_SET);
fwrite(yts[k].name.c_str(), (MAX_NAME_SIZE << 2), 1, pfdbytname);
}
}
}
}
if (config_config.units[uid].yxcount > 0) {
config_config.units[uid].yxs = new struUnitYX[config_config.units[uid].yxcount];
if (NULL != config_config.units[uid].yxs) {
memset(config_config.units[uid].yxs, 0, sizeof(struUnitYX) * config_config.units[uid].yxcount);
for (int k = 0; k < config_config.units[uid].yxcount; k++) {
snprintf(config_config.units[uid].yxs[k].name, sizeof(config_config.units[uid].yxs[k].name), "%s", yxs[k].name.c_str());
config_config.units[uid].yxs[k].order = yxorder++;
config_config.units[uid].yxs[k].value = 0;
config_config.units[uid].yxs[k].qds = 0x80; //默认为无效
config_config.units[uid].yxs[k].invert = 0;
config_database.yxs[config_config.units[uid].yxs[k].order].auto_reset = 0;
if (pfdbyxname) {
fseek(pfdbyxname, sizeof(struYKStatic) * config_config.units[uid].yxs[k].order, SEEK_SET);
fwrite(yxs[k].name.c_str(), (MAX_NAME_SIZE << 2), 1, pfdbyxname);
}
Json::Value param = yxs[k].value;
if (!param.isNull()) {
if (param["invert"].asInt()) config_config.units[uid].yxs[k].invert = param["invert"].asInt();
}
}
}
}
}
if (pfdbyxname) fclose(pfdbyxname);
if (pfdbycname) fclose(pfdbycname);
if (pfdbymname) fclose(pfdbymname);
if (pfdbykname) fclose(pfdbykname);
if (pfdbytname) fclose(pfdbytname);
}
//更新链路配置文件
const Json::Value links = jsonRoot["links"];
int count = links.size();
int uartId = 0;
m_gLinkIDs.clear();
for (int i = 0; i < count; i++) {
const Json::Value link = links[i];
config_config.processes[i].state = TRUE;
config_config.processes[i].type = MASTER_UNIT;
config_config.processes[i].time_gap = 300; //默认参数
config_config.processes[i].poll_gap = 5;
config_config.processes[i].mode = PROCESS_MODE_MASTER;
if (link["linkName"].isString()) {
snprintf(config_config.processes[i].name, sizeof(config_config.processes[i].name), "%s", link["linkName"].asCString());
}
if (link["linkId"].isString()) {
config_config.processes[i].irn = strtoll(link["linkId"].asCString(), NULL, 10);
}
if (link["protocol"].isInt()) {
config_config.processes[i].proto = link["protocol"].asInt();
}
BYTE addrType = ADDR_TYPE_NORMAL; //根据协议设定单元地址类型
//处理链路参数,根据协议参数的不同来处理
const Json::Value params = link["params"];
if (!params.isNull()) {
switch (config_config.processes[i].proto) {
case PROTOCOL_HOST_MODBUS_RTU:
processUartParam(params, uartId);
config_config.processes[i].order = uartId++;
break;
case PROTOCOL_HOST_MODBUS_TCP:
case PROTOCOL_HOST_MODBUS_RTU_TCP:
addrType = ADDR_TYPE_IPV4;
processNetworkParam(params, i);
break;
case PROTOCOL_HOST_IEC104:
addrType = ADDR_TYPE_IPV4_FACNO;
processHostIEC104ProcessParam(params, i);
break;
}
}
//处理链接设备
const Json::Value devices = link["devices"];
if (devices.isArray()) {
int size = devices.size();
for (int j = 0; j < size; j++) {
std::string id = devices[j].asCString();
if (deviceIDs.find(id) != deviceIDs.end()) {
int uid = deviceIDs[id].uid;
std::string address = deviceIDs[id].address;
config_config.processes[i].units[j] = uid;
//根据协议修改地址
if (address != "") {
if (addrType == ADDR_TYPE_HEX) {
StringToHex(address.c_str(), config_config.units[uid].addr);
} else if (addrType == ADDR_TYPE_IPV4) {
DWORD addr;
inet_pton(AF_INET, address.c_str(), (struct in_addr*)&addr);
memcpy(config_config.units[uid].addr, &addr, sizeof(addr));
} else if (addrType == ADDR_TYPE_IPV4_FACNO) {
std::vector<std::string> tokens = split(address, ':');
if (tokens.size() >= 2) {
DWORD addr;
inet_pton(AF_INET, tokens.at(0).c_str(), (struct in_addr*)&addr);
memcpy(config_config.units[uid].addr, &addr, sizeof(addr));
addr = (DWORD)atoi(tokens.at(1).c_str());
memcpy(&config_config.units[uid].addr[4], &addr, sizeof(addr));
} else {
DWORD addr;
inet_pton(AF_INET, tokens.at(0).c_str(), (struct in_addr*)&addr);
memcpy(config_config.units[uid].addr, &addr, sizeof(addr));
addr = 1;
memcpy(&config_config.units[uid].addr[4], &addr, sizeof(addr));
}
} else {
DWORD addr = (DWORD)atoi(address.c_str());
memcpy(config_config.units[uid].addr, &addr, sizeof(addr));
}
}
}
}
}
}
//保存数据
configWriteNodeCFG();
configWriteSystemCFG();
configWriteHardwareCFG();
configWriteProcessCFG();
configWriteUnitCFG();
configWriteDatabaseCFG();
//保存静态文件
configWriteStaticUnitCFG(); //units.sta
result = true;
} while (0);
//重启application
if (result) {
g_dataAcquisitionReload = true;
} else {
vLog(LOG_WARN, "配置文件读取失败,请检查后下发!\n");
}
}
/**
 * Dispatch a command received from the platform.
 *
 * "configUpdate" is handed to dealConfigFile() and "deviceControl" to
 * OnReceivedDeviceCommand(); in both cases cmdId is stored in g_traceId first.
 * Any other command is only logged.
 *
 * @param conn   connection the command arrived on (not used here)
 * @param cmdId  command id, stored in g_traceId for supported commands
 * @param cmd    command name
 * @param data   command payload
 * @return always true
 */
bool OnReceivedSystemAction(noPollConn* conn, const std::string cmdId, const std::string cmd, const Json::Value data)
{
    const bool isConfigUpdate = (cmd == "configUpdate");
    const bool isDeviceControl = !isConfigUpdate && (cmd == "deviceControl");

    if (!isConfigUpdate && !isDeviceControl) {
        vLog(LOG_DEBUG, "command: %s is not supported.\n", cmd.c_str());
        return true;
    }

    // Remember the id of the command being handled.
    g_traceId = cmdId;
    if (isConfigUpdate) {
        dealConfigFile(data);
    } else {
        OnReceivedDeviceCommand(data);
    }
    return true;
}
/**
 * Handle one text message received over the websocket.
 *
 * The message is parsed as JSON; it must be an object carrying "cmd" and
 * "data". "cmdId" is optional. The command is then forwarded to
 * OnReceivedSystemAction().
 *
 * @param conn  connection the message arrived on
 * @param msg   message bytes; only the first `size` bytes are read
 * @param size  number of valid bytes in msg
 */
void on_message(noPollConn* conn, const char *msg, const int size)
{
    if (msg == NULL || size <= 0) return;

    // Bounded print: only `size` bytes of msg are known to be valid.
    vLog(LOG_DEBUG, "Received: %.*s.\n", size, msg);

    Json::Value jsonRoot;
    std::string err;
    Json::CharReaderBuilder builder;
    // newCharReader() hands over ownership; the original never released it.
    Json::CharReader* reader = builder.newCharReader();
    const bool parsed = reader->parse(msg, msg + size, &jsonRoot, &err);
    delete reader;

    if (!parsed) {
        vLog(LOG_ERROR, "reader->parse(msg, msg + size, &jsonRoot, &err) error<%d,%s>。\n", errno, err.c_str());
        return;
    }
    // Indexing a non-object value by key is a hard error in jsoncpp.
    if (!jsonRoot.isObject()) {
        vLog(LOG_ERROR, "json root is not an object.\n");
        return;
    }
    if (jsonRoot["cmd"].isNull()) {
        vLog(LOG_ERROR, "json format lost cmd part.\n");
        return;
    }
    if (jsonRoot["data"].isNull()) {
        vLog(LOG_ERROR, "json format lost data part.\n");
        return;
    }
    // asString() fails on arrays/objects, so check convertibility first.
    if (!jsonRoot["cmd"].isConvertibleTo(Json::stringValue)) {
        vLog(LOG_ERROR, "json cmd part is not a string.\n");
        return;
    }

    const std::string cmd = jsonRoot["cmd"].asString();
    const std::string cmdId = jsonRoot["cmdId"].isConvertibleTo(Json::stringValue)
                                  ? jsonRoot["cmdId"].asString()
                                  : std::string();
    OnReceivedSystemAction(conn, cmdId, cmd, jsonRoot["data"]);
}
#endif
// Serialises the shutdown sequence in stop().
pthread_mutex_t mutex;

/**
 * Signal handler: tear the application down and exit with status 0.
 *
 * Destroys every live entry of procs[], stops the worker thread, shuts the
 * logger down, saves the in-memory database to "./ry-das-dn.db" and exits.
 *
 * NOTE(review): several of the calls made here are not async-signal-safe;
 * confirm this handler is acceptable for the signals it is registered on.
 *
 * @param signo  number of the received signal (only logged)
 */
void stop(int signo)
{
    pthread_mutex_lock(&mutex);
    vLog(LOG_ERROR, "cmg received exit signel(%d)\n", signo);

    for (int slot = 0; slot < PROCESSES_NUM; slot++) {
        if (procs[slot] == NULL) continue;
        procs[slot]->Destroy();
        delete procs[slot];
        procs[slot] = NULL;
    }
    destroy_thread();

    if (zlog_inited) zlog_fini();

    m_database.SaveDatabaseFromMemoryToFile("./ry-das-dn.db");
    m_database.Close();

    pthread_mutex_unlock(&mutex);
    // The exit status is always 0, whatever signal was received.
    exit(0);
}
/**
 * Publish a "heartbeat" message.
 *
 * The payload always carries "ttl" (30000) and "status". When status is 1 it
 * also lists every process whose state is TRUE under "links", each with its
 * "linkId" (irn as decimal text) and an "online" flag that is true while the
 * process softdog is below PROCESS_WATCHDOG_TIME.
 *
 * @param conn    connection to publish on
 * @param status  status value to report
 */
void heart_beat(noPollConn* conn, int status)
{
    Json::Value payload;
    payload["ttl"] = 30000;
    payload["status"] = status;

    if (status == 1) {
        Json::Value linkList;
        for (int idx = 0; idx < PROCESSES_NUM; idx++) {
            if (config.processes[idx].state != TRUE) continue;

            char idText[32];
            snprintf(idText, sizeof(idText), "%lld", config.processes[idx].irn);

            Json::Value entry;
            entry["linkId"] = idText;
            entry["online"] = !(config.processes[idx].softdog >= PROCESS_WATCHDOG_TIME);
            linkList.append(entry);
        }
        // Only attach the list when at least one link is configured.
        if (linkList.size() > 0) {
            payload["links"] = linkList;
        }
    }
    publish_sensor_data(conn, "", "heartbeat", payload);
}
#if 0
/**
 * Read up to len bytes from the channel ring buffer into rbuff and advance
 * the read position (channelBuffer.load).
 *
 * The request is first limited to the number of buffered bytes
 * (save - load). The copy is done in two parts so that a read crossing the
 * end of the buffer continues from its start.
 *
 * NOTE(review): the index masking with (MAX_DISPLAY_BUFFER_SIZE - 1) presumably
 * relies on MAX_DISPLAY_BUFFER_SIZE being a power of two - confirm.
 *
 * @param rbuff  destination buffer, must hold at least len bytes
 * @param len    maximum number of bytes to read
 * @return number of bytes copied; -1 converted to DWORD when the ring buffer
 *         has no storage
 */
DWORD OnReadChannelRingBuff(char *rbuff, DWORD len)
{
DWORD l;
if (channelBuffer.buf == NULL) return -1;
// never read more than what is currently buffered
len = wMin(len, (channelBuffer.save - channelBuffer.load));
/* First copy: read from the ring buffer up to the end of the buffer */
l = wMin(len, MAX_DISPLAY_BUFFER_SIZE - (channelBuffer.load & (MAX_DISPLAY_BUFFER_SIZE - 1)));
memcpy(rbuff, channelBuffer.buf + (channelBuffer.load & (MAX_DISPLAY_BUFFER_SIZE - 1)), l);
/* If the read wraps, copy the remainder from the start of the buffer; otherwise this copies zero bytes */
memcpy(rbuff + l, channelBuffer.buf, len - l);
channelBuffer.load += len;
return len;
}
/**
 * Drain the channel ring buffer and publish its content as a "LINK_LOG"
 * message.
 *
 * The buffer is scanned byte by byte. A 't' or 'r' byte followed by a 4-digit
 * decimal length introduces a frame; the frame is emitted as one log line
 * prefixed with "S:" (for 't') or "R:" (for 'r'). Bytes that are not part of
 * a frame are accumulated and emitted as a plain line.
 *
 * Fixes over the original: the length field is NUL-terminated before atoi(),
 * every write into the line buffer is bounded, the marker byte is taken from
 * where it was actually stored, trailing plain text is NUL-terminated, and
 * the function returns a value on every path.
 *
 * @param conn  connection to publish on
 * @return FALSE when the ring buffer is missing or disabled, TRUE when there
 *         was nothing to publish, otherwise the result of the publish call
 */
BOOLEAN publishMonLinkLog(noPollConn* conn)
{
    if (NULL == channelBuffer.buf) return FALSE;
    if (!channelBuffer.enabled) return FALSE;

    Json::Value jsonRoot;
    Json::Value jsonItem;
    char str_len[5];
    char disp[2048];
    const int capacity = (int)sizeof(disp);
    int disp_len = 0;

    while (channelBuffer.load != channelBuffer.save) {
        // Read one byte and check whether it is a 't' or 'r' frame marker.
        int len = OnReadChannelRingBuff(&disp[disp_len], 1);
        const char marker = disp[disp_len];
        if (marker == 't' || marker == 'r') {
            memset(str_len, 0, sizeof(str_len));
            OnReadChannelRingBuff(str_len, 4);
            len = atoi(str_len);
            if (len > 0) {
                // Leave room for the 2-byte prefix and the terminator.
                if (len > capacity - 3) len = capacity - 3;
                disp_len = 0;
                disp[0] = (marker == 'r') ? 'R' : 'S';
                disp[1] = ':';
                len = OnReadChannelRingBuff((char *)&disp[2], len);
                disp[len + 2] = '\0';
                jsonItem.append(disp);
                continue;
            }
            len = 0; // invalid length: drop the marker byte
        }
        disp_len += len;
        if (disp_len >= capacity - 1) {
            // Line buffer full: flush what has been collected so far.
            disp[disp_len] = '\0';
            jsonItem.append(disp);
            disp_len = 0;
        }
    }
    if (disp_len) {
        disp[disp_len] = '\0';
        jsonItem.append(disp);
    }

    if (jsonItem.size() <= 0) return TRUE;

    jsonRoot["linkIrn"] = struMonLinkLog.m_iLinkIrn;
    jsonRoot["logs"] = jsonItem;
    return publish_sensor_data(conn, "", "LINK_LOG", jsonRoot) ? TRUE : FALSE;
}
#endif
/**
 * Publish the complete current value set of one unit as "initDeviceData".
 *
 * Nothing is sent unless bit 0x80 of the unit state is set, and nothing is
 * sent again once bit 0x40 (initial data already sent) is set. All YC, YM and
 * YX values are collected by point name; when at least one value exists the
 * 0x40 bit is set and the message is published.
 *
 * @param conn  connection to publish on
 * @param uid   unit index, valid range [0, UNIT_NUM)
 * @return result of the publish call, or false when nothing was published
 */
bool publishinitDeviceData(noPollConn* conn, int uid)
{
    if (uid < 0 || uid >= UNIT_NUM) return false;

    const bool flagSet = (config.units[uid].state & 0x80) == 0x80;
    if (!flagSet) return false;

    // The initial snapshot of this device has already been sent.
    const bool alreadySent = (config.units[uid].state & 0x40) == 0x40;
    if (alreadySent) return false;

    Json::Value values;

    const int ycTotal = GetUnitYCCount(uid);
    for (int n = 0; n < ycTotal; n++) {
        values[(const char *)config.units[uid].ycs[n].name] = GetUnitYCReal(uid, n);
    }

    const int ymTotal = GetUnitYMCount(uid);
    for (int n = 0; n < ymTotal; n++) {
        values[(const char *)config.units[uid].yms[n].name] = GetUnitYMReal(uid, n);
    }

    const int yxTotal = GetUnitYXCount(uid);
    for (int n = 0; n < yxTotal; n++) {
        values[(const char *)config.units[uid].yxs[n].name] = GetUnitYX(uid, n);
    }

    if (values.size() == 0) return false;

    Json::Value root;
    root["deviceId"] = static_units[uid].deviceId;
    root["values"] = values;
    // Mark the snapshot as sent before publishing.
    config.units[uid].state |= 0x40;
    return publish_sensor_data(conn, "", "initDeviceData", root);
}
/**
 * Publish all YC values of one unit as an "analogData" message.
 *
 * @param conn  connection to publish on
 * @param uid   unit index, valid range [0, UNIT_NUM)
 * @return result of the publish call, or false when uid is out of range or
 *         the unit has no values to report
 */
bool publishAnalogData(noPollConn* conn, int uid)
{
    if (uid < 0 || uid >= UNIT_NUM) return false;

    Json::Value values;
    const int total = GetUnitYCCount(uid);
    for (int point = 0; point < total; point++) {
        values[(const char *)config.units[uid].ycs[point].name] = GetUnitYCReal(uid, point);
    }
    if (values.size() == 0) return false;

    Json::Value root;
    root["deviceId"] = static_units[uid].deviceId;
    root["values"] = values;
    return publish_sensor_data(conn, "", "analogData", root);
}
/**
 * Publish all YX values of one unit as a "stateData" message.
 *
 * @param conn  connection to publish on
 * @param uid   unit index, valid range [0, UNIT_NUM)
 * @return result of the publish call, or false when uid is out of range or
 *         the unit has no values to report
 */
bool publishStateData(noPollConn* conn, int uid)
{
    if (uid < 0 || uid >= UNIT_NUM) return false;

    Json::Value values;
    const int total = GetUnitYXCount(uid);
    for (int point = 0; point < total; point++) {
        values[(const char *)config.units[uid].yxs[point].name] = GetUnitYX(uid, point);
    }
    if (values.size() == 0) return false;

    Json::Value root;
    root["deviceId"] = static_units[uid].deviceId;
    root["values"] = values;
    return publish_sensor_data(conn, "", "stateData", root);
}
/**
 * Publish historical analog values of one unit as "historyAnalogData".
 *
 * No values are collected yet, so the value set is always empty and the
 * function currently always returns false without publishing.
 *
 * @param conn  connection to publish on
 * @param uid   unit index, valid range [0, UNIT_NUM)
 * @return result of the publish call, or false when nothing was published
 */
bool publishHistoryAnalogData(noPollConn* conn, int uid)
{
    if (uid < 0 || uid >= UNIT_NUM) return false;

    Json::Value values;
    if (values.size() == 0) return false;

    Json::Value root;
    root["deviceId"] = static_units[uid].deviceId;
    root["values"] = values;
    return publish_sensor_data(conn, "", "historyAnalogData", root);
}
/**
 * Publish historical state values of one unit as "historyStateData".
 *
 * No values are collected yet, so the value set is always empty and the
 * function currently always returns false without publishing.
 *
 * @param conn  connection to publish on
 * @param uid   unit index, valid range [0, UNIT_NUM)
 * @return result of the publish call, or false when nothing was published
 */
bool publishHistoryStateData(noPollConn* conn, int uid)
{
    if (uid < 0 || uid >= UNIT_NUM) return false;

    Json::Value values;
    if (values.size() == 0) return false;

    Json::Value root;
    root["deviceId"] = static_units[uid].deviceId;
    root["values"] = values;
    return publish_sensor_data(conn, "", "historyStateData", root);
}
#if 0
/**
 * Collect pending SOE records and publish them as one "SOE_DATA" message.
 *
 * At most DATABASE_SOE_NUM records are fetched with GetUnitSOE(); fetching
 * stops as soon as it returns a negative point. Each record contributes its
 * point irn, value and a millisecond timestamp to the "soe" array.
 *
 * NOTE(review): uid is passed to GetUnitSOE() without being assigned first;
 * presumably it is an output parameter - confirm.
 *
 * @param conn  connection to publish on
 * @return FALSE when there is nothing to send, otherwise the result of the
 *         publish call
 */
bool publishSOEData(noPollConn* conn)
{
int i, uid;
int soe_point;
BOOLEAN soe_value;
BYTE soe_qds;
unionCP56Time soe_time;
QWORD dataTime;
Json::Value jsonRoot;
Json::Value jsonItem;
Json::Value jsonValue;
for (i = 0; i < DATABASE_SOE_NUM; i++)
{
soe_point = GetUnitSOE(uid, soe_value, soe_qds, soe_time);
// a negative point ends the scan
if (soe_point < 0) break;
jsonValue["irn"] = (Json::Int64)GetUnitYXIRNByPoint(uid, soe_point);
jsonValue["dataValue"] = soe_value;
// converted time scaled by 1000, plus the sub-second part of the record
dataTime = unionCP56TimetoTime_t(&soe_time);
dataTime *= 1000;
dataTime += soe_time.millisecond % 1000;
jsonValue["dataTime"] = (Json::Int64)dataTime;
jsonItem.append(jsonValue);
}
if (jsonItem.size() <= 0) return FALSE;
#if 0
QLONG pushTime = system32.timers;
pushTime *= 1000;
jsonRoot["pushTime"] = (Json::Int64)pushTime;
#endif
jsonRoot["soe"] = jsonItem;
return publish_sensor_data(conn, "", "SOE_DATA", jsonRoot);
}
/**
 * Collect pending YX change records and publish them as one "stateData"
 * message.
 *
 * At most DATABASE_YXBW_NUM records are fetched with GetUnitYXBW(); fetching
 * stops as soon as it returns a negative point. Each record contributes its
 * point irn, value and a millisecond timestamp to the "yxs" array.
 *
 * NOTE(review): uid is passed to GetUnitYXBW() without being assigned first;
 * presumably it is an output parameter - confirm.
 *
 * @param conn  connection to publish on
 * @return FALSE when there is nothing to send, otherwise the result of the
 *         publish call
 */
bool publishYXBWData(noPollConn* conn)
{
int i, uid;
int yxbw_point;
int yxbw_type;
BOOLEAN yxbw_value;
BYTE yxbw_qds;
unionCP56Time yxbw_time;
QWORD dataTime;
Json::Value jsonRoot;
Json::Value jsonItem;
Json::Value jsonValue;
for (i = 0; i < DATABASE_YXBW_NUM; i++)
{
yxbw_point = GetUnitYXBW(uid, yxbw_value, yxbw_qds, yxbw_type, yxbw_time);
// a negative point ends the scan
if (yxbw_point < 0) break;
jsonValue["irn"] = (Json::Int64)GetUnitYXIRNByPoint(uid, yxbw_point);
jsonValue["dataValue"] = yxbw_value;
// converted time scaled by 1000, plus the sub-second part of the record
dataTime = unionCP56TimetoTime_t(&yxbw_time);
dataTime *= 1000;
dataTime += yxbw_time.millisecond % 1000;
jsonValue["dataTime"] = (Json::Int64)dataTime;
jsonItem.append(jsonValue);
}
jsonRoot["yxs"] = jsonItem;
if (jsonItem.size() <= 0) return FALSE;
#if 0
QLONG pushTime = system32.timers;
pushTime *= 1000;
jsonRoot["pushTime"] = (Json::Int64)pushTime;
#endif
return publish_sensor_data(conn, "", "stateData", jsonRoot);
}
/**
 * Collect pending YC change records and publish them as one "analogData"
 * message.
 *
 * At most DATABASE_YCBW_NUM records are fetched with GetUnitYCBW(); fetching
 * stops as soon as it returns a negative point. Each record contributes its
 * point irn, the value converted by GetUnitYCRealFromValue() and a
 * millisecond timestamp to the "ycs" array.
 *
 * NOTE(review): uid is passed to GetUnitYCBW() without being assigned first;
 * presumably it is an output parameter - confirm.
 *
 * @param conn  connection to publish on
 * @return FALSE when there is nothing to send, otherwise the result of the
 *         publish call
 */
bool publishYCBWData(noPollConn* conn)
{
int i;
int uid;
Json::Value jsonRoot;
Json::Value jsonItem;
Json::Value jsonValue;
int ycbw_point;
int ycbw_type;
LONG ycbw_value;
BYTE ycbw_qds;
unionCP56Time ycbw_time;
QWORD dataTime;
for (i = 0; i < DATABASE_YCBW_NUM; i++)
{
ycbw_point = GetUnitYCBW(uid, ycbw_value, ycbw_qds, ycbw_type, ycbw_time);
// a negative point ends the scan
if (ycbw_point < 0) break;
jsonValue["irn"] = (Json::Int64)GetUnitYCIRNByPoint(uid, ycbw_point);
jsonValue["dataValue"] = GetUnitYCRealFromValue(uid, ycbw_point, ycbw_value);
// converted time scaled by 1000, plus the sub-second part of the record
dataTime = unionCP56TimetoTime_t(&ycbw_time);
dataTime *= 1000;
dataTime += ycbw_time.millisecond % 1000;
jsonValue["dataTime"] = (Json::Int64)dataTime;
jsonItem.append(jsonValue);
}
jsonRoot["ycs"] = jsonItem;
if (jsonItem.size() <= 0) return FALSE;
return publish_sensor_data(conn, "", "analogData", jsonRoot);
}
#endif
/**
 * Release every dynamically allocated table of the running configuration.
 *
 * Frees the database point arrays, the per-unit point arrays of all
 * UNIT_NUM units and the message/channel buffers, resetting each pointer to
 * NULL afterwards so the function can safely be called again.
 */
void freeMem(void)
{
    // delete[] on a null pointer is a no-op, so no explicit checks are needed.
    delete [] database.yxs;
    database.yxs = NULL;
    delete [] database.ycs;
    database.ycs = NULL;
    delete [] database.yms;
    database.yms = NULL;
    delete [] database.yks;
    database.yks = NULL;
    delete [] database.yts;
    database.yts = NULL;

    for (int unit = 0; unit < UNIT_NUM; unit++) {
        delete [] config.units[unit].yxs;
        config.units[unit].yxs = NULL;
        delete [] config.units[unit].ycs;
        config.units[unit].ycs = NULL;
        delete [] config.units[unit].yms;
        config.units[unit].yms = NULL;
        delete [] config.units[unit].yks;
        config.units[unit].yks = NULL;
        delete [] config.units[unit].yts;
        config.units[unit].yts = NULL;
    }

    delete [] msgBuffer.buf;
    msgBuffer.buf = NULL;
    delete [] channelBuffer.buf;
    channelBuffer.buf = NULL;
}
#if 0
static int client_callback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) {
switch (reason) {
case LWS_CALLBACK_CLIENT_ESTABLISHED:
printf("Client connected to server\n");
// 发送消息到服务器
char *payload = "Hello, world.\n";
lws_write(wsi, payload, strlen(payload), LWS_WRITE_TEXT);
break;
case LWS_CALLBACK_CLIENT_RECEIVE:
printf("Received from server: %s\n", (char *)in);
break;
case LWS_CALLBACK_CLOSED:
printf("Connection closed\n");
lws_cancel_service(lws_get_context(wsi));
break;
default:
break;
}
return 0;
}
// Protocol table for the libwebsockets client (currently compiled out).
// Entry 1 binds the client protocol name to client_callback; the table ends
// with an all-zero sentinel entry.
static const struct lws_protocols protocols[] = {
{
"http-only", // not a WebSocket protocol, but the client may need to handle HTTP redirects etc.
NULL,
0,
0, NULL, 0
},
{
"lws-minimal-client", // WebSocket client protocol name
client_callback,
sizeof(struct per_session_data__), /* per_session_data should be sufficiently big */
0, NULL, 0
},
{ NULL, NULL, 0, 0, NULL, 0 } /* Sentinel */
};
#endif
/**
 * Clear bit 0x40 of the state of every unit whose state has bit 0x01 set.
 *
 * publishinitDeviceData() sets bit 0x40 once a unit's initial data has been
 * sent; clearing it here allows that data to be sent again.
 */
void releaseAllUnits(void)
{
    for (int uid = 0; uid < UNIT_NUM; uid++) {
        if ((config.units[uid].state & 0x01) == 0x01) {
            config.units[uid].state &= 0xBF; // complement of 0x40
        }
    }
}
int main(int argc, char** argv)
{
int c;
BOOLEAN enable_sqlite_configuration = FALSE;
BOOLEAN enable_auto_platform = TRUE;
char issmqtt_config_pathName[256] = {'\0'};
char host[256] = {"127.0.0.1"};
int port = 7790;
char nodeId[128] = {"runyang_dn"};
char version[128] = {"v1.0"};
//获取可执行文件所在目录
const char default_config[] = "[global]\n"
"default format = \"%d.%ms [%-6V] - %m%n\"\n"
"[rules]\n"
"my_cat.* >stderr\n";
int rc = dzlog_init("./application.conf", "my_cat");
if (rc < 0) {
rc = dzlog_init(default_config, "my_cat");
if (rc < 0) {
zlog_inited = FALSE;
} else {
fprintf(stderr, "dzlog_init(\"./application.conf\", \"my_cat\") failed, load default config.\n");
zlog_inited = TRUE;
}
} else {
zlog_inited = TRUE;
}
pthread_mutex_init(&mutex, NULL);
static struct option long_options[] =
{
{ "directory", required_argument, NULL, 'd' },
{ "host", required_argument, NULL, 'h'},
{ "port", required_argument, NULL, 'p'},
{ "nodeId", required_argument, NULL, 'n' },
{ "version", required_argument, NULL, 'v'},
{ "help", no_argument, NULL, 'H' },
{ NULL, 0, NULL, 0 }
};
while (1)
{
int opt_index = 0;
c = getopt_long(argc, argv, "d:h:p:n:v:H", long_options, &opt_index);
if (c == -1) break;
switch (c)
{
case 'd':
snprintf(configpath, sizeof(configpath), "%s", optarg);
break;
case 'h':
snprintf(host, sizeof(host), "%s", optarg);
break;
case 'p':
port = strtol(optarg, NULL, 10);
break;
case 'n':
snprintf(nodeId, sizeof(nodeId), "%s", optarg);
break;
case 'v':
snprintf(version, sizeof(version), "%s", optarg);
break;
case '?':
case 'H':
vLog(LOG_DEBUG, "Usage: %s [OPTION]... \n"
" -d, --directory : set configuration file directory. Default is /data/config/rtufiles\n"
" -h, --host : ws ip. Default is localhost\n"
" -p, --port : ws port, default is 1883\n"
" -n, --nodeid : ws nodeId, default is runyang_dn\n"
" -v, --version : sw version, default is v1.0\n"
" -H, --help : print this usage\n", argv[0]);
return (EXIT_SUCCESS);
default:
vLog(LOG_DEBUG, "?? getopt returned character code 0%c ??\n", c);
}
}
signal(SIGHUP, stop);
signal(SIGQUIT, stop);
signal(SIGKILL, stop);
signal(SIGPIPE, stop);
signal(SIGSTOP, stop);
signal(SIGINT, stop);
signal(SIGILL, stop);
signal(SIGSEGV, stop);
signal(SIGTERM, stop);
signal(SIGABRT, stop);
signal(SIGFPE, stop);
#if 1
noPollCtx *ctx = nopoll_ctx_new();
nopoll_log_enable(ctx, nopoll_false);
nopoll_log_color_enable(ctx, nopoll_false);
noPollConn *conn;
DWORD last_connect_sec = 0;
#else
struct lws_context_creation_info info;
struct lws_client_connect_info ccinfo;
struct lws_context *context;
memset(&info, 0, sizeof info);
info.port = CONTEXT_PORT_NO_LISTEN; /* we do not run any server */
info.protocols = protocols;
info.gid = -1;
info.uid = -1;
info.options = 0;
context = lws_create_context(&info);
if (!context) {
fprintf(stderr, "libwebsocket init failed\n");
return -1;
}
char url[512];
snprintf(url, sizeof(url), "/node/%s/%s/", nodeId, version);
memset(&ccinfo, 0, sizeof ccinfo);
ccinfo.context = context;
ccinfo.address = host;
ccinfo.port = port;
ccinfo.path = url;
ccinfo.host = host;
ccinfo.origin = "origin";
ccinfo.protocol = protocols[1].name;
ccinfo.ietf_version_or_minus_one = -1; /* auto */
lws_client_connect_via_info(&ccinfo);
#endif
do {
int status = 2; //0 - 离线, 1 - 在线, 2 - 未配置
g_dataAcquisitionReload = false;
vLog(LOG_DEBUG, "system initialize...\n");
char szHostCode[128];
memset(szHostCode, '\0', sizeof(szHostCode));
if (!initialize_system(FALSE, FALSE, NULL, szHostCode)) {
vLog(LOG_ERROR, "system initialize error.\n");
if (enable_auto_platform) { //主动和平台链接
char szHostName[32] = "";
gethostname(szHostName, sizeof(szHostName));
nodes.m_node[0].m_netnode_no = 0;
nodes.m_node[0].m_tcitype = MASTER_TCI;
nodes.m_node[0].m_target_addr = INADDR_LOOPBACK;
nodes.m_node[0].m_target_port = 15000;
snprintf(nodes.m_node[0].m_machine_name, sizeof(nodes.m_node[0].m_machine_name), "%s", szHostName);
nodes.m_node[0].irn = 0; //没有配置
vLog(LOG_DEBUG, "nodes.m_node[0].m_machine_name is: %s\n", nodes.m_node[0].m_machine_name);
}
} else {
vLog(LOG_DEBUG, "configed node id is: %s.\n", nodes.m_node[0].m_machine_name);
snprintf(nodeId, sizeof(nodeId), "%s", nodes.m_node[0].m_machine_name);
if (system32.version[0] != '\0') {
snprintf(version, sizeof(version), "%s", system32.version);
} else {
snprintf(version, sizeof(version), "%s", "1");
}
if (enable_auto_platform) { //增加协议和单元配置的数量统计
//m_gLinkIDs.clear();
unitname2service_map.clear();
#if 0
for (int i = 0; i < PROCESSES_NUM; i++) {
char id[128];
snprintf(id, sizefof(id), "%lld", config.processes[i].irn);
m_gLinkIDs.push_back(id);
}
#endif
for (int i = 0; i < UNIT_NUM; i++) {
if (config.units[i].state != TRUE) continue;
std::string unit_id = static_units[i].deviceId;
name2servicemap name2service_map;
for (int j = 0; j < config.units[i].ykcount; j++) {
std::string name = config.units[i].yks[j].name;
if (name2service_map.find(name) == name2service_map.end()) {
struct_service_item item;
item.type = CMD_CONTROL_OPERATION;
item.uid = i;
item.point = j;
item.order = config.units[i].yks[j].order;
name2service_map.insert(name2servicemap::value_type(name, item));
} else {
vLog(LOG_WARN, "同一个设备不能有两个相同的遥控名<%s>,请检查。\n", name.c_str());
}
}
for (int j = 0; j < config.units[i].ytcount; j++) {
std::string name = config.units[i].yts[j].name;
if (name2service_map.find(name) == name2service_map.end()) {
struct_service_item item;
item.type = CMD_CONTROL_SETTING;
item.uid = i;
item.point = j;
item.order = config.units[i].yts[j].order;
name2service_map.insert(name2servicemap::value_type(name, item));
} else {
vLog(LOG_WARN, "同一个设备不能有两个相同的遥调名<%s>,请检查。\n", name.c_str());
}
}
if (unitname2service_map.find(unit_id) == unitname2service_map.end()) {
unitname2service_map.insert(unitname2servicemap::value_type(unit_id, name2service_map));
} else {
vLog(LOG_WARN, "系统配置了两个相同的设备ID<%s>,请检查。\n", unit_id.c_str());
}
}
status = 1;
}
}
if (enable_auto_platform) {
#if 1
//创建WS链接
if (ctx != NULL) {
nopoll_conn_connect_timeout(ctx, 50000);
char url[512];
char cPort[64];
snprintf(url, sizeof(url), "/node/%s/%s", nodeId, version);
snprintf(cPort, sizeof(cPort), "%d", port);
vLog(LOG_DEBUG, "%d here to connect:%s:%s.\n", __LINE__, host, cPort);
releaseAllUnits();
conn = nopoll_conn_new(ctx, host, cPort, NULL, url, NULL, NULL);
last_connect_sec = system32.timers;
}
//创建历史数据库
m_database.Open("./ry-das-dn.db", SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, 0);
m_database.MoveDatabaseToMemory();
//创建数据库表
try {
SQLiteStatement *pStmt = new SQLiteStatement(&m_database);
pStmt->SqlStatement("CREATE TABLE IF NOT EXISTS HIS_RECORD_TABLE ("
"ID INTEGER PRIMARY KEY AUTOINCREMENT,"
"MSG TEXT NOT NULL,"
"DATETIME INTEGER NOT NULL,"
"STATUS INTEGER NOT NULL);");
//比较表空间
delete pStmt;
} catch (SQLiteException &exception) {
vLog(LOG_ERROR, "\nException Occured");
exception.Show();
vLog(LOG_ERROR, "SQLite result code: %d,%s\n", exception.GetSqliteResultCode(), exception.GetErrorDescription().c_str());
return FALSE;
}
#endif
}
unsigned int m_runCount = 0;
unsigned int count = 0;
unsigned int critical = 0;
CChangeMaster masterTci;
if (!masterTci.Init()) {
break;
}
masterTci.MasterTciFirstRun();
time_t last_sec = 0;
while (TRUE) {
m_runCount++;
masterTci.MasterSend();
usleep(MASTER_TCI_SEND_INTERVAL);
if (MASTER_TCI == CChangeMaster::m_tcitype) {
if (m_runCount > count) {
count = m_runCount;
critical = 0;
} else {
critical++;
if (critical > 15) {
break;
}
}
if (enable_auto_platform) {
#if 1
#if 0
nopoll_bool isOk = nopoll_conn_is_ok(conn);
if (nopoll_conn_is_ok(conn) != nopoll_true) {
nopoll_conn_close(conn);
char url[512];
char cPort[64];
snprintf(url, sizeof(url), "/node/%s/%s", nodeId, version);
snprintf(cPort, sizeof(cPort), "%d", port);
vLog(LOG_DEBUG, "tcp connect error url is:%s.\n", url);
vLog(LOG_DEBUG, "%d here to connect:%s:%s.\n", __LINE__, host, cPort);
releaseAllUnits();
conn = nopoll_conn_new(ctx, host, cPort, NULL, url, NULL, NULL);
//continue;
} else {
if (!nopoll_conn_wait_until_connection_ready(conn, 5)) {
nopoll_conn_close(conn);
char url[512];
char cPort[64];
snprintf(url, sizeof(url), "/node/%s/%s", nodeId, version);
snprintf(cPort, sizeof(cPort), "%d", port);
vLog(LOG_DEBUG, "websocket connect not ready url is:%s.\n", url);
vLog(LOG_DEBUG, "%d here to connect:%s:%s.\n", __LINE__, host, cPort);
releaseAllUnits();
conn = nopoll_conn_new(ctx, host, cPort, NULL, url, NULL, NULL);
//continue;
}
}
#else
nopoll_bool isOk = nopoll_conn_is_ready(conn);
#endif
#if 1
if (isOk) {
int msg_count = 0;
noPollMsg *msg[40];
while ((msg[msg_count] = nopoll_conn_get_msg(conn)) != NULL) {
vLog(LOG_DEBUG, "recv length = %d, %d\n", nopoll_msg_get_payload_size(msg[msg_count]), nopoll_msg_is_final(msg[msg_count]));
if (nopoll_msg_is_final(msg[msg_count])) {
msg_count++;
break;
} else {
msg_count++;
}
}
if (msg_count > 0) {
int buffer_len;
BYTE *buffer = websocket_msg_join(msg, msg_count, &buffer_len);
if (buffer) {
on_message(conn, (const char *)buffer, buffer_len);
delete [] buffer;
buffer = NULL;
}
}
for (int i = 0; i < msg_count; i++) {
nopoll_msg_unref(msg[i]);
}
} else {
if (last_connect_sec > 0 && system32.timers > (last_connect_sec + 30)) {
nopoll_conn_connect_timeout(ctx, 50000);
char url[512];
char cPort[64];
snprintf(url, sizeof(url), "/node/%s/%s", nodeId, version);
snprintf(cPort, sizeof(cPort), "%d", port);
vLog(LOG_DEBUG, "%d here to connect:%s:%s.\n", __LINE__, host, cPort);
releaseAllUnits();
conn = nopoll_conn_new(ctx, host, cPort, NULL, url, NULL, NULL);
last_connect_sec = system32.timers;
}
}
#endif
BOOLEAN sec_changed = FALSE;
if (last_sec != (time_t)system32.timers) {
last_sec = system32.timers;
sec_changed = TRUE;
}
if (sec_changed) {
#if 0
if (struMonLinkLog.m_iStartTime > 0) {
if (struMonLinkLog.m_iStartTime > 0 && system32.timers <= (struMonLinkLog.m_iStartTime + 30)) { //默认为30s调试设置为2s
//发送报文
publishMonLinkLog(conn);
} else {
channelBuffer.enabled = FALSE;
channelBuffer.mon_port = -1;
struMonLinkLog.m_iLinkIrn = -1;
struMonLinkLog.m_iStartTime = 0;
}
}
#endif
if ((last_sec % 20) == 0) {
heart_beat(conn, status);
}
}
for (int i = 0; i < UNIT_NUM; i++) {
if (config.units[i].state != TRUE) continue;
MakeYKFrame(conn, i);
MakeYTFrame(conn, i);
if (sec_changed) {
publishinitDeviceData(conn, i);
if ((last_sec % 60) == 0) { //更新数据
publishAnalogData(conn, i);
publishStateData(conn, i);
}
}
}
#else
lws_service(context, 500);
#endif
}
}
if (g_dataAcquisitionReload) {
break;
}
}
if (critical > 15) {
vLog(LOG_ERROR, "unknow error.\n");
}
masterTci.ChangeDelete();
destroy_thread();
freeMem();
vLog(LOG_DEBUG, "App: dataAcquisition start reload.\n");
} while(1);
pthread_mutex_destroy(&mutex);
if (zlog_inited) zlog_fini();
#if 1
nopoll_conn_close(conn);
nopoll_ctx_unref(ctx);
m_database.SaveDatabaseFromMemoryToFile("./ry-das-dn.db");
m_database.Close();
#endif
vLog(LOG_DEBUG, "system stop okay.\n");
return EXIT_SUCCESS;
}