菜鸟笔记
提升您的技术认知

《unix网络编程》epoll函数

阅读 : 491

几种模型的比较

关于PPC、TPC的问题、I/O复用的由来、epoll和select的详细对比参考文章:IO复用与并发编程

1 PPC、TPC模型

      传统的网络服务器是用一个单独的线程或进程处理每一个连接。对于高性能的应用,这需要在某一个时刻同时处理大量的客户请求,这种模式效率不高,因为(Process Per Connection,PPC), TPC(Thread Per Connection)模型一次处理许多客户连接,那么随着连接客户的增多,那么资源使用、进程/线程环境切换等的时空花销就会很大。

2 select 模型

参看文章《unix网络编程》(13)select、shutdown函数 《unix网络编程》(15)poll函数以及使用poll的客户服务器程序
1) 最大并发数限制,因为一个进程所打开的 FD (文件描述符)是有限制的,由 FD_SETSIZE 设置,默认值是 1024,因此 Select 模型的最大并发数就被相应限制了。如果要改变FD_SIZE的大小需要重新编译内核。

int select(int maxfdp1, fd_set *readfds, fd_set *writefds,  
           fd_set *exceptfds, struct timeval *timeout);  

2)
效率问题, select 每次调用都会
线性扫描全部的 FD 集合,花费时间为O(n),这样效率就会呈现线性下降,即使将 FD_SETSIZE 改大其性能也会很差。

3)
内核/用户空间内存拷贝问题
select 采取了内存拷贝方法让内核把 FD 消息通知给用户空间。

4)事件集,select的参数类型fd_set没有将文件描述符和事件绑定,它仅仅是一个文件描述符的集合,因此select需要提供3个“值-结果”类型的参数分别传入和输出可读、可写和异常等事件(调用该函数时,指定所关心的描述符的值,函数返回时,结果将指示哪些描述符已经就绪),也就是select函数会修改指针readset、writeset、exceptset所指向的描述符集。

         一方面,使得select不能处理更多类型的事件,所能处理的事件类型只有读写异常三类;

        另一方面,描述符集内任何与未就绪描述符对应的位返回时都会被清空,因此每次重新调用select时,都需要再次把所有的描述符集内所关心的位置为1。

5)select函数的定时是有函数的最后一个参数决定的,它是一个timeval结构体,用于指定这段时间的秒数和微秒数。

//timeval结构:
struct timeval
{
    long    tv_sec;     /* seconds */
    long    tv_usec;    /* microseconds */
};

3 poll模型

1)最大并发数限制,poll的第二个参数nfds是第一个参数指示的结构数据的元素个数,这个nfds并没有select的限制,它只受限于系统的内存空间(可以达到系统所允许打开的最大描述符的个数,即65535)。

int poll(struct pollfd *fds, nfds_t nfds, int timeout);  

2)
效率问题,效率和select类似。

3)内核/用户空间内存拷贝问题,和select类似。

4)事件集,poll比select要“聪明”,它将描述符和事件定义在一起,任何事件都被统一处理,编程接口简洁许多。

        一方面,poll可以监听的事件类型就可以更细分为很多种(参考文章:poll监听的事件类型)。

        另一方面,而且内核每次修改的是pollfd结构体的revents成员,而events成员不变,因此下次重新调用poll无需重置pollfd类型中的事件集参数(避免了类似于select使用的的“值-结果”参数)。

5)poll的定时也是由函数的最后一个参数给出,但是它是一个int类型(指定函数要等待的毫秒数),而不是timeval结构体。

此外,从当今的可移植性角度考虑,支持select的系统比支持poll的系统要多

epoll的突破

优点

         epoll是什么?按照man手册的说法:是为处理大批量句柄而作了改进的poll。当然,这不是2.6内核才有的,它是在2.5.44内核中被引进的(epoll(4) is a new API introduced in Linux kernel 2.5.44),它几乎具备了之前所说的一切优点,被公认为Linux2.6下性能最好的多路I/O就绪通知方法。

 (1)epoll 没有最大并发连接的限制,上限是最大可以打开文件的数目,这个数字一般远大于 2048, 一般来说这个数目和系统内存关系很大 ,具体值可以 cat /proc/sys/fs/file-max[599534] 察看。

(2)效率提升, Epoll最大的优点就在于它基于事件的就绪通知方式只管“活跃”的连接 ,而跟连接总数无关,其算法复杂度为O(1),因此在实际的网络环境中,epoll的效率就会远远高于 select 和 poll 。select和poll都是轮询方式的,每次调用要扫描整个注册文件描述符集合,并将其中就绪描述符返回给用户程序。epoll_wait采用的是回调方式,内核检测到就绪描述符时,将触发回调函数,回调函数就将该文件描述符上对应的事件插入内核就绪队列,不需要轮询。

(3)内存拷贝, Epoll 在这点上使用了“共享内存“,因此没有内存拷贝的开销。

        epoll同样只告知那些就绪的文件描述符,而且当我们调用epoll_wait()获得就绪文件描述符时,返回的不是实际的描述符,而是一个代表就绪描述符数量的值,你只需要去epoll指定的一个数组中依次取得相应数量的文件描述符即可,这里也使用了内存映射(mmap)技术,这样便彻底省掉了这些文件描述符在系统调用时复制的开销。
         另一个本质的改进在于epoll采用基于事件的就绪通知方式。在select/poll中,进程只有在调用一定的方法后,内核才对所有监视的文件描述符进行扫描,而epoll事先通过epoll_ctl()来注册一个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似callback的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait()时便得到通知。

5)和poll类似,epoll的定时也是int类型,单位是毫秒。

不足

        epoll的局限性在于它在Linux2.6才实现,而其他平台都没有,这与Apache这样的优秀跨平台服务器无法并论。select跨平台性能很好,几乎每个平台都支持。

epoll函数及参数

int epoll_create(int size);
int epoll_create1(int  flags);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); 
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

第一个函数

         对于epoll_create1 的flag参数: 可以设置为0 或EPOLL_CLOEXEC,为0时函数表现与epoll_create一致, EPOLL_CLOEXEC标志与open 时的O_CLOEXEC 标志类似,即进程被替换时会关闭打开的文件描述符。

        创建一个epoll的句柄。自从linux2.6.8之后,size参数是被忽略的。需要注意的是,当创建好epoll句柄后,它就是会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽

第二个函数

      epoll的事件注册函数,它不同于select()是在监听事件时告诉内核要监听什么类型的事件,而是在这里先注册要监听的事件类型。
                第一个参数是epoll_create()的返回值。
                第二个参数表示动作,用三个宏来表示:
                      EPOLL_CTL_ADD:注册新的fd到epfd中;EPOLL_CTL_MOD:修改已经注册的fd的监听事件;EPOLL_CTL_DEL:从epfd中删除一个fd;
               第三个参数是需要监听的fd。
               第四个参数是告诉内核需要监听什么事,struct epoll_event结构如下:

struct epoll_event
{
    uint32_t     events;      /* Epoll events */
    epoll_data_t data;        /* User data variable */
};
typedef union epoll_data
{
    void        *ptr;
    int          fd;
    uint32_t     u32;
    uint64_t     u64;
} epoll_data_t;

events可以是以下几个宏的集合:

           EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);

           EPOLLOUT:表示对应的文件描述符可以写;

           EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);

           EPOLLERR:表示对应的文件描述符发生错误;

           EPOLLHUP:表示对应的文件描述符被挂断;

           EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。

           EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里


第三个函数

         收集在epoll监控的事件中已经发生的事件。参数events是分配好的epoll_event结构体数组,epoll将会把发生的事件赋值到events数组中(events不可以是空指针,内核只负责把数据复制到这个events数组中,不会去帮助我们在用户态中分配内存)。maxevents告之内核这个events有多大,这个 maxevents的值不能大于创建epoll_create()时的size,参数timeout是超时时间(毫秒,0会立即返回,-1将不确定,也有说法说是永久阻塞)。如果函数调用成功,返回对应I/O上已准备好的文件描述符数目,如返回0表示已超时。

epoll的两种工作模式

          epoll有Level-Triggered和Edge-Triggered两种工作模式。

          Level-Triggered是缺省工作方式,有阻塞和非阻塞两种方式。内核告诉你一个描述符是否就绪,然后可以对就绪的fd进行IO操作。如果不做任何操作,内核还会继续通知,因此该模式下编程出错可能性小。传统的select/poll是这样的模型。此方式可以认为是一个快速的poll。

         Edge-Triggered是只支持非阻塞模式。当一个新的事件到达时,ET模式从epoll_wait调用中获取该事件,如果这次没有将该事件对应的套接字缓冲区处理完,在这个套接字中没有新的事件再次到来时,在ET模式下是无法再次从epoll_wait调用中获取这个事件的。而LT模式,只要一个事件对应的套接字缓冲区中还有数据,就总能从epoll_wait中获取这个事件。

          二者的差异在于 level-trigger 模式下只要某个 fd 处于 readable/writable 状态,无论什么时候进行 epoll_wait 都会返回该 fd;而 edge-trigger 模式下只有某个 fd 从unreadable 变为 readable 或从 unwritable 变为 writable 时,epoll_wait 才会返回该 fd

    使用LT意味着只要fd处于readable/writable状态,每次 epoll_wait 时都会返回该 fd,系统开销不说,自己处理时每次都要把这些fd轮询一遍,如果fd很多的话,不管这些fd有没有事件发生,epoll_wait 都会触发这些fd的轮询判断。
    查阅了一些资料,才知道常用的事件处理库很多都选择了 LT 模式,包括大家熟知的libevent和boost::asio等,为什么选择LT呢?那就不得不从ET的弊端的弊端说起。
    ET模式下,当有事件发生时,系统只会通知你一次,也就是调用epoll_wait 返回fd后,不管事件你处理与否,或者处理完全与否,再调用epoll_wait 时,都不会再返回该fd,这样programmer要自己保证在事件发生时及时有效的处理完。比如此时fd发生了EPOLLIN事件,在调用epoll_wait 后发现此事件,programmer要保证在本次轮询中对此fd进行了读操作,并且还要循环调用recv操作,一直读到recv的返回值小于请求值,或者遇到EAGAIN错误,不然下次轮询时,如果此fd没有再次触发事件,你就没有机会知道这个fd需要你的处理。这样无形中就增加了programmer的负担和出错的机会。
   ET模式的短处正是LT模式的长处,无论此fd是否有事件发生,或者有事件未处理完,每次epoll_wait 时总会得到此fd供你处理。显而易见,OS在LT模式下维护的 ready list 的大小肯定比ET模式下长,而且你自己轮询所有的fd时也要比ET下要多,这种消耗和ET模式下循环调用处理函数(如recv和send等),还要逻辑处理是否处理完毕,理论上应该是LT更大一些,不过个人感觉应该差别不会太大。但是LT模式下带来的逻辑处理的方便性和不易出错性,让我们有理由把它作为首选。我想这可能也是为什么epoll后来在ET的基础上又增加了LT,并且将其作为默认模式的原因吧。

在epoll的ET模式下,正确的读写方式为:
         读:只要可读,就一直读,直到返回0,或者 errno = EAGAIN
         写:只要可写,就一直写,直到数据发送完,或者 errno = EAGAIN

例如,向socket中写数据:

从socket中读数据:

epoll使用模型

epoll在ET模式下的使用,在下面代码段中,非阻塞模式下,函数do_use_fd函数新到达的文件描述符知道EAGAIN由read函数或write函数返回。

#define MAX_EVENTS 10
struct epoll_event ev, events[MAX_EVENTS];
int listen_sock, conn_sock, nfds, epollfd;

/* Set up listening socket, 'listen_sock' (socket(),bind(), listen()) */
epollfd = epoll_create(10);
if (epollfd == -1) {
    perror("epoll_create");
    exit(EXIT_FAILURE);
}

ev.events = EPOLLIN;
ev.data.fd = listen_sock;
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, listen_sock, &ev) == -1) {
    perror("epoll_ctl: listen_sock");
    exit(EXIT_FAILURE);
}
for (;;) {
    nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1);
    if (nfds == -1) {
        perror("epoll_pwait");
        exit(EXIT_FAILURE);
    }
    for (n = 0; n < nfds; ++n) {
        if (events[n].data.fd == listen_sock) {
            conn_sock = accept(listen_sock,
                (struct sockaddr *) &local, &addrlen);
            if (conn_sock == -1) {
                perror("accept");
                exit(EXIT_FAILURE);
            }
            setnonblocking(conn_sock);
            ev.events = EPOLLIN | EPOLLET;
            ev.data.fd = conn_sock;
            if (epoll_ctl(epollfd, EPOLL_CTL_ADD, conn_sock, &ev) == -1) {
                perror("epoll_ctl: conn_sock");
                exit(EXIT_FAILURE);
            }
        } else {
            do_use_fd(events[n].data.fd);
        }
    }
}

epoll完整的服务器例子

客户端使用《unix网络编程》(15)poll函数以及使用poll的客户服务器程序里的客户端。

服务器端代码(这里是github上源码):

//该版本使用epoll代替poll
#include "myheader.h"
#define OPEN_MAX 1024
#define MAXEVENTS 64 

//该函数是将套接字设置为非阻塞方式
//使用的代码是fcntl开启非阻塞I/O的典型代码
static int make_socket_non_blocking(int sfd) {
    int flags;
    if ((flags = fcntl(sfd, F_GETFL, 0)) == -1)
        err_quit("fcntl F_GETEL error");
    flags |= O_NONBLOCK;
    if (fcntl(sfd, F_SETFL, flags) == -1)
        err_quit("fcntl F_SETFL error");
    return 0;
}

int main(int argc, char **argv)
{
    int i, maxi, listenfd, connfd, sockfd;
    int nready;
    ssize_t n;
    char buf[MAXLINE];
    socklen_t clilen;
    struct epoll_event event, events[MAXEVENTS];

    struct sockaddr_in cliaddr, servaddr;
    listenfd = Socket(AF_INET, SOCK_STREAM, 0);
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(SERV_PORT);

    // 在socket()和bind()之间设置套接字选项避免地址使用错误:
    //结束服务器程序后“bind error: Address already in use” 
    int opt = 1;
    if ( setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) == -1)
        err_exit("setsockopt error\n");

    Bind(listenfd, (const struct sockaddr*)&servaddr, sizeof(servaddr));

    //设置监听套接字为非阻塞模式
    if (make_socket_non_blocking(listenfd) == -1) {
        err_exit("make_socket_non_blocking error");
    }
    Listen(listenfd, LISTENQ);

    //创建一个epoll的句柄,该句柄占用一个fd值,因此epoll使用完后要关闭
    int efd = epoll_create1(EPOLL_CLOEXEC);
    if (efd == -1)
        err_exit("epoll_create1 error");
    event.data.fd = listenfd;          //要监听的事件类型,这里监听listen套接字
    event.events = EPOLLIN | EPOLLET;  //读入,边缘触发方式  
    //epoll的事件注册函数,注册新的fd:listenfd到efd,并指明要监听的事件
    int s = epoll_ctl(efd, EPOLL_CTL_ADD, listenfd, &event);
    if (s == -1)
        err_quit("epoll_ctl error");

    for( ; ; ) {
        int i;
        //收集监听到的消息,返回值nfds是已经准备好的描述符的个数
        //下面的for循环只扫描已经准备好的描述符,这正是epoll比poll的高效之处
        //定时器时间设置为-1表示一直等待知道有事件就绪
        int nfds = epoll_wait(efd, events, MAXEVENTS, -1);
        for (i = 0; i < nfds; i++) {
            /* An error has occured on this fd, or the socket is not 
            ready for reading (why were we notified then?) 
            如果描述符发生错误、被挂断或者不是可读的描述符都关闭该描述符并继续
            */   
            if ( (events[i].events & EPOLLERR) || (events[i].events & EPOLLHUP) || 
                (!(events[i].events & EPOLLIN))) {
                    fprintf(stderr, "epoll error\n");
                    close(events[i].data.fd);
                    continue;
            }
            /* We have a notification on the listening socket, which 
            means one or more incoming connections. */   
            else if (listenfd == events[i].data.fd) {
                for ( ; ; ) {
                    struct sockaddr cliaddr;
                    char hbuf[MAXLINE], sbuf[MAXLINE];
                    socklen_t clilen = sizeof(cliaddr);
                    int connfd = accept(listenfd, (const struct sockaddr*)&cliaddr,
                        &clilen);
                    if (connfd == -1) {
                        //因为上边设置listenfd为非阻塞了,所以accept处理完listenfd后
                        //没有可以处理的套接字了,所以会返回EAGIN错误表示accept处理完了
                        //参考http://blog.csdn.net/u013074465/article/details/44993227
                        if ((errno == EAGAIN) || (errno == EWOULDBLOCK))
                            break;
                        else { perror("accept DDDDDDD"); break;};

                    }
                    //将地址转化为主机名或者服务名 
                    //flag参数:以数字名返回主机地址和服务地址   
                    int s = getnameinfo(&cliaddr, clilen, hbuf, sizeof(hbuf),
                        sbuf, sizeof(sbuf), NI_NUMERICHOST
                        | NI_NUMERICSERV);
                    if (s == 0)  
                        printf("Accepted connection on descriptor %d "  
                        "(host=%s, port=%s)\n", connfd, hbuf, sbuf);  
                    /* Make the connection socket non-blocking and add it to the 
                    list of fds to monitor. */           
                    if (make_socket_non_blocking(connfd) == -1)
                        err_exit("make_socket_non_blocking connfd error");
                    event.data.fd = connfd;
                    event.events = EPOLLIN | EPOLLET;
                    if (epoll_ctl(efd, EPOLL_CTL_ADD, connfd, &event) == -1)
                        err_exit("epoll_ctl error connfd");
                }
                continue;
            }
            else {
                /* We have data on the fd waiting to be read. Read and 
                display it. We must read whatever data is available 
                completely, as we are running in edge-triggered mode 
                and won't get a notification again for the same 
                data. */   
                int done = 0;
                for ( ; ;) {
                    ssize_t count = read(events[i].data.fd, buf, sizeof(buf));
                    if (count == -1) {
                        /* If errno == EAGAIN, that means we have read all 
                        data. So go back to the main loop. */ 
                        if (errno != EAGAIN) {
                            printf("read...");
                            done = 1;
                        }
                        break;
                    }
                    else if (count == 0) {
                        /* End of file. The remote has closed the 
                        connection. */ 
                        done = 1;  
                        break;   
                    }

                    Write(STDOUT_FILENO, buf, count);  //write to stdout
                }
                if (done) {
                    printf("closed connection on descriptor %d\n", events[i].data.fd);
                    close(events[i].data.fd);
                }
            }
        }
    }
    close(listenfd);
    return EXIT_SUCCESS;
}

程序运行截图及源码下载

启动服务器后,服务器分别接到了两个客户端的连接,客户端1连接并发送两条消息后客户退出;客户2发送一条消息后,服务器退出。

服务器:

客户1:

客户2: