从epoll入门到redis中的epoll一、网络编程模板

本文目的是介绍一下epoll网络编程的入门,并对redis-server中epoll相关源代码进行分析

一、网络编程模板

常见的网络编程模式如下(以ipv4中tcp协议编程为例),

  1. 首先创建一个socket套接字,即用于监听的文件描述符listen_fd,

  2. 将它与具体的ip和端口号绑定,

  3. 开启监听,

  4. 使用一个循环来接受客户端的请求,

  5. 创建子进程或者线程来处理已经连接的请求

    //创建监听的文件描述符
    listen_fd = socket()
    //绑定ip和端口
    bind(listen_fd, ip和端口)
    //监听
    listen(listen_fd)
    //循环处理链接和读写操作
    while(1) {
    //主进程用来接收连接
    new_client_fd = accept()
    //创建子进程或线程处理,处理新的客户端的请求
    }

二、epoll编程模板

这种模式的问题在于创建子进程、线程都有系统调用,每来一个新的TCP连接都需要分配一个进程或者线程,如果达到C10K,意味着一台机器要维护1万个进程/线程,应对高并发的场景存在一定的性能问题。能不能让一个进程/线程来维护多个socket呢?当然,就是I/O多路复用技术。

一个进程虽然任一时刻只能处理一个请求,但是处理每个请求的事件时,耗时控制在 1 毫秒以内,这样 1 秒内就可以处理上千个请求,把时间拉长来看,多个请求复用了一个进程,这就是多路复用,这种思想很类似一个 CPU 并发多个进程,所以也叫做时分多路复用。
我们熟悉的 select/poll/epoll 内核提供给用户态的多路复用系统调用,进程可以通过一个系统调用函数从内核中获取多个事件。

对比select/poll/epoll 的文章很多,这里不再阐述。因为epoll在性能方面相比select、poll存在很大的优势,所以我们直接来看epoll编程。
epoll相关的函数只有3个:

//创建epoll的句柄
int epoll_create(int __size)
//将普通的网络文件描述符添加到epoll描述符中
int epoll_ctl(int __epfd, int __op, int __fd, struct epoll_event *__event)
//等待网络事件
int epoll_wait(int __epfd, struct epoll_event *__events, int __maxevents, int __timeout)
复制代码
  1. epoll_create是创建一个epoll的描述符epoll_fd
  2. epoll_ctl函数将epoll_fd ((int __epfd) 和 socket_fd (int __fd) ,添加 EPOLL_CTL_ADD (int __op) 或删除 EPOLL_CTL_DEL (int __op) 到epoll反应堆中,最后一个参数struct epoll_event *__event 是一个结构体,里边有2个参数需要设置:
    ①设置触发模式ev.events = EPOLLIN | EPOLLET; ,epoll的触发模式包括边缘触发和水平触发
    ②设置socket对应的fd:ev.data.fd = listen_fd;
  3. epoll_wait是获取触发的事件,第1个参数为epoll_fd, 第2个参数用于接收触发了事件的数组,后续处理就是遍历这个数组,第3个参数为可以处理的事件的最大值,第4个参数为等待时间,-1表示阻塞等待,0表示立即返回不等待,大于0的值为等待的时间。

来看一下epoll的编程模型:

//创建监听的文件描述符
listen_fd = socket()
//绑定ip和端口
bind(listen_fd, ip和端口)
//监听
listen(listen_fd)

//创建epoll句柄
epoll_fd = epoll_create(MAXEPOLLSIZE);

/**** 将监听的listen_fd添加到epoll中    【begin】****/
//创建 ev 变量,在epoll_ctl函数中使用
struct epoll_event ev;
//设置触发模式
ev.events = EPOLLIN | EPOLLET;
//设置fd变量
ev.data.fd = listen_fd;
//将listen_fd添加到epoll集合中
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &ev)
/**** 将监听的listen_fd添加到epoll中  【end】 ****/

//创建一个数组,用于接受所有触发的读写事件
struct epoll_event fired_events[MAXEPOLLSIZE];

//循环处理链接和读写操作
while(1) {
    //等待有事件发生,fired_events中存储已经触发的事件,-1表示没有超时时间,返回触发的事件数量
    epoll_event_nums = epoll_wait(epoll_fd, fired_events, curfds, -1);
    for(j = 0; j < epoll_event_nums; j++) {
        if(fired_events[j].data.fd == listen_fd) { //如果触发事件的描述符是 listen_fd)
            //1.执行 accept()函数
            new_client_fd = accept(listen_fd, xx, xx)
            //2.将新的客户端连接fd添加到epoll集合中
            ev.events = EPOLLIN | EPOLLET;
            ev.data.fd = new_client_fd;
            epoll_ctl(epoll_fd, EPOLL_CTL_ADD, new_client_fd, &ev)
        } else { //如果是已连接的客户端触发的事件,则进行读写操作
            //如果是读事件
            recv(fired_events[j].data.fd, buf, xx, xx)
            //如果是写事件
            send()
        }
    }
}
复制代码

完整示例:epoll入门示例

三、redis-server中的epoll

了解了epoll的基础知识,我们再来看一下redis中是怎么基于epoll编程的。本文使用的redis源码是5.0.0版本。

1.封装

redis针对不同的系统,会选用不同的I/O多路复用底层库,只有在linux系统中使用epoll库。在ae.c文件中可以看到如下代码:

#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
    #ifdef HAVE_EPOLL
    #include "ae_epoll.c"
    #else
        #ifdef HAVE_KQUEUE
        #include "ae_kqueue.c"
        #else
        #include "ae_select.c"
        #endif
    #endif
#endif
复制代码

通过封装,形成了4个函数:

int aeApiCreate(aeEventLoop *eventLoop);
int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask);
void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask);
int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp);
复制代码

aeApiCreate对应的是epoll_create, aeApiAddEvent对应的是epoll_ctl(epoll_fd, EPOLL_CTL_ADD,……) , aeApiDelEvent对应的是epoll_ctl(epoll_fd, EPOLL_CTL_DEL,……),aeApiPoll对应的是epoll_wait 。
可以看到,每个函数的第一个参数都是aeEventLoop类型的变量,它其实是一个全局变量,保存在 server.el 中(server变量是redis-server启动的时候创建的全局变量),来看一下aeEventLoop中对我们有用的内容:

typedef struct aeEventLoop {
    int setsize; /* max number of file descriptors tracked */
    aeFileEvent *events; /* Registered events */
    aeFiredEvent *fired; /* Fired events */
    // ………………
} aeEventLoop;
复制代码

这里边有一个aeFiredEvent *fired变量,它里边保存的就是在epoll_wait中触发的所有事件,对应的就是我们第二节中说的 fired_events 数组,通过循环遍历处理所有的网络事件。那aeFileEvent *events 里边保存的是什么呢?我们会在下边的小结中讲到。

2.绑定事件

从第二节中可以看到,不管是接受新连接还是处理已有连接,无外乎读、写两个事件,因此设计一个结构体,为每种网络事件分别绑定不同的读写函数即可。redis中就是这么做的:
image.png

不论是哪种事件,在事件触发时,我们都可以拿到这个网络事件对应的fd,由此我们可以想到一个数据结构,设计一个map,key是fd,value是一个复合结构,其中包括读事件处理函数、写事件处理函数,redis中也确实有这样一个结构体:

/* File event structure */
typedef struct aeFileEvent {
    int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
    aeFileProc *rfileProc;
    aeFileProc *wfileProc;
    void *clientData;
} aeFileEvent;
复制代码

rfileProc对应的就是读事件处理函数,wfileProc对应的是写事件处理函数,至于这次网络事件中调用哪个函数,通过mask来控制。
需要注意的是,redis中使用的不是真的map,而是直接使用的是一个数组。众所周知,linux系统中的fd是一个整型,而在系统中有个“最大文件句柄”的配置项,redis中创建的这个数组的大小就是“最大文件句柄”数,直接使用fd值的下标做为key。这个数据就是三-1节中说的aeEventLoop结构体中的events变量。
整个流程如下:当遍历aeEventLoop.fired数组时,通过aeEventLoop.fired.fd可以取到fd值,再通过aeEventLoop.events[fd]取到对应的aeFileEvent结构体,根据aeEventLoop.fired.mask值决定调用rfileProc函数或者wfileProc函数。

那么事件绑定的时机是什么时候呢?
首先看aeEventLoop中的events变量(即server.el)创建的时机:

// server.c文件 2040 行,initServer函数中
server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
复制代码

aeCreateEventLoop函数:

aeEventLoop *aeCreateEventLoop(int setsize) {
    aeEventLoop *eventLoop;
    int i;

    if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
    //events的数量就是 server.maxclients (最大连接数的数量 + 一个常量值)
    eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
    eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
    ……
}
复制代码

对于处理新连接的事件,redis在初始化函数 initServer(void)中就进行了绑定(server.c文件2129行),可以看到代码中,将AE_READABLE事件处理的函数绑定为acceptTcpHandler:

for (j = 0; j < server.ipfd_count; j++) {
    if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
        acceptTcpHandler,NULL) == AE_ERR)
        {
            serverPanic(
                "Unrecoverable error creating server.ipfd file event.");
        }
}
复制代码

为什么外层是一个循环呢?因为一台机器可能有多个ip(多块网卡),redis为每一个ip创建了一个listen_fd
对于处理已连接的读事件,接受连接之后,创建新的客户端的时候绑定的:

client *createClient(int fd) {
    client *c = zmalloc(sizeof(client));

    /* passing -1 as fd it is possible to create a non connected client.
     * This is useful since all the commands needs to be executed
     * in the context of a client. When commands are executed in other
     * contexts (for instance a Lua script) we need a non connected client. */
    if (fd != -1) {
        anetNonBlock(NULL,fd);
        anetEnableTcpNoDelay(NULL,fd);
        if (server.tcpkeepalive)
            anetKeepAlive(NULL,fd,server.tcpkeepalive);
        if (aeCreateFileEvent(server.el,fd,AE_READABLE,
            readQueryFromClient, c) == AE_ERR)
        {
            close(fd);
            zfree(c);
            return NULL;
        }
    }
    // ………………
}
复制代码

3.代码调试

在一个窗口开启调试:

gdb redis-server
(gdb) b acceptTcpHandler
(gdb) b readQueryFromClient
(gdb) r
复制代码

在另外一个窗口进行连接:

redis-cli
复制代码

触发了第一个断点:

Breakpoint 1, acceptTcpHandler (el=0x7ffff6c2b0a0, fd=11, privdata=0x0, mask=1) at networking.c:727
727	    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
复制代码

看一下调用栈:

(gdb) bt
#0  acceptTcpHandler (el=0x7ffff6c2b0a0, fd=11, privdata=0x0, mask=1) at networking.c:727
#1  0x000000000042a4fe in aeProcessEvents (eventLoop=0x7ffff6c2b0a0, flags=11) at ae.c:443
#2  0x000000000042a6e1 in aeMain (eventLoop=0x7ffff6c2b0a0) at ae.c:501
#3  0x0000000000437239 in main (argc=1, argv=0x7fffffffe438) at server.c:4194
复制代码

我们看到 el=0x7ffff6c2b0a0, fd=11 ,按照我们之前的说法,el.events[11]对应的应该是一个aeFileEvent结构体,其中rfileProc变量应该指向 acceptTcpHandler 函数,打印一下看看:

(gdb) p el.events[11]
$2 = {mask = 1, rfileProc = 0x441b51 <acceptTcpHandler>, wfileProc = 0x0, clientData = 0x0}
复制代码

和我们上边分析的一样。
再来分析一下readQueryFromClient,在gdb中按c 命令,到达下一个断点:

(gdb) c
Continuing.

Breakpoint 2, readQueryFromClient (el=0x7ffff6c2b0a0, fd=12, privdata=0x7ffff6d0d740, mask=1) at networking.c:1501
1501	    client *c = (client*) privdata;
复制代码

看一下调用栈:

(gdb) bt
#0  readQueryFromClient (el=0x7ffff6c2b0a0, fd=12, privdata=0x7ffff6d0d740, mask=1) at networking.c:1501
#1  0x000000000042a4fe in aeProcessEvents (eventLoop=0x7ffff6c2b0a0, flags=11) at ae.c:443
#2  0x000000000042a6e1 in aeMain (eventLoop=0x7ffff6c2b0a0) at ae.c:501
#3  0x0000000000437239 in main (argc=1, argv=0x7fffffffe438) at server.c:4194
复制代码

可以看到,这个新连接的fd是12,fd=12是一个新的连接,按照分析,el.events[12]对应的读处理函数应该是readQueryFromClient,打印一下:

(gdb) p el.events[12]
$4 = {mask = 1, rfileProc = 0x443987 <readQueryFromClient>, wfileProc = 0x0, clientData = 0x7ffff6d0d740}
复制代码

结果和我们预想的一样。
再回到调用栈的代码看一下,main函数中调用了aeMain,看一下aeMain中做了什么:

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    }
}
复制代码

可以看到,外层是一个while循环(对应到第二节中我们最外层的那个循环),循环体内是aeProcessEvents函数。我们再来看一下aeProcessEvents的内容:

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;

    //处理时间事件,略过……
    //时间上调用了epoll_wait,返回触发事件的数量
    numevents = aeApiPoll(eventLoop, tvp);
    //循环处理已触发的事件
    for (j = 0; j < numevents; j++) {
        aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
        int mask = eventLoop->fired[j].mask;
        int fd = eventLoop->fired[j].fd;
        int fired = 0; /* Number of events fired for current fd. */
        int invert = fe->mask & AE_BARRIER;

        //如果是读事件,则调用 rfileProc 函数
        if (!invert && fe->mask & mask & AE_READABLE) {
            fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            fired++;
        }

        //如果是写事件,调用 wfileProc 函数
        if (fe->mask & mask & AE_WRITABLE) {
            if (!fired || fe->wfileProc != fe->rfileProc) {
                fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
            }
        }
        // 其他代码省略……
    }
    //其他代码省略……
}
复制代码

可以看到,首先调用epoll_wait的包装函数aeApiPoll,然后for循环处理已经出发的事件(对应第三节中的for循环),通过mask值来判断是读事件还是写事件,如果是读事件,则调用 rfileProc 函数,如果是写事件,调用 wfileProc 函数。
以上分析已经把redis中的整个网络事件处理串起来了,还遗留一个写事件处理,sendReplyToClient 函数,读者可以自行调试一下,看看它在什么场景用到。

四、总结

本文通过一个基本框架说明了epoll编程的基本模式,并抽离出来redis-server中相关的网络模块源码进行说明。本文的目的是通过简要模式帮大家理解epoll编程,所以redis中处理的很多很多细节我们没有提到,了解基本的框架之后,我们再去专研细节。

参考文章:

  1. 《这次答应我,一举拿下 I/O 多路复用!》
  2. 《原来 8 张图,就能学废 Reactor 和 Proactor》
  3. 《Redis5设计与源码分析》