map/das-dn/comm/netproc.cpp

748 lines
22 KiB
C++
Raw Normal View History

2024-07-08 10:27:17 +08:00
#include "netproc.h"
CNetProcess::CNetProcess()
{
m_sock = -1;
last_addr = 0;
last_port = 0;
socket_type = SOCK_STREAM;
bind_addr = INADDR_ANY;
bind_port = 8000;
ignored_source = FALSE;
target_addr = INADDR_ANY;
target_port = 0;
}
CNetProcess::~CNetProcess()
{
}
BOOLEAN CNetProcess::OnPreCreate(int id)
{
int i;
optval_t opt;
struct sockaddr_in addr;
CNetProcessItem *pItem;
if (!CProcess::OnPreCreate(id)) return FALSE;
if (GetOption(&network, sizeof(network)))
{ //获取新配置
socket_type = network.socket_type;
bind_addr = network.bind_addr;
bind_port = network.bind_port;
ignored_source = network.ignored_source;
target_addr = network.target_addr;
target_port = network.target_port;
}
if ((int)m_sock > 0)
{
close(m_sock);
m_sock = -1;
}
m_sock = socket(AF_INET, socket_type, 0);
if ((int)m_sock < 0)
{
vLog(LOG_ERROR, "socket created error(%d,%s).\n", errno, strerror(errno));
return FALSE;
}
if (bind_port == 0)
{
if (socket_type == SOCK_STREAM)
{
close(m_sock);
m_sock = -1;
//建立所有必要的单元链路
for (i = 0; i < PROCESS_UNIT_NUM; i++)
{
if (GetUnitID(i) < 0) break;
pItem = GetItem(i);
if (pItem == NULL) break;
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = htons(target_port);
GetUnitAddr(GetUnitID(i), (BYTE *)&addr.sin_addr, 4);
pItem->Attach(GetUnitID(i), -1, addr.sin_addr.s_addr, target_port);
}
}
}
else if (bind_addr != INADDR_BROADCAST)
{ //绑定端口非0代表在本地进行绑定操作
opt = 1; setsockopt(m_sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = bind_addr;
addr.sin_port = htons(bind_port);
char sin_addr[20] = { '\0' };
inet_ntop(AF_INET, (void *)&addr.sin_addr, sin_addr, 16);
vLog(LOG_INFO, "prepare bind to %s:%d.\n", sin_addr, bind_port);
if (bind(m_sock, (struct sockaddr *)&addr, sizeof(addr)) < 0)
{
vLog(LOG_ERROR, "socket bind local addr error(%d,%s).\n", errno, strerror(errno));
close(m_sock);
m_sock = -1;
return FALSE;
}
//暂时不考虑支持组播
//支持TCP方式的侦听模式
for (i = 0; i < PROCESS_UNIT_NUM; i++)
{
if (GetUnitID(i) < 0) break;
pItem = GetItem(i);
if (pItem == NULL) break;
addr.sin_family = AF_INET;
addr.sin_port = htons(target_port);
pItem->Attach(GetUnitID(i), -1, -1, target_port);
}
if (socket_type == SOCK_STREAM)
{
if (listen(m_sock, MAX_CONN_COUNT) < 0)
{
vLog(LOG_ERROR, "TCP socket listen error(%d,%s).\n", errno, strerror(errno));
close(m_sock);
m_sock = -1;
return FALSE;
}
}
}
if (socket_type == SOCK_DGRAM && (target_addr == INADDR_BROADCAST || (target_addr & HTONL(0xff000000)) == HTONL(0xff000000)))
{
opt = 1;
if (setsockopt(m_sock, SOL_SOCKET, SO_BROADCAST, &opt, sizeof(opt)) < 0)
{
vLog(LOG_ERROR, "UDP socket set broadcast error(%d,%s).\n", errno, strerror(errno));
close(m_sock);
m_sock = -1;
return FALSE;
}
}
//UDP方式
if (socket_type == SOCK_DGRAM)
{
if (ignored_source)
{
if (GetUnitID(0) < 0)
{
vLog(LOG_ERROR, "No Unit(%d) Found(%d).\n", GetUnitID(0), strerror(errno));
return FALSE;
}
pItem = GetItem(0);
if (pItem == NULL) return FALSE;
pItem->Attach(GetUnitID(0), m_sock, addr.sin_addr.s_addr, target_port);
}
else
{
for (i = 0; i < PROCESS_UNIT_NUM; i++)
{
if (GetUnitID(i) < 0) break;
pItem = GetItem(i);
if (pItem == NULL) break;
addr.sin_family = AF_INET;
addr.sin_port = htons(target_port);
GetUnitAddr(GetUnitID(i), (BYTE *)&addr.sin_addr, 4);
pItem->Attach(GetUnitID(i), m_sock, addr.sin_addr.s_addr, target_port);
}
}
}
return TRUE;
}
void CNetProcess::Destroy(void)
{
CProcess::Destroy();
int i;
//关闭连接
if ((int)m_sock > 0)
{
close(m_sock);
}
//析构基本单元
for (i = 0; i < PROCESS_UNIT_NUM; i++)
{
DestroyItem(i, FALSE);
}
}
BOOLEAN CNetProcess::isClient(void)
{
return (network.bind_port == 0 ? TRUE : FALSE);
}
#if 1
BOOLEAN CNetProcess::net_socket_nonblock(SOCKET sock, BOOLEAN blocking)
{
DWORD ul = 1;
if (blocking) ul = 1;
else ul = 0;
return ioctl(sock, FIONBIO, &ul);
}
BOOLEAN CNetProcess::Connect(CNetProcessItem* pItem, int ord, BOOLEAN bDefault)
{
int sock;
char url[60];
if (NULL == pItem) return FALSE;
//sock = socket(AF_INET, SOCK_STREAM, 0);
memset(url, '\0', sizeof(url));
if (bDefault && (target_addr != INADDR_ANY))
{
inet_ntop(AF_INET, &target_addr, url, 16);
}
else
{
DWORD addr;
GetUnitAddr(GetUnitID(ord), (BYTE *)&addr, 4);
if (addr == INADDR_ANY)
{
vLog(LOG_INFO, "Unit(%d) socket connect address cann't be any!\n here we can use dns.\n", pItem->GetUnitID());
BYTE data[MAX_UNIT_PARAM_SIZE];
GetUnitOption(pItem->GetUnitID(), data, sizeof(data));
//0xAA, 0x55.........0x55, 0xAA
if (data[0] == 0xAA && data[1] == 0x55 && data[MAX_UNIT_PARAM_SIZE - 2] == 0x55 && data[MAX_UNIT_PARAM_SIZE - 1] == 0xAA)
{
memcpy(url, &data[2], 60);
}
}
else
{
inet_ntop(AF_INET, &addr, url, 16);
}
}
vLog(LOG_INFO, "Unit(%d) socket connect(%s:%d).\n", pItem->GetUnitID(), url, (int)target_port);
optval_t errormsg = -1;
int len;
len = sizeof(optval_t);
timeval tm;
fd_set set;
BOOLEAN ret = FALSE;
struct addrinfo hints;
struct addrinfo *ainfo, *rp;
int s;
sock = INVALID_SOCKET;
memset(&hints, 0, sizeof(struct addrinfo));
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
s = getaddrinfo(url, NULL, &hints, &ainfo);
if (s)
{
vLog(LOG_ERROR, "Lookup error(%d,%s)\n", errno, strerror(errno));
return FALSE;
}
for (rp = ainfo; rp != NULL; rp = rp->ai_next)
{
sock = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
if (sock == INVALID_SOCKET) continue;
if (rp->ai_family == AF_INET)
{
((struct sockaddr_in *)rp->ai_addr)->sin_port = htons(target_port);
}
else if(rp->ai_family == AF_INET6)
{
((struct sockaddr_in6 *)rp->ai_addr)->sin6_port = htons(target_port);
}
else
{
close(sock);
sock = INVALID_SOCKET;
continue;
}
net_socket_nonblock(sock, TRUE); //设置为非阻塞模式
if (connect(sock, rp->ai_addr, rp->ai_addrlen) == SOCKET_ERROR)
{
if (errno != EINPROGRESS && errno != EWOULDBLOCK)
{
net_socket_nonblock(sock, FALSE); //设置为阻塞模式
close(sock);
sock = INVALID_SOCKET;
continue;
}
else
{
tm.tv_sec = 5;
tm.tv_usec = 0;
FD_ZERO(&set);
FD_SET(sock, &set);
if (select(sock + 1, NULL, &set, NULL, &tm) > 0)
{
getsockopt(sock, SOL_SOCKET, SO_ERROR, &errormsg, (socklen_t *)&len);
if (errormsg == 0)
{
net_socket_nonblock(sock, FALSE); //设置为阻塞模式
pItem->Attach(GetUnitID(ord), sock, ((struct sockaddr_in *)(rp->ai_addr))->sin_addr.s_addr, target_port);
ret = TRUE;
break;
}
errno = errormsg;
}
net_socket_nonblock(sock, FALSE); //设置为阻塞模式
close(sock);
sock = INVALID_SOCKET;
continue;
}
}
else
{
ret = TRUE;
net_socket_nonblock(sock, FALSE); //设置为阻塞模式
pItem->Attach(GetUnitID(ord), sock, ((struct sockaddr_in *)(rp->ai_addr))->sin_addr.s_addr, target_port);
break;
}
}
freeaddrinfo(ainfo);
if (!ret)
{
vLog(LOG_ERROR, "Unit(%d) cann't connect(%s:%d) error(%d,%s).\n", pItem->GetUnitID(), url, (int)target_port, errno, strerror(errno));
}
else
{
vLog(LOG_INFO, "Unit(%d) connect(%s:%d) OK.\n", pItem->GetUnitID(), url, (int)target_port);
}
return ret;
}
#endif
BOOLEAN CNetProcess::Run(void)
{
int i;
int ord;
int uid;
socklen_t len;
int ret_val;
SOCKET max_fd;
SOCKET connfd;
fd_set rfds;
struct timeval timeout;
struct sockaddr_in addr;
CNetProcessItem *pItem;
int recv_len;
BYTE buffer[MAX_NET_BUFFER_SIZE];
if (!CProcess::Run()) return FALSE;
max_fd = -1;
FD_ZERO(&rfds);
timeout.tv_sec = 0;
timeout.tv_usec = 20000;
if ((int)m_sock > 0)
{
FD_SET(m_sock, &rfds);
max_fd = m_sock;
}
if (socket_type == SOCK_STREAM)
{ //加入单元链路sockets
for (i = 0; i < PROCESS_UNIT_NUM; i++)
{
pItem = GetItem(i);
if (pItem == NULL) continue;
connfd = (SOCKET)pItem->GetSock();
if ((int)connfd <= 0) continue;
FD_SET(connfd, &rfds);
if ((int)connfd > (int)max_fd) max_fd = connfd;
}
max_fd++;
if ((int)max_fd == 0) return TRUE;
ret_val = select(max_fd, &rfds, NULL, NULL, &timeout);
if (ret_val < 0)
{
vLog(LOG_ERROR, "socket(%d) TCP socket select error(%d,%s).\n", max_fd, errno, strerror(errno));
if ((int)m_sock > 0) close(m_sock);
m_sock = -1;
return FALSE;
}
if (ret_val > 0)
{
if (m_sock > 0 && FD_ISSET(m_sock, &rfds))
{
memset(&addr, 0, sizeof(addr));
len = sizeof(addr);
connfd = accept(m_sock, (struct sockaddr *)&addr, &len);
if ((int)connfd < 0)
{
vLog(LOG_ERROR, "TCP socket accept error(%d,%s).\n", errno, strerror(errno));
close(m_sock);
m_sock = -1;
return FALSE;
}
char sin_addr[20] = { '\0' };
inet_ntop(AF_INET, (void *)&addr.sin_addr, sin_addr, 16);
vLog(LOG_INFO, "TCP connect accepted from %s:%d\n", sin_addr, (int)ntohs(addr.sin_port));
if (ignored_source)
{
uid = GetUnitID(0);
ord = 0;
}
else
{
uid = GetUnitByAddr((BYTE *)&addr.sin_addr, 4);
ord = GetOrderByUnitID(uid);
}
pItem = GetItem(ord);
if (NULL == pItem)
{
vLog(LOG_ERROR, "Invalid peer(%s) connected.\n", sin_addr);
close(connfd);
return FALSE;
}
if (pItem->GetSock() > 0)
{ //关闭已经建立的连接
vLog(LOG_ERROR, "Close already opened socket.\n");
pItem->Release();
}
vLog(LOG_INFO, "Unit(%d, uid=%d) setup communication link.\n", (int)ord, (int)uid);
pItem->Attach(uid, connfd, addr.sin_addr.s_addr, ntohs(addr.sin_port));
}
else
{
for (i = 0; i < PROCESS_UNIT_NUM; i++)
{
pItem = GetItem(i);
if (NULL == pItem) continue;
connfd = pItem->GetSock();
if ((int)connfd <= 0) continue;
if (FD_ISSET(connfd, &rfds))
{ //读取数据
recv_len = recv(connfd, (char *)buffer, pItem->GetBufferSpace(), 0);
if (recv_len <= 0)
{ //对侧关闭链路或出错
vLog(LOG_ERROR, "Unit(%d) recv_len=%d(space=%d) socket error(%d,%s).\n", pItem->GetUnitID(), recv_len, pItem->GetBufferSpace(), errno, strerror(errno));
pItem->Release();
return FALSE;
}
pItem->AppendBuffer(buffer, recv_len);
while(recv_len > 0)
{
recv_len = OnPackageReceived(pItem->GetBuffer(), pItem->GetBufferLength(), i);
if (recv_len > 0)
{
pItem->PopBuffer(recv_len);
}
else if (recv_len < 0)
{ //处理得到了非法数据
pItem->Release();
}
}
}
}
}
}
}
else
{ //对于侦听的UDP方式可以直接当成数据包进行提交
max_fd++;
ret_val = select(max_fd, &rfds, NULL, NULL, &timeout);
if (ret_val < 0)
{
vLog(LOG_ERROR, "UDP Run read socket select error(%d,%s).\n", errno, strerror(errno));
close(m_sock);
m_sock = -1;
return FALSE;
}
if (ret_val > 0)
{
memset(&addr, 0, sizeof(addr));
len = sizeof(addr);
recv_len = recvfrom(m_sock, (char *)buffer, MAX_NET_BUFFER_SIZE, 0, (struct sockaddr *)&addr, &len);
if (recv_len <= 0)
{
vLog(LOG_ERROR, "UDP Run socket recvfrom error(%d,%s)!\n", errno, strerror(errno));
close(m_sock);
m_sock = -1;
return FALSE;
}
//保存最后报文地址,以便回馈信息
last_addr = addr.sin_addr.s_addr;
last_port = addr.sin_port;
//分发给各个单元
if (ignored_source)
{
OnPackageReceived(buffer, (int)recv_len, 0);
}
else
{
OnPackageReceived(buffer, (int)recv_len, GetOrderByUnitID(GetUnitByAddr((BYTE *)&last_addr, 4)));
}
}
}
return TRUE;
}
BOOLEAN CNetProcess::OnTimer(void)
{
if (!CProcess::OnTimer()) return FALSE;
return TRUE;
}
int CNetProcess::OnPackageReceived(BYTE* pBuf, int count, int ord)
{
return FALSE;
}
DWORD CNetProcess::WriteData(const BYTE* pData, int count, int ord)
{
int len;
int send_len = -1;
SOCKET sock;
fd_set wfds;
struct timeval timeout;
const BYTE* pBuf = pData;
struct sockaddr_in addr;
CNetProcessItem *pItem;
if (ord < 0 || ord >= PROCESS_UNIT_NUM) return 0L;
if (socket_type == SOCK_DGRAM)
{
if ((int)m_sock < 0)
{
vLog(LOG_ERROR, "Unit(%d) local socket bind error(%d).\n", GetUnitID(ord), m_sock);
return 0L;
}
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = htons(target_port);
if (target_addr == INADDR_ANY)
{
if (ignored_source)
{
addr.sin_port = last_port;
addr.sin_addr.s_addr = last_addr;
}
else
{
if (!GetUnitAddr(GetUnitID(0), (BYTE *)&addr.sin_addr, 4))
{
vLog(LOG_ERROR, "UDP protocol get unit addr error.\n");
return 0L;
}
}
}
else
{
addr.sin_addr.s_addr = target_addr;
}
send_len = sendto(m_sock, (char *)pData, count, 0, (sockaddr*)&addr, sizeof(addr));
if (send_len < 0)
{
char sin_addr[20] = { '\0' };
inet_ntop(AF_INET, (void *)&addr.sin_addr, sin_addr, 16);
vLog(LOG_ERROR, "UDP socket sendto error(%d,%s), target addr is %s:%d!\n", errno, strerror(errno), sin_addr, (int)ntohs(addr.sin_port));
close(m_sock);
m_sock = -1;
return 0L;
}
}
else
{
pItem = GetItem(ord);
if (pItem == NULL) return 0L;
sock = pItem->GetSock();
if ((int)sock <= 0) return 0L;
for (send_len = 0; send_len < count;)
{
FD_ZERO(&wfds);
FD_SET(sock, &wfds);
timeout.tv_sec = 0;
timeout.tv_usec = 20000;
len = select(sock + 1, NULL, &wfds, NULL, &timeout);
if (len < 0)
{ //对侧无应答
vLog(LOG_ERROR, "TCP socket select error(%d,%s).\n", errno, strerror(errno));
pItem->Release();
return 0L;
}
if (len == 0)
{
vLog(LOG_ERROR, "TCP socket select timeout.\n");
return send_len;
}
len = send(sock, (char *)pBuf, count - send_len, 0);
if (len <= 0)
{
vLog(LOG_ERROR, "TCP socket send error(%d, %s)!\n", errno, strerror(errno));
pItem->Release();
return 0L;
}
send_len += len;
pBuf += len;
}
return send_len;
}
return send_len;
}
/////////////////////////////////////////////////////////////////////////////////////
//
// 关于Item的类实现
//
/////////////////////////////////////////////////////////////////////////////////////
CNetProcessItem::CNetProcessItem()
{
m_uid = -1;
m_sock = -1;
m_peer_addr = 0;
m_peer_port = 0;
memset(m_recv_data_buffer, 0, sizeof(m_recv_data_buffer));
m_recv_data_len = 0;
memset(m_send_data_buffer, 0, sizeof(m_send_data_buffer));
m_send_data_len = 0;
}
CNetProcessItem::~CNetProcessItem()
{
m_uid = -1;
m_sock = -1;
m_peer_addr = 0;
m_peer_port = 0;
memset(m_recv_data_buffer, 0, sizeof(m_recv_data_buffer));
m_recv_data_len = 0;
memset(m_send_data_buffer, 0, sizeof(m_send_data_buffer));
m_send_data_len = 0;
}
void CNetProcessItem::Attach(int uid, int sock, DWORD peer_addr, WORD peer_port)
{
m_uid = uid;
m_sock = sock;
m_peer_addr = 0;
m_peer_port = 0;
memset(m_recv_data_buffer, 0, sizeof(m_recv_data_buffer));
m_recv_data_len = 0;
memset(m_send_data_buffer, 0, sizeof(m_send_data_buffer));
m_send_data_len = 0;
}
void CNetProcessItem::Release(void)
{
if ((int)m_sock > 0)
{
close(m_sock);
}
m_sock = -1;
m_peer_addr = 0;
m_peer_port = 0;
memset(m_recv_data_buffer, 0, sizeof(m_recv_data_buffer));
m_recv_data_len = 0;
memset(m_send_data_buffer, 0, sizeof(m_send_data_buffer));
m_send_data_len = 0;
}
int CNetProcessItem::WriteToNetwork(const BYTE* pBuf, int count)
{
int ret_val;
int len;
int send_len;
fd_set wfds;
struct timeval tv;
if ((int)m_sock <= 0) return 0;
for (send_len = 0; send_len < count;)
{
tv.tv_sec = 0;
tv.tv_usec = 20000;
FD_ZERO(&wfds);
FD_SET(m_sock, &wfds);
ret_val = select(m_sock + 1, NULL, &wfds, NULL, &tv);
if (ret_val < 0)
{
vLog(LOG_ERROR, "SendToNetwork TCP socket select error(%d,%s).\n", errno, strerror(errno));
Release();
return 0L;
}
if (ret_val == 0)
{
return 0;
}
len = send(m_sock, (char *)pBuf, count - send_len, 0);
if (len <= 0)
{
vLog(LOG_ERROR, "SendToNetwork TCP socket send error(%d, %s)!\n", errno, strerror(errno));
Release();
return 0L;
}
send_len += len;
pBuf += len;
}
return send_len;
}
int CNetProcessItem::ReadFromNetwork(BYTE* pBuf, int count)
{
int ret_val;
SOCKET max_fd;
fd_set rfds;
struct timeval timeout;
int recv_len = 0;
max_fd = -1;
FD_ZERO(&rfds);
timeout.tv_sec = 0;
timeout.tv_usec = 20000;
if ((int)m_sock > 0)
{
FD_SET(m_sock, &rfds);
max_fd = m_sock;
}
max_fd++;
if ((int)max_fd == 0) return TRUE;
ret_val = select(max_fd, &rfds, NULL, NULL, &timeout);
if (ret_val < 0)
{
vLog(LOG_ERROR, "socket(%d) TCP socket select error(%d,%s).\n", max_fd, errno, strerror(errno));
Release();
return -1;
}
if (ret_val > 0)
{
if (FD_ISSET(m_sock, &rfds))
{ //读取数据
recv_len = recv(m_sock, (char *)pBuf, count, 0);
if (recv_len <= 0)
{ //对侧关闭链路或出错
vLog(LOG_ERROR, "Unit(%d) recv_len=%d socket error(%d,%s).\n", GetUnitID(), recv_len, errno, strerror(errno));
Release();
return -1;
}
}
}
return recv_len;
}
/////////////////////////////////////////////////////////////////////////////////////
//
// 关于Item的操作函数
//
/////////////////////////////////////////////////////////////////////////////////////
CNetProcessItem *CNetProcess::CreateItem(int ord)
{
return(new CNetProcessItem);
}
void CNetProcess::DestroyItem(int ord, BOOLEAN bDeleted)
{
if (ord < 0 || ord >= PROCESS_UNIT_NUM) return;
if (!bDeleted)
{
CNetProcessItem *pItem = (CNetProcessItem *)m_pItems[ord];
if (pItem)
{
pItem->Release();
delete pItem;
}
}
m_pItems[ord] = NULL;
}