A Brief Look at the Redis 6.x I/O Event-Driven Model

Author: cap_qin · Updated: 2022-08-19


1. Redis startup flow

  • The main() function in server.c is Redis's startup entry point. Startup boils down to three steps: initServer, InitServerLast, and aeMain(server.el).
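A minimal sketch of that call order, condensed from server.c (config parsing, sentinel/module setup, and error handling omitted):

/* Condensed sketch of the startup path in server.c. */
int main(int argc, char **argv) {
    initServerConfig();           /* defaults + configuration              */
    initServer();                 /* create server.el, listening sockets   */
    InitServerLast();             /* bio threads + threaded I/O            */
    aeMain(server.el);            /* enter the event loop; blocks forever  */
    aeDeleteEventLoop(server.el); /* only reached when the loop stops      */
    return 0;
}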

2. initServer: initializing the server startup structures


  • aeCreateEventLoop: creates the event loop object aeEventLoop, stored in server.el
typedef struct aeEventLoop {
    int maxfd;   /* highest file descriptor currently registered */
    int setsize; /* max number of file descriptors tracked */
    long long timeEventNextId;
    aeFileEvent *events; /* Registered events */
    aeFiredEvent *fired; /* Fired events */
    aeTimeEvent *timeEventHead;
    int stop;
    void *apidata; /* This is used for polling API specific data */
    aeBeforeSleepProc *beforesleep;
    aeBeforeSleepProc *aftersleep;
    int flags;
} aeEventLoop;
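
A condensed sketch of how aeCreateEventLoop fills in this structure (error handling and a couple of bookkeeping fields trimmed):

/* Condensed sketch of aeCreateEventLoop (ae.c). */
aeEventLoop *aeCreateEventLoop(int setsize) {
    aeEventLoop *eventLoop = zmalloc(sizeof(*eventLoop));
    eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize); /* indexed by fd */
    eventLoop->fired  = zmalloc(sizeof(aeFiredEvent)*setsize);
    eventLoop->setsize = setsize;
    eventLoop->timeEventHead = NULL;
    eventLoop->timeEventNextId = 0;
    eventLoop->stop = 0;
    eventLoop->maxfd = -1;
    eventLoop->beforesleep = NULL;
    eventLoop->aftersleep = NULL;
    eventLoop->flags = 0;
    aeApiCreate(eventLoop);            /* on Linux: epoll_create, kept in apidata */
    for (int i = 0; i < setsize; i++)  /* nothing registered yet */
        eventLoop->events[i].mask = AE_NONE;
    return eventLoop;
}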
  • Register the accept listener and its handler acceptTcpHandler
  1. aeCreateFileEvent registers the accept listener: it stores the callback rfileProc in the aeFileEvent slot for the file descriptor fd. The actual listening is installed by aeApiAddEvent, whose implementation differs by operating system; since most production systems run Linux, we follow the epoll implementation (see the sketch after this list), where epoll_ctl() registers fd with an ADD operation and the main event-loop thread later waits for events with epoll_wait().
  2. When the event loop el sees an accept event, it calls the handler registered for that fd, acceptTcpHandler. The handler obtains the client socket's cfd via the accept() system call, connCreateAcceptedSocket wraps cfd into a connection conn, a read handler readQueryFromClient() is bound to the client fd inside conn, and finally a client structure is created and stored in server.clients.
    readQueryFromClient is the entry point for client requests; how the request is actually read and parsed is covered in a separate article.
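For reference, the epoll flavor of aeApiAddEvent (ae_epoll.c) looks roughly like this: it merges the new mask with whatever is already registered for the fd and issues an EPOLL_CTL_ADD or EPOLL_CTL_MOD accordingly.

static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0};
    /* ADD if the fd was not monitored yet, MOD otherwise. */
    int op = eventLoop->events[fd].mask == AE_NONE ?
            EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    mask |= eventLoop->events[fd].mask; /* merge old events */
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.fd = fd;
    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
    return 0;
}
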
/* networking.c: acceptTcpHandler, the accept handler registered above */
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
    char cip[NET_IP_STR_LEN];
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    while(max--) {
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        anetCloexec(cfd);
        serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
        acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
    }
}
// Create a bare connection object
connection *connCreateSocket() {
    connection *conn = zcalloc(sizeof(connection));
    conn->type = &CT_Socket;
    conn->fd = -1;

    return conn;
}
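
connCreateAcceptedSocket, called from acceptTcpHandler above, is essentially a thin wrapper over connCreateSocket that records the already-accepted fd; roughly:

connection *connCreateAcceptedSocket(int fd) {
    connection *conn = connCreateSocket();
    conn->fd = fd;
    conn->state = CONN_STATE_ACCEPTING;
    return conn;
}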

ConnectionType CT_Socket = {
    .ae_handler = connSocketEventHandler,
    .close = connSocketClose,
    .write = connSocketWrite,
    .read = connSocketRead,
    .accept = connSocketAccept,
    .connect = connSocketConnect,
    .set_write_handler = connSocketSetWriteHandler,
    .set_read_handler = connSocketSetReadHandler,
    .get_last_error = connSocketGetLastError,
    .blocking_connect = connSocketBlockingConnect,
    .sync_write = connSocketSyncWrite,
    .sync_read = connSocketSyncRead,
    .sync_readline = connSocketSyncReadLine,
    .get_type = connSocketGetType
};
static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
    if (func == conn->read_handler) return C_OK;

    conn->read_handler = func;
    if (!conn->read_handler)
        aeDeleteFileEvent(server.el,conn->fd,AE_READABLE);
    else
        if (aeCreateFileEvent(server.el,conn->fd,
                    AE_READABLE,conn->type->ae_handler,conn) == AE_ERR) return C_ERR;
    return C_OK;
}
void linkClient(client *c) {
    listAddNodeTail(server.clients,c);
    /* Note that we remember the linked list node where the client is stored,
     * this way removing the client in unlinkClient() will not require
     * a linear scan, but just a constant time operation. */
    c->client_list_node = listLast(server.clients);
    uint64_t id = htonu64(c->id);
    raxInsert(server.clients_index,(unsigned char*)&id,sizeof(id),c,NULL);
}
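
Putting these pieces together: acceptCommonHandler ends up calling createClient, which is where readQueryFromClient is bound to the connection. A simplified excerpt of createClient (networking.c), keeping only the wiring of the read path:

client *createClient(connection *conn) {
    client *c = zmalloc(sizeof(client));
    if (conn) {
        connNonBlock(conn);
        connEnableTcpNoDelay(conn);
        /* connSetReadHandler -> connSocketSetReadHandler (shown above), which
         * registers conn->type->ae_handler on server.el via aeCreateFileEvent;
         * that handler later dispatches reads to readQueryFromClient. */
        connSetReadHandler(conn, readQueryFromClient);
        connSetPrivateData(conn, c);
    }
    /* ... initialize the remaining client fields ... */
    if (conn) linkClient(c);   /* store the client in server.clients */
    return c;
}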

3. InitServerLast: initializing the I/O threads

void InitServerLast() {
    bioInit();
    initThreadedIO();
    set_jemalloc_bg_thread(server.jemalloc_bg_thread);
    server.initial_memory_usage = zmalloc_used_memory();
}
/* Initialize the data structures needed for threaded I/O */
void initThreadedIO(void) {
    server.io_threads_active = 0; /* threads start inactive */

    /* With a single I/O thread there is nothing extra to spawn: all I/O runs in the main thread. */
    if (server.io_threads_num == 1) return;

    if (server.io_threads_num > IO_THREADS_MAX_NUM) {
        serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
                             "The maximum number is %d.", IO_THREADS_MAX_NUM);
        exit(1);
    }

    /* Spawn and initialize the I/O threads. */
    for (int i = 0; i < server.io_threads_num; i++) {
        /* Things we do for all the threads including the main thread. */
        io_threads_list[i] = listCreate();
        if (i == 0) continue; /* Thread 0 is the main thread. */

        /* Things we do only for the additional threads. */
        pthread_t tid;
        pthread_mutex_init(&io_threads_mutex[i],NULL);
        setIOPendingCount(i, 0);
        pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */
        if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
            serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
            exit(1);
        }
        io_threads[i] = tid;
    }
}
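
IOThreadMain, passed to pthread_create above, is the worker loop of each extra I/O thread. A trimmed sketch of its Redis 6 shape (thread naming, CPU affinity, and assertions omitted):

void *IOThreadMain(void *myid) {
    long id = (unsigned long)myid;

    while (1) {
        /* Spin briefly, waiting for the main thread to hand out work. */
        for (int j = 0; j < 1000000; j++) {
            if (getIOPendingCount(id) != 0) break;
        }
        /* Still nothing: park on the mutex so the main thread can pause us. */
        if (getIOPendingCount(id) == 0) {
            pthread_mutex_lock(&io_threads_mutex[id]);
            pthread_mutex_unlock(&io_threads_mutex[id]);
            continue;
        }

        /* Drain the clients assigned to this thread: write replies or
         * read queries depending on io_threads_op. */
        listIter li;
        listNode *ln;
        listRewind(io_threads_list[id], &li);
        while ((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            if (io_threads_op == IO_THREADS_OP_WRITE)
                writeToClient(c, 0);
            else if (io_threads_op == IO_THREADS_OP_READ)
                readQueryFromClient(c->conn);
        }
        listEmpty(io_threads_list[id]);
        setIOPendingCount(id, 0);   /* tell the main thread we are done */
    }
}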

4. aeMain: the main thread's event loop

/* Main-thread event loop */
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        aeProcessEvents(eventLoop, AE_ALL_EVENTS |
                                   AE_CALL_BEFORE_SLEEP |
                                   AE_CALL_AFTER_SLEEP);
    }
}
  • aeMain drives the main thread's event loop el: it keeps calling aeProcessEvents, and each iteration runs four steps in order: eventLoop->beforesleep(eventLoop), aeApiPoll(eventLoop, tvp), eventLoop->aftersleep(eventLoop), and finally dispatching the events returned by aeApiPoll (a condensed skeleton of this loop body is given at the end of this section).
  • Here we focus on aeApiPoll(eventLoop, tvp) and on how the fired events are processed:
  1. Like aeApiAddEvent above, aeApiPoll has a different implementation per operating system. On Linux it uses epoll_wait to multiplex over the accept and socket-read events, matching the epoll_ctl registrations made earlier through aeCreateFileEvent.
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;

    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + (tvp->tv_usec + 999)/1000) : -1);
    if (retval > 0) {
        int j;

        numevents = retval;
        for (j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = state->events+j;

            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE;
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE;
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    }
    return numevents;
}
  2. To process the events returned by aeApiPoll, the loop looks up the aeFileEvent for each fired file descriptor fd and invokes its callback fe->rfileProc(eventLoop,fd,fe->clientData,mask). rfileProc is whatever was registered through aeCreateFileEvent: acceptTcpHandler for the listening socket, or the connection's ae_handler (connSocketEventHandler), which in turn dispatches reads to readQueryFromClient.
        /* Inner dispatch loop of aeProcessEvents (ae.c) */
        for (j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int fired = 0; /* Number of events fired for current fd. */

            /* If the fd is readable (mask & AE_READABLE), call the read handler. */
            if (!invert && fe->mask & mask & AE_READABLE) {
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
                fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
            }

            /* Fire the writable event. */
            if (fe->mask & mask & AE_WRITABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            /* If we have to invert the call, fire the readable event now
             * after the writable one. */
            if (invert) {
                fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
                if ((fe->mask & mask & AE_READABLE) &&
                    (!fired || fe->wfileProc != fe->rfileProc))
                {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }
            processed++;
        }
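
Tying the four steps together, the body of aeProcessEvents is roughly structured as follows; a condensed skeleton with time-event handling and most flag checks trimmed:

/* Condensed skeleton of aeProcessEvents (ae.c); only the four steps
 * discussed above are kept. */
int aeProcessEvents(aeEventLoop *eventLoop, int flags) {
    int processed = 0, numevents;
    struct timeval *tvp = NULL;  /* the real code derives this from timer events */

    /* 1. beforesleep: flush pending replies, expire keys, etc. */
    if (eventLoop->beforesleep != NULL && (flags & AE_CALL_BEFORE_SLEEP))
        eventLoop->beforesleep(eventLoop);

    /* 2. Block in epoll_wait (via aeApiPoll) until some fd is ready. */
    numevents = aeApiPoll(eventLoop, tvp);

    /* 3. aftersleep hook. */
    if (eventLoop->aftersleep != NULL && (flags & AE_CALL_AFTER_SLEEP))
        eventLoop->aftersleep(eventLoop);

    /* 4. Dispatch every fired event to rfileProc / wfileProc
     *    (the loop shown above). */
    for (int j = 0; j < numevents; j++) {
        /* ... dispatch loop above ... */
        processed++;
    }
    return processed; /* number of file events processed */
}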

Original article: https://blog.csdn.net/qlxin2080/article/details/126273430
