#include "netproc.h" CNetProcess::CNetProcess() { m_sock = -1; last_addr = 0; last_port = 0; socket_type = SOCK_STREAM; bind_addr = INADDR_ANY; bind_port = 8000; ignored_source = FALSE; target_addr = INADDR_ANY; target_port = 0; } CNetProcess::~CNetProcess() { } BOOLEAN CNetProcess::OnPreCreate(int id) { int i; optval_t opt; struct sockaddr_in addr; CNetProcessItem *pItem; if (!CProcess::OnPreCreate(id)) return FALSE; if (GetOption(&network, sizeof(network))) { //获取新配置 socket_type = network.socket_type; bind_addr = network.bind_addr; bind_port = network.bind_port; ignored_source = network.ignored_source; target_addr = network.target_addr; target_port = network.target_port; } if ((int)m_sock > 0) { close(m_sock); m_sock = -1; } m_sock = socket(AF_INET, socket_type, 0); if ((int)m_sock < 0) { vLog(LOG_ERROR, "socket created error(%d,%s).\n", errno, strerror(errno)); return FALSE; } if (bind_port == 0) { if (socket_type == SOCK_STREAM) { close(m_sock); m_sock = -1; //建立所有必要的单元链路 for (i = 0; i < PROCESS_UNIT_NUM; i++) { if (GetUnitID(i) < 0) break; pItem = GetItem(i); if (pItem == NULL) break; memset(&addr, 0, sizeof(addr)); addr.sin_family = AF_INET; addr.sin_port = htons(target_port); GetUnitAddr(GetUnitID(i), (BYTE *)&addr.sin_addr, 4); pItem->Attach(GetUnitID(i), -1, addr.sin_addr.s_addr, target_port); } } } else if (bind_addr != INADDR_BROADCAST) { //绑定端口非0代表在本地进行绑定操作 opt = 1; setsockopt(m_sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); memset(&addr, 0, sizeof(addr)); addr.sin_family = AF_INET; addr.sin_addr.s_addr = bind_addr; addr.sin_port = htons(bind_port); char sin_addr[20] = { '\0' }; inet_ntop(AF_INET, (void *)&addr.sin_addr, sin_addr, 16); vLog(LOG_INFO, "prepare bind to %s:%d.\n", sin_addr, bind_port); if (bind(m_sock, (struct sockaddr *)&addr, sizeof(addr)) < 0) { vLog(LOG_ERROR, "socket bind local addr error(%d,%s).\n", errno, strerror(errno)); close(m_sock); m_sock = -1; return FALSE; } //暂时不考虑支持组播 //支持TCP方式的侦听模式 for (i = 0; i < PROCESS_UNIT_NUM; i++) { if (GetUnitID(i) < 0) break; pItem = GetItem(i); if (pItem == NULL) break; addr.sin_family = AF_INET; addr.sin_port = htons(target_port); pItem->Attach(GetUnitID(i), -1, -1, target_port); } if (socket_type == SOCK_STREAM) { if (listen(m_sock, MAX_CONN_COUNT) < 0) { vLog(LOG_ERROR, "TCP socket listen error(%d,%s).\n", errno, strerror(errno)); close(m_sock); m_sock = -1; return FALSE; } } } if (socket_type == SOCK_DGRAM && (target_addr == INADDR_BROADCAST || (target_addr & HTONL(0xff000000)) == HTONL(0xff000000))) { opt = 1; if (setsockopt(m_sock, SOL_SOCKET, SO_BROADCAST, &opt, sizeof(opt)) < 0) { vLog(LOG_ERROR, "UDP socket set broadcast error(%d,%s).\n", errno, strerror(errno)); close(m_sock); m_sock = -1; return FALSE; } } //UDP方式 if (socket_type == SOCK_DGRAM) { if (ignored_source) { if (GetUnitID(0) < 0) { vLog(LOG_ERROR, "No Unit(%d) Found(%d).\n", GetUnitID(0), strerror(errno)); return FALSE; } pItem = GetItem(0); if (pItem == NULL) return FALSE; pItem->Attach(GetUnitID(0), m_sock, addr.sin_addr.s_addr, target_port); } else { for (i = 0; i < PROCESS_UNIT_NUM; i++) { if (GetUnitID(i) < 0) break; pItem = GetItem(i); if (pItem == NULL) break; addr.sin_family = AF_INET; addr.sin_port = htons(target_port); GetUnitAddr(GetUnitID(i), (BYTE *)&addr.sin_addr, 4); pItem->Attach(GetUnitID(i), m_sock, addr.sin_addr.s_addr, target_port); } } } return TRUE; } void CNetProcess::Destroy(void) { CProcess::Destroy(); int i; //关闭连接 if ((int)m_sock > 0) { close(m_sock); } //析构基本单元 for (i = 0; i < PROCESS_UNIT_NUM; i++) { DestroyItem(i, FALSE); } } BOOLEAN CNetProcess::isClient(void) { return (network.bind_port == 0 ? TRUE : FALSE); } #if 1 BOOLEAN CNetProcess::net_socket_nonblock(SOCKET sock, BOOLEAN blocking) { DWORD ul = 1; if (blocking) ul = 1; else ul = 0; return ioctl(sock, FIONBIO, &ul); } BOOLEAN CNetProcess::Connect(CNetProcessItem* pItem, int ord, BOOLEAN bDefault) { int sock; char url[60]; if (NULL == pItem) return FALSE; //sock = socket(AF_INET, SOCK_STREAM, 0); memset(url, '\0', sizeof(url)); if (bDefault && (target_addr != INADDR_ANY)) { inet_ntop(AF_INET, &target_addr, url, 16); } else { DWORD addr; GetUnitAddr(GetUnitID(ord), (BYTE *)&addr, 4); if (addr == INADDR_ANY) { vLog(LOG_INFO, "Unit(%d) socket connect address cann't be any!\n here we can use dns.\n", pItem->GetUnitID()); BYTE data[MAX_UNIT_PARAM_SIZE]; GetUnitOption(pItem->GetUnitID(), data, sizeof(data)); //0xAA, 0x55.........0x55, 0xAA if (data[0] == 0xAA && data[1] == 0x55 && data[MAX_UNIT_PARAM_SIZE - 2] == 0x55 && data[MAX_UNIT_PARAM_SIZE - 1] == 0xAA) { memcpy(url, &data[2], 60); } } else { inet_ntop(AF_INET, &addr, url, 16); } } vLog(LOG_INFO, "Unit(%d) socket connect(%s:%d).\n", pItem->GetUnitID(), url, (int)target_port); optval_t errormsg = -1; int len; len = sizeof(optval_t); timeval tm; fd_set set; BOOLEAN ret = FALSE; struct addrinfo hints; struct addrinfo *ainfo, *rp; int s; sock = INVALID_SOCKET; memset(&hints, 0, sizeof(struct addrinfo)); hints.ai_family = AF_UNSPEC; hints.ai_socktype = SOCK_STREAM; s = getaddrinfo(url, NULL, &hints, &ainfo); if (s) { vLog(LOG_ERROR, "Lookup error(%d,%s)\n", errno, strerror(errno)); return FALSE; } for (rp = ainfo; rp != NULL; rp = rp->ai_next) { sock = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol); if (sock == INVALID_SOCKET) continue; if (rp->ai_family == AF_INET) { ((struct sockaddr_in *)rp->ai_addr)->sin_port = htons(target_port); } else if(rp->ai_family == AF_INET6) { ((struct sockaddr_in6 *)rp->ai_addr)->sin6_port = htons(target_port); } else { close(sock); sock = INVALID_SOCKET; continue; } net_socket_nonblock(sock, TRUE); //设置为非阻塞模式 if (connect(sock, rp->ai_addr, rp->ai_addrlen) == SOCKET_ERROR) { if (errno != EINPROGRESS && errno != EWOULDBLOCK) { net_socket_nonblock(sock, FALSE); //设置为阻塞模式 close(sock); sock = INVALID_SOCKET; continue; } else { tm.tv_sec = 5; tm.tv_usec = 0; FD_ZERO(&set); FD_SET(sock, &set); if (select(sock + 1, NULL, &set, NULL, &tm) > 0) { getsockopt(sock, SOL_SOCKET, SO_ERROR, &errormsg, (socklen_t *)&len); if (errormsg == 0) { net_socket_nonblock(sock, FALSE); //设置为阻塞模式 pItem->Attach(GetUnitID(ord), sock, ((struct sockaddr_in *)(rp->ai_addr))->sin_addr.s_addr, target_port); ret = TRUE; break; } errno = errormsg; } net_socket_nonblock(sock, FALSE); //设置为阻塞模式 close(sock); sock = INVALID_SOCKET; continue; } } else { ret = TRUE; net_socket_nonblock(sock, FALSE); //设置为阻塞模式 pItem->Attach(GetUnitID(ord), sock, ((struct sockaddr_in *)(rp->ai_addr))->sin_addr.s_addr, target_port); break; } } freeaddrinfo(ainfo); if (!ret) { vLog(LOG_ERROR, "Unit(%d) cann't connect(%s:%d) error(%d,%s).\n", pItem->GetUnitID(), url, (int)target_port, errno, strerror(errno)); } else { vLog(LOG_INFO, "Unit(%d) connect(%s:%d) OK.\n", pItem->GetUnitID(), url, (int)target_port); } return ret; } #endif BOOLEAN CNetProcess::Run(void) { int i; int ord; int uid; socklen_t len; int ret_val; SOCKET max_fd; SOCKET connfd; fd_set rfds; struct timeval timeout; struct sockaddr_in addr; CNetProcessItem *pItem; int recv_len; BYTE buffer[MAX_NET_BUFFER_SIZE]; if (!CProcess::Run()) return FALSE; max_fd = -1; FD_ZERO(&rfds); timeout.tv_sec = 0; timeout.tv_usec = 20000; if ((int)m_sock > 0) { FD_SET(m_sock, &rfds); max_fd = m_sock; } if (socket_type == SOCK_STREAM) { //加入单元链路sockets for (i = 0; i < PROCESS_UNIT_NUM; i++) { pItem = GetItem(i); if (pItem == NULL) continue; connfd = (SOCKET)pItem->GetSock(); if ((int)connfd <= 0) continue; FD_SET(connfd, &rfds); if ((int)connfd > (int)max_fd) max_fd = connfd; } max_fd++; if ((int)max_fd == 0) return TRUE; ret_val = select(max_fd, &rfds, NULL, NULL, &timeout); if (ret_val < 0) { vLog(LOG_ERROR, "socket(%d) TCP socket select error(%d,%s).\n", max_fd, errno, strerror(errno)); if ((int)m_sock > 0) close(m_sock); m_sock = -1; return FALSE; } if (ret_val > 0) { if (m_sock > 0 && FD_ISSET(m_sock, &rfds)) { memset(&addr, 0, sizeof(addr)); len = sizeof(addr); connfd = accept(m_sock, (struct sockaddr *)&addr, &len); if ((int)connfd < 0) { vLog(LOG_ERROR, "TCP socket accept error(%d,%s).\n", errno, strerror(errno)); close(m_sock); m_sock = -1; return FALSE; } char sin_addr[20] = { '\0' }; inet_ntop(AF_INET, (void *)&addr.sin_addr, sin_addr, 16); vLog(LOG_INFO, "TCP connect accepted from %s:%d\n", sin_addr, (int)ntohs(addr.sin_port)); if (ignored_source) { uid = GetUnitID(0); ord = 0; } else { uid = GetUnitByAddr((BYTE *)&addr.sin_addr, 4); ord = GetOrderByUnitID(uid); } pItem = GetItem(ord); if (NULL == pItem) { vLog(LOG_ERROR, "Invalid peer(%s) connected.\n", sin_addr); close(connfd); return FALSE; } if (pItem->GetSock() > 0) { //关闭已经建立的连接 vLog(LOG_ERROR, "Close already opened socket.\n"); pItem->Release(); } vLog(LOG_INFO, "Unit(%d, uid=%d) setup communication link.\n", (int)ord, (int)uid); pItem->Attach(uid, connfd, addr.sin_addr.s_addr, ntohs(addr.sin_port)); } else { for (i = 0; i < PROCESS_UNIT_NUM; i++) { pItem = GetItem(i); if (NULL == pItem) continue; connfd = pItem->GetSock(); if ((int)connfd <= 0) continue; if (FD_ISSET(connfd, &rfds)) { //读取数据 recv_len = recv(connfd, (char *)buffer, pItem->GetBufferSpace(), 0); if (recv_len <= 0) { //对侧关闭链路或出错 vLog(LOG_ERROR, "Unit(%d) recv_len=%d(space=%d) socket error(%d,%s).\n", pItem->GetUnitID(), recv_len, pItem->GetBufferSpace(), errno, strerror(errno)); pItem->Release(); return FALSE; } pItem->AppendBuffer(buffer, recv_len); while(recv_len > 0) { recv_len = OnPackageReceived(pItem->GetBuffer(), pItem->GetBufferLength(), i); if (recv_len > 0) { pItem->PopBuffer(recv_len); } else if (recv_len < 0) { //处理得到了非法数据 pItem->Release(); } } } } } } } else { //对于侦听的UDP方式可以直接当成数据包进行提交 max_fd++; ret_val = select(max_fd, &rfds, NULL, NULL, &timeout); if (ret_val < 0) { vLog(LOG_ERROR, "UDP Run read socket select error(%d,%s).\n", errno, strerror(errno)); close(m_sock); m_sock = -1; return FALSE; } if (ret_val > 0) { memset(&addr, 0, sizeof(addr)); len = sizeof(addr); recv_len = recvfrom(m_sock, (char *)buffer, MAX_NET_BUFFER_SIZE, 0, (struct sockaddr *)&addr, &len); if (recv_len <= 0) { vLog(LOG_ERROR, "UDP Run socket recvfrom error(%d,%s)!\n", errno, strerror(errno)); close(m_sock); m_sock = -1; return FALSE; } //保存最后报文地址,以便回馈信息 last_addr = addr.sin_addr.s_addr; last_port = addr.sin_port; //分发给各个单元 if (ignored_source) { OnPackageReceived(buffer, (int)recv_len, 0); } else { OnPackageReceived(buffer, (int)recv_len, GetOrderByUnitID(GetUnitByAddr((BYTE *)&last_addr, 4))); } } } return TRUE; } BOOLEAN CNetProcess::OnTimer(void) { if (!CProcess::OnTimer()) return FALSE; return TRUE; } int CNetProcess::OnPackageReceived(BYTE* pBuf, int count, int ord) { return FALSE; } DWORD CNetProcess::WriteData(const BYTE* pData, int count, int ord) { int len; int send_len = -1; SOCKET sock; fd_set wfds; struct timeval timeout; const BYTE* pBuf = pData; struct sockaddr_in addr; CNetProcessItem *pItem; if (ord < 0 || ord >= PROCESS_UNIT_NUM) return 0L; if (socket_type == SOCK_DGRAM) { if ((int)m_sock < 0) { vLog(LOG_ERROR, "Unit(%d) local socket bind error(%d).\n", GetUnitID(ord), m_sock); return 0L; } memset(&addr, 0, sizeof(addr)); addr.sin_family = AF_INET; addr.sin_port = htons(target_port); if (target_addr == INADDR_ANY) { if (ignored_source) { addr.sin_port = last_port; addr.sin_addr.s_addr = last_addr; } else { if (!GetUnitAddr(GetUnitID(0), (BYTE *)&addr.sin_addr, 4)) { vLog(LOG_ERROR, "UDP protocol get unit addr error.\n"); return 0L; } } } else { addr.sin_addr.s_addr = target_addr; } send_len = sendto(m_sock, (char *)pData, count, 0, (sockaddr*)&addr, sizeof(addr)); if (send_len < 0) { char sin_addr[20] = { '\0' }; inet_ntop(AF_INET, (void *)&addr.sin_addr, sin_addr, 16); vLog(LOG_ERROR, "UDP socket sendto error(%d,%s), target addr is %s:%d!\n", errno, strerror(errno), sin_addr, (int)ntohs(addr.sin_port)); close(m_sock); m_sock = -1; return 0L; } } else { pItem = GetItem(ord); if (pItem == NULL) return 0L; sock = pItem->GetSock(); if ((int)sock <= 0) return 0L; for (send_len = 0; send_len < count;) { FD_ZERO(&wfds); FD_SET(sock, &wfds); timeout.tv_sec = 0; timeout.tv_usec = 20000; len = select(sock + 1, NULL, &wfds, NULL, &timeout); if (len < 0) { //对侧无应答 vLog(LOG_ERROR, "TCP socket select error(%d,%s).\n", errno, strerror(errno)); pItem->Release(); return 0L; } if (len == 0) { vLog(LOG_ERROR, "TCP socket select timeout.\n"); return send_len; } len = send(sock, (char *)pBuf, count - send_len, 0); if (len <= 0) { vLog(LOG_ERROR, "TCP socket send error(%d, %s)!\n", errno, strerror(errno)); pItem->Release(); return 0L; } send_len += len; pBuf += len; } return send_len; } return send_len; } ///////////////////////////////////////////////////////////////////////////////////// // // 关于Item的类实现 // ///////////////////////////////////////////////////////////////////////////////////// CNetProcessItem::CNetProcessItem() { m_uid = -1; m_sock = -1; m_peer_addr = 0; m_peer_port = 0; memset(m_recv_data_buffer, 0, sizeof(m_recv_data_buffer)); m_recv_data_len = 0; memset(m_send_data_buffer, 0, sizeof(m_send_data_buffer)); m_send_data_len = 0; } CNetProcessItem::~CNetProcessItem() { m_uid = -1; m_sock = -1; m_peer_addr = 0; m_peer_port = 0; memset(m_recv_data_buffer, 0, sizeof(m_recv_data_buffer)); m_recv_data_len = 0; memset(m_send_data_buffer, 0, sizeof(m_send_data_buffer)); m_send_data_len = 0; } void CNetProcessItem::Attach(int uid, int sock, DWORD peer_addr, WORD peer_port) { m_uid = uid; m_sock = sock; m_peer_addr = 0; m_peer_port = 0; memset(m_recv_data_buffer, 0, sizeof(m_recv_data_buffer)); m_recv_data_len = 0; memset(m_send_data_buffer, 0, sizeof(m_send_data_buffer)); m_send_data_len = 0; } void CNetProcessItem::Release(void) { if ((int)m_sock > 0) { close(m_sock); } m_sock = -1; m_peer_addr = 0; m_peer_port = 0; memset(m_recv_data_buffer, 0, sizeof(m_recv_data_buffer)); m_recv_data_len = 0; memset(m_send_data_buffer, 0, sizeof(m_send_data_buffer)); m_send_data_len = 0; } int CNetProcessItem::WriteToNetwork(const BYTE* pBuf, int count) { int ret_val; int len; int send_len; fd_set wfds; struct timeval tv; if ((int)m_sock <= 0) return 0; for (send_len = 0; send_len < count;) { tv.tv_sec = 0; tv.tv_usec = 20000; FD_ZERO(&wfds); FD_SET(m_sock, &wfds); ret_val = select(m_sock + 1, NULL, &wfds, NULL, &tv); if (ret_val < 0) { vLog(LOG_ERROR, "SendToNetwork TCP socket select error(%d,%s).\n", errno, strerror(errno)); Release(); return 0L; } if (ret_val == 0) { return 0; } len = send(m_sock, (char *)pBuf, count - send_len, 0); if (len <= 0) { vLog(LOG_ERROR, "SendToNetwork TCP socket send error(%d, %s)!\n", errno, strerror(errno)); Release(); return 0L; } send_len += len; pBuf += len; } return send_len; } int CNetProcessItem::ReadFromNetwork(BYTE* pBuf, int count) { int ret_val; SOCKET max_fd; fd_set rfds; struct timeval timeout; int recv_len = 0; max_fd = -1; FD_ZERO(&rfds); timeout.tv_sec = 0; timeout.tv_usec = 20000; if ((int)m_sock > 0) { FD_SET(m_sock, &rfds); max_fd = m_sock; } max_fd++; if ((int)max_fd == 0) return TRUE; ret_val = select(max_fd, &rfds, NULL, NULL, &timeout); if (ret_val < 0) { vLog(LOG_ERROR, "socket(%d) TCP socket select error(%d,%s).\n", max_fd, errno, strerror(errno)); Release(); return -1; } if (ret_val > 0) { if (FD_ISSET(m_sock, &rfds)) { //读取数据 recv_len = recv(m_sock, (char *)pBuf, count, 0); if (recv_len <= 0) { //对侧关闭链路或出错 vLog(LOG_ERROR, "Unit(%d) recv_len=%d socket error(%d,%s).\n", GetUnitID(), recv_len, errno, strerror(errno)); Release(); return -1; } } } return recv_len; } ///////////////////////////////////////////////////////////////////////////////////// // // 关于Item的操作函数 // ///////////////////////////////////////////////////////////////////////////////////// CNetProcessItem *CNetProcess::CreateItem(int ord) { return(new CNetProcessItem); } void CNetProcess::DestroyItem(int ord, BOOLEAN bDeleted) { if (ord < 0 || ord >= PROCESS_UNIT_NUM) return; if (!bDeleted) { CNetProcessItem *pItem = (CNetProcessItem *)m_pItems[ord]; if (pItem) { pItem->Release(); delete pItem; } } m_pItems[ord] = NULL; }