完整 epoll 示例
最编程
2024-04-22 17:25:51
...
#include <deque>
#include <map>
#include <vector>
#include <pthread.h>
#include <semaphore.h>
#include <time.h>
#include <sys/time.h>
#include <sys/shm.h>
#include <errno.h>
#include <sys/types.h>
#include <fcntl.h>
#include <stdio.h>
#include <string>
#include <cstdio>
#include <unistd.h>
#include <signal.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <cstdlib>
#include <cctype>
#include <sstream>
#include <utility>
#include <stdexcept>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <iostream>
#include <signal.h>
using namespace std;
#pragma pack(1)
//管道消息结构
struct pipemsg {
int op;
int fd;
unsigned int ip;
unsigned short port;
};
//地址端口结构
struct ipport {
unsigned int ip;
unsigned short port;
bool operator < (const ipport rhs) const {return (ip < rhs.ip || (ip == rhs.ip && port < rhs.port));}
bool operator == (const ipport rhs) const {return (ip == rhs.ip && port == rhs.port);}
};
//对应于对方地址端口的连接信息
struct peerinfo {
int fd; //对应连接句柄
unsigned int contime; //最后连接时间
unsigned int rcvtime; //收到数据时间
unsigned int rcvbyte; //收到字节个数
unsigned int sndtime; //发送数据时间
unsigned int sndbyte; //发送字节个数
};
//连接结构
struct conninfo {
int rfd; //管道读端
int wfd; //管道写端
map<struct ipport, struct peerinfo> peer; //对方信息
};
#pragma pack()
//全局运行标志
bool g_bRun;
//全局连接信息
struct conninfo g_ConnInfo;
void setnonblocking(int sock)
{
int opts;
opts = fcntl(sock,F_GETFL);
if (opts < 0)
{
perror("fcntl(sock,GETFL)");
exit(1);
}
opts = opts|O_NONBLOCK;
if (fcntl(sock, F_SETFL, opts) < 0)
{
perror("fcntl(sock,SETFL,opts)");
exit(1);
}
}
void setreuseaddr(int sock)
{
int opt;
opt = 1;
if (setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(&opt)) < 0)
{
perror("setsockopt");
exit(1);
}
}
static void sig_pro(int signum)
{
cout << "sig_pro, recv signal:" << signum << endl;
if (signum == SIGQUIT)
{
g_bRun = false;
}
}
//接收连接线程
void * AcceptThread(void *arg)
{
cout << "AcceptThread, enter" << endl;
int ret; //临时变量,存放返回值
int epfd; //监听用的epoll
int listenfd; //监听socket
int connfd; //接收到的连接socket临时变量
int i; //临时变量,轮询数组用
int nfds; //临时变量,有多少个socket有事件
struct epoll_event ev; //事件临时变量
const int MAXEVENTS = 1024; //最大事件数
struct epoll_event events[MAXEVENTS]; //监听事件数组
socklen_t clilen; //声明epoll_event结构体的变量,ev用于注册事件,数组用于回传要处理的事件
struct sockaddr_in cliaddr;
struct sockaddr_in svraddr;
unsigned short uListenPort = 5000;
int iBacklogSize = 5;
int iBackStoreSize = 1024;
struct pipemsg msg; //消息队列数据
//创建epoll,对2.6.8以后的版本,其参数无效,只要大于0的数值就行,内核自己动态分配
epfd = epoll_create(iBackStoreSize);
if (epfd < 0)
{
cout << "AcceptThread, epoll_create fail:" << epfd << ",errno:" << errno << endl;
return NULL;
}
//创建监听socket
listenfd = socket(AF_INET, SOCK_STREAM, 0);
if (listenfd < 0)
{
cout << "AcceptThread, socket fail:" << epfd << ",errno:" << errno << endl;
close(epfd);
return NULL;
}
//把监听socket设置为非阻塞方式
setnonblocking(listenfd);
//设置监听socket为端口重用
setreuseaddr(listenfd);
//设置与要处理的事件相关的文件描述符
ev.data.fd = listenfd;
//设置要处理的事件类型
ev.events = EPOLLIN|EPOLLET;
//注册epoll事件
ret = epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev);
if (ret != 0)
{
cout << "AcceptThread, epoll_ctl fail:" << ret << ",errno:" << errno << endl;
close(listenfd);
close(epfd);
return NULL;
}
bzero(&svraddr, sizeof(svraddr));
svraddr.sin_family = AF_INET;
svraddr.sin_addr.s_addr = htonl(INADDR_ANY);
svraddr.sin_port=htons(uListenPort);
bind(listenfd,(sockaddr *)&svraddr, sizeof(svraddr));
//监听,准备接收连接
ret = listen(listenfd, iBacklogSize);
if (ret != 0)
{
cout << "AcceptThread, listen fail:" << ret << ",errno:" << errno << endl;
close(listenfd);
close(epfd);
return NULL;
}
while (g_bRun)
{
//等待epoll事件的发生,如果当前有信号的句柄数大于输出事件数组的最大大小,超过部分会在下次epoll_wait时输出,事件不会丢
nfds = epoll_wait(epfd, events, MAXEVENTS, 500);
//处理所发生的所有事件
for (i = 0; i < nfds && g_bRun; ++i)
{
if (events[i].data.fd == listenfd) //是本监听socket上的事件
{
cout << "AcceptThread, events:" << events[i].events << ",errno:" << errno << endl;
if (events[i].events&EPOLLIN) //有连接到来
{
do
{
clilen = sizeof(struct sockaddr);
connfd = accept(listenfd,(sockaddr *)&cliaddr, &clilen);
if (connfd > 0)
{
cout << "AcceptThread, accept:" << connfd << ",errno:" << errno << ",connect:" << inet_ntoa(cliaddr.sin_addr) << ":" << ntohs(cliaddr.sin_port) << endl;
//往管道写数据
msg.op = 1;
msg.fd = connfd;
msg.ip = cliaddr.sin_addr.s_addr;
msg.port = cliaddr.sin_port;
ret = write(g_ConnInfo.wfd, &msg, 14);
if (ret != 14)
{
cout << "AcceptThread, write fail:" << ret << ",errno:" << errno << endl;
close(connfd);
}
}
else
{
cout << "AcceptThread, accept:" << connfd << ",errno:" << errno << endl;
if (errno == EAGAIN) //没有连接需要接收了
{
break;
}
else if (errno == EINTR) //可能被中断信号打断,,经过验证对非阻塞socket并未收到此错误,应该可以省掉该步判断
{
;
}
else //其它情况可以认为该描述字出现错误,应该关闭后重新监听
{
//此时说明该描述字已经出错了,需要重新创建和监听
close(listenfd);
epoll_ctl(epfd, EPOLL_CTL_DEL, listenfd, &ev);
//创建监听socket
listenfd = socket(AF_INET, SOCK_STREAM, 0);
if (listenfd < 0)
{
cout << "AcceptThread, socket fail:" << epfd << ",errno:" << errno << endl;
close(epfd);
return NULL;
}
//把监听socket设置为非阻塞方式
setnonblocking(listenfd);
//设置监听socket为端口重用
setreuseaddr(listenfd);
//设置与要处理的事件相关的文件描述符
ev.data.fd = listenfd;
//设置要处理的事件类型
ev.events = EPOLLIN|EPOLLET;
//注册epoll事件
ret = epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev);
if (ret != 0)
{
cout << "AcceptThread, epoll_ctl fail:" << ret << ",errno:" << errno << endl;
close(listenfd);
close(epfd);
return NULL;
}
bzero(&svraddr, sizeof(svraddr));
svraddr.sin_family = AF_INET;
svraddr.sin_addr.s_addr = htonl(INADDR_ANY);
svraddr.sin_port=htons(uListenPort);
bind(listenfd,(sockaddr *)&svraddr, sizeof(svraddr));
//监听,准备接收连接
ret = listen(listenfd, iBacklogSize);
if (ret != 0)
{
cout << "AcceptThread, listen fail:" << ret << ",errno:" << errno << endl;
close(listenfd);
close(epfd);
return NULL;
}
}
}
} while (g_bRun);
}
else if (events[i].events&EPOLLERR || events[i].events&EPOLLHUP) //有异常发生
{
//此时说明该描述字已经出错了,需要重新创建和监听
close(listenfd);
epoll_ctl(epfd, EPOLL_CTL_DEL, listenfd, &ev);
//创建监听socket
listenfd = socket(AF_INET, SOCK_STREAM, 0);
if (listenfd < 0)
{
cout << "AcceptThread, socket fail:" << epfd << ",errno:" << errno << endl;
close(epfd);
return NULL;
}
//把监听socket设置为非阻塞方式
setnonblocking(listenfd);
//设置监听socket为端口重用
setreuseaddr(listenfd);
//设置与要处理的事件相关的文件描述符
ev.data.fd = listenfd;
//设置要处理的事件类型
ev.events = EPOLLIN|EPOLLET;
//注册epoll事件
ret = epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev);
if (ret != 0)
{
cout << "AcceptThread, epoll_ctl fail:" << ret << ",errno:" << errno << endl;
close(listenfd);
close(epfd);
return NULL;
}
bzero(&svraddr, sizeof(svraddr));
svraddr.sin_family = AF_INET;
svraddr.sin_addr.s_addr = htonl(INADDR_ANY);
svraddr.sin_port=htons(uListenPort);
bind(listenfd,(sockaddr *)&svraddr, sizeof(svraddr));
//监听,准备接收连接
ret = listen(listenfd, iBacklogSize);
if (ret != 0)
{
cout << "AcceptThread, listen fail:" << ret << ",errno:" << errno << endl;
close(listenfd);
close(epfd);
return NULL;
}
}
}
}
}
//关闭监听描述字
if (listenfd > 0)
{
close(listenfd);
}
//关闭创建的epoll
if (epfd > 0)
{
close(epfd);
}
cout << "AcceptThread, exit" << endl;
return NULL;
}
//读数据线程
void * ReadThread(void *arg)
{
cout << "ReadThread, enter" << endl;
int ret; //临时变量,存放返回值
int epfd; //连接用的epoll
int i; //临时变量,轮询数组用
int nfds; //临时变量,有多少个socket有事件
struct epoll_event ev; //事件临时变量
const int MAXEVENTS = 1024; //最大事件数
struct epoll_event events[MAXEVENTS]; //监听事件数组
int iBackStoreSize = 1024;
const int MAXBUFSIZE = 8192; //读数据缓冲区大小
char buf[MAXBUFSIZE];
int nread; //读到的字节数
struct ipport tIpPort; //地址端口信息
struct peerinfo tPeerInfo; //对方连接信息
map<int, struct ipport> mIpPort; //socket对应的对方地址端口信息
map<int, struct ipport>::iterator itIpPort; //临时迭代子
map<struct ipport, struct peerinfo>::iterator itPeerInfo; //临时迭代子
struct pipemsg msg; //消息队列数据
//创建epoll,对2.6.8以后的版本,其参数无效,只要大于0的数值就行,内核自己动态分配
epfd = epoll_create(iBackStoreSize);
if (epfd < 0)
{
cout << "ReadThread, epoll_create fail:" << epfd << ",errno:" << errno << endl;
return NULL;
}
while (g_bRun)
{
//从管道读数据
do
{
ret = read(g_ConnInfo.rfd, &msg, 14);
if (ret > 0)
{
//队列中的fd必须是有效的
if (ret == 14 && msg.fd > 0)
{
if (msg.op == 1) //收到新的连接
{
cout << "ReadThread, recv connect:" << msg.fd << ",errno:" << errno << endl;
//把socket设置为非阻塞方式
setnonblocking(msg.fd);
//设置描述符信息和数组下标信息
ev.data.fd = msg.fd;
//设置用于注测的读操作事件
ev.events = EPOLLIN|EPOLLET;
//注册ev
ret = epoll_ctl(epfd, EPOLL_CTL_ADD, msg.fd, &ev);
if (ret != 0)
{
cout << "ReadThread, epoll_ctl fail:" << ret << ",errno:" << errno << endl;
close(msg.fd);
}
else
{
mIpPort[msg.fd] = tIpPort;
tPeerInfo.fd = msg.fd;
tPeerInfo.contime = time(NULL);
tPeerInfo.rcvtime = 0;
tPeerInfo.rcvbyte = 0;
tPeerInfo.sndtime = 0;
tPeerInfo.sndbyte = 0;
g_ConnInfo.peer[tIpPort] = tPeerInfo;
}
}
else if (msg.op == 2) //断开某个连接
{
cout << "ReadThread, recv close:" << msg.fd << ",errno:" << errno << endl;
close(msg.fd);
epoll_ctl(epfd, EPOLL_CTL_DEL, msg.fd, &ev);
itIpPort = mIpPort.find(msg.fd);
if (itIpPort != mIpPort.end())
{
mIpPort.erase(itIpPort);
itPeerInfo = g_ConnInfo.peer.find(itIpPort->second);
if (itPeerInfo != g_ConnInfo.peer.end())
{
g_ConnInfo.peer.erase(itPeerInfo);
}
}
}
}
}
else
{
break;
}
} while(g_bRun);
//等待epoll事件的发生,如果当前有信号的句柄数大于输出事件数组的最大大小,超过部分会在下次epoll_wait时输出,事件不会丢
nfds = epoll_wait(epfd, events, MAXEVENTS, 500);
//处理所发生的所有事件
for (i = 0; i < nfds && g_bRun; ++i)
{
cout << "ReadThread, events:" << events[i].events << ",errno:" << errno << endl;
if (events[i].events&EPOLLIN) //有数据可读
{
do
{
bzero(buf, MAXBUFSIZE);
nread = read(events[i].data.fd, buf, MAXBUFSIZE);
if (nread > 0) //读到数据
{
cout << "ReadThread, read:" << nread << ",errno:" << errno << endl;
itIpPort = mIpPort.find(events[i].data.fd);
if (itIpPort != mIpPort.end())
{
itPeerInfo = g_ConnInfo.peer.find(itIpPort->second);
if (itPeerInfo != g_ConnInfo.peer.end())
{
itPeerInfo->second.rcvtime = time(NULL);
itPeerInfo->second.rcvbyte += nread;
}
}
}
else if (nread < 0) //读取失败
{
if (errno == EAGAIN) //没有数据了
{
cout << "ReadThread, read:" << nread << ",errno:" << errno << ",no data" << endl;
break;
}
else if(errno == EINTR) //可能被内部中断信号打断,经过验证对非阻塞socket并未收到此错误,应该可以省掉该步判断
{
cout << "ReadThread, read:" << nread << ",errno:" << errno << ",interrupt" << endl;
}
else //客户端主动关闭
{
cout << "ReadThread, read:" << nread << ",errno:" << errno << ",peer error" << endl;
close(events[i].data.fd);
epoll_ctl(epfd, EPOLL_CTL_DEL, events[i].data.fd, &ev);
itIpPort = mIpPort.find(events[i].data.fd);
if (itIpPort != mIpPort.end())
{
mIpPort.erase(itIpPort);
itPeerInfo = g_ConnInfo.peer.find(itIpPort->second);
if (itPeerInfo != g_ConnInfo.peer.end())
{
g_ConnInfo.peer.erase(itPeerInfo);
}
}
break;
}
}
else if (nread == 0) //客户端主动关闭
{
cout << "ReadThread, read:" << nread << ",errno:" << errno << ",peer close" << endl;
close(events[i].data.fd);
epoll_ctl(epfd, EPOLL_CTL_DEL, events[i].data.fd, &ev);
itIpPort = mIpPort.find(events[i].data.fd);
if (itIpPort != mIpPort.end())
{
mIpPort.erase(itIpPort);
itPeerInfo = g_ConnInfo.peer.find(itIpPort->second);
if (itPeerInfo != g_ConnInfo.peer.end())
{
g_ConnInfo.peer.erase(itPeerInfo);
}
}
break;
}
} while (g_bRun);
}
else if (events[i].events&EPOLLERR || events[i].events&EPOLLHUP) //有异常发生
{
cout << "ReadThread, read:" << nread << ",errno:" << errno << ",err or hup" << endl;
close(events[i].data.fd);
epoll_ctl(epfd, EPOLL_CTL_DEL, events[i].data.fd, &ev);
itIpPort = mIpPort.find(events[i].data.fd);
if (itIpPort != mIpPort.end())
{
mIpPort.erase(itIpPort);
itPeerInfo = g_ConnInfo.peer.find(itIpPort->second);
if (itPeerInfo != g_ConnInfo.peer.end())
{
g_ConnInfo.peer.erase(itPeerInfo);
}
}
}
}
}
//关闭所有连接
for
推荐阅读
-
使用 ADO 添加数据的 MFC 列表控件示例
-
pandas ExcelWriter 的用法和代码示例
-
关于治愈系动画电影《猫的报恩》的完整评论
-
儿童编程:Scratch 示例 2 - 猫和老鼠
-
2024 Python 中最完整的 python,让童年成为回忆【猫和老鼠的游戏】(附源代码+详细解析
-
观察者模式示例和分析 - 示例 1:猫、狗和老鼠
-
RegexUtil 的完整示例,RegexUtil 是一款适用于 Android 开发的优秀正则表达式工具类。
-
PHP + Ajax 简单获取验证操作示例
-
最完整的 JavaScript 新功能指南:ES2024 ~ ES2016
-
mSC-FDE 单载波频域均衡通信链路的完整 matlab 仿真,包括 UW 序列、QPSK、定时同步、载波同步、MMSE 估计等。