Redis6.x io事件驱动模型
一、redis启动流程
- server.c中的main方法是启动的入口,启动主要分为三个步骤:initServer、InitServerLast和aeMain(server.el)。
二、 initServer初始化Server启动相关的结构体

- aeCreateEventLoop:创建事件循环组el对象aeEventLoop
/* State of one event loop instance. */
typedef struct aeEventLoop {
int maxfd; /* highest file descriptor currently registered */
int setsize; /* max number of file descriptors tracked */
/* presumably the id assigned to the next created time event -- TODO confirm */
long long timeEventNextId;
/* Registered events; the dispatch code indexes this array by file descriptor. */
aeFileEvent *events; /* Registered events */
/* Fired events; aeApiPoll() stores the fd and mask of each ready event here. */
aeFiredEvent *fired; /* Fired events */
/* presumably the head of the list of time events -- TODO confirm */
aeTimeEvent *timeEventHead;
/* aeMain() resets this to 0 on entry and keeps looping while it stays 0. */
int stop;
/* The epoll backend casts this to its own aeApiState. */
void *apidata; /* This is used for polling API specific data */
/* NOTE(review): hook expected to run before the poll call -- confirm in aeProcessEvents. */
aeBeforeSleepProc *beforesleep;
/* NOTE(review): hook expected to run after the poll call returns -- confirm in aeProcessEvents. */
aeBeforeSleepProc *aftersleep;
/* NOTE(review): meaning of the flag bits is not visible in this excerpt. */
int flags;
} aeEventLoop;
- 设置accept监听和监听处理函数acceptTcpHandler
- aeCreateFileEvent函数负责注册accept监听,并为文件描述符fd对应的aeFileEvent设置读事件处理函数rfileProc。其中事件注册由aeApiAddEvent函数实现,不同操作系统下aeApiAddEvent的实现方式不同;生产环境大多使用linux系统,所以我们主要跟踪aeApiAddEvent的epoll实现:通过epoll_ctl()以ADD操作把文件描述符fd及其关注的事件注册到epoll实例,后续el主线程通过epoll_wait()监听事件的发生。
- 事件循环组el监听到accept事件后会根据fd调用对应accept监听处理函数acceptTcpHandler,通过系统函数accept()获得客户端socket对应的cfd,connCreateAcceptedSocket通过cfd创建链接conn,给conn中的客户端对应的fd绑定监听并设置监听函数readQueryFromClient(),最后,生成客户端client结构体,并保存在server.clients。
readQueryFromClient是客户端请求的入口函数,接收并处理客户端请求的具体流程将在另一篇文章中介绍。
/* networking: acceptTcpHandler -- handler invoked for accept events on a
 * listening TCP socket.
 *
 * Accepts pending connections on `fd`, at most MAX_ACCEPTS_PER_CALL per
 * invocation. Every accepted descriptor is passed through anetCloexec(),
 * wrapped with connCreateAcceptedSocket() and handed to
 * acceptCommonHandler() together with the peer IP string.
 *
 * The function returns as soon as anetTcpAccept() reports ANET_ERR; the
 * failure is logged unless errno is EWOULDBLOCK.
 *
 * el, privdata and mask belong to the callback signature and are unused. */
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    char client_ip[NET_IP_STR_LEN];
    int client_port;

    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    for (int budget = MAX_ACCEPTS_PER_CALL; budget != 0; budget--) {
        int client_fd = anetTcpAccept(server.neterr, fd, client_ip,
                                      sizeof(client_ip), &client_port);

        if (client_fd == ANET_ERR) {
            /* EWOULDBLOCK is not worth a warning; anything else is logged. */
            if (errno != EWOULDBLOCK) {
                serverLog(LL_WARNING,
                    "Accepting client connection: %s", server.neterr);
            }
            return;
        }

        anetCloexec(client_fd);
        serverLog(LL_VERBOSE,"Accepted %s:%d", client_ip, client_port);
        acceptCommonHandler(connCreateAcceptedSocket(client_fd),0,client_ip);
    }
}
/* Create a new socket-type connection object.
 *
 * The object is allocated with zcalloc(), bound to the CT_Socket operations
 * table and starts with fd = -1, i.e. without a descriptor attached.
 * Declared with an explicit (void) parameter list so that the definition is
 * a real prototype and calls with stray arguments are diagnosed.
 *
 * NOTE(review): the allocation result is used unchecked; presumably zcalloc()
 * does not return on out-of-memory -- confirm. */
connection *connCreateSocket(void) {
    connection *conn = zcalloc(sizeof(*conn));

    conn->type = &CT_Socket;
    conn->fd = -1;
    return conn;
}
/* Operations table for plain socket connections: every slot points at the
 * connSocket* implementation of the corresponding connection operation.
 * connCreateSocket() installs this table as conn->type. */
ConnectionType CT_Socket = {
    .ae_handler = connSocketEventHandler,
    .close = connSocketClose,
    .write = connSocketWrite,
    .read = connSocketRead,
    .accept = connSocketAccept,
    .connect = connSocketConnect,
    .set_write_handler = connSocketSetWriteHandler,
    .set_read_handler = connSocketSetReadHandler,
    .get_last_error = connSocketGetLastError,
    .blocking_connect = connSocketBlockingConnect,
    .sync_write = connSocketSyncWrite,
    .sync_read = connSocketSyncRead,
    .sync_readline = connSocketSyncReadLine,
    .get_type = connSocketGetType
};
/* Install, replace or remove the read callback of a socket connection.
 *
 * Returns C_OK immediately when `func` already is the current handler.
 * Otherwise the handler is stored first, then:
 *   - a NULL `func` removes the AE_READABLE registration of conn->fd from
 *     server.el;
 *   - a non-NULL `func` registers conn->type->ae_handler for AE_READABLE on
 *     conn->fd, with `conn` as client data.
 * Returns C_ERR only when that registration fails, C_OK in every other case. */
static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
    if (conn->read_handler == func) {
        return C_OK;
    }

    conn->read_handler = func;

    if (!func) {
        aeDeleteFileEvent(server.el,conn->fd,AE_READABLE);
        return C_OK;
    }

    if (aeCreateFileEvent(server.el,conn->fd,AE_READABLE,
                          conn->type->ae_handler,conn) == AE_ERR) {
        return C_ERR;
    }
    return C_OK;
}
/* Register client `c` in the server-wide bookkeeping structures.
 *
 * The client is appended to the server.clients list and inserted into the
 * server.clients_index radix tree, keyed by its id after conversion with
 * htonu64(). */
void linkClient(client *c) {
    uint64_t index_key;

    listAddNodeTail(server.clients,c);

    /* Keep a pointer to the list node that now holds the client (the tail
     * just appended), so that unlinkClient() can remove it in constant time
     * instead of scanning the whole list. */
    c->client_list_node = listLast(server.clients);

    index_key = htonu64(c->id);
    raxInsert(server.clients_index,(unsigned char*)&index_key,
              sizeof(index_key),c,NULL);
}
三、InitServerLast 初始化设置IO线程
/* Last stage of server initialization.
 *
 * Runs, in this order: bioInit(), initThreadedIO() (sets up the I/O
 * threads), set_jemalloc_bg_thread() with the configured
 * server.jemalloc_bg_thread value, and finally records the value of
 * zmalloc_used_memory() in server.initial_memory_usage.
 *
 * Declared with an explicit (void) parameter list, matching
 * initThreadedIO(void), so that the definition acts as a prototype. */
void InitServerLast(void) {
    bioInit();
    initThreadedIO();
    set_jemalloc_bg_thread(server.jemalloc_bg_thread);
    server.initial_memory_usage = zmalloc_used_memory();
}
/* Initialize the data used by threaded I/O.
 *
 * Marks threaded I/O as not active. With exactly one I/O thread configured
 * nothing else is done. More than IO_THREADS_MAX_NUM configured threads is
 * a fatal error (logged, then exit(1)). Otherwise a list is created for
 * every thread slot, and for every slot except 0 a mutex is initialized and
 * locked, the pending counter is reset and a thread running IOThreadMain is
 * spawned; failing to spawn one is fatal as well. */
void initThreadedIO(void) {
    server.io_threads_active = 0;

    /* With a single I/O thread there are no extra threads to set up:
     * I/O is performed in the main thread. */
    if (server.io_threads_num == 1) {
        return;
    }

    if (server.io_threads_num > IO_THREADS_MAX_NUM) {
        serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
                             "The maximum number is %d.", IO_THREADS_MAX_NUM);
        exit(1);
    }

    /* Set up every slot; spawn a thread for each slot but the first. */
    for (int id = 0; id < server.io_threads_num; id++) {
        /* Done for all the threads, the main thread included. */
        io_threads_list[id] = listCreate();

        /* Slot 0 is the main thread: the rest applies to additional
         * threads only. */
        if (id != 0) {
            pthread_t handle;

            pthread_mutex_init(&io_threads_mutex[id],NULL);
            setIOPendingCount(id, 0);
            /* The mutex is taken before the thread exists: the thread will
             * be stopped. */
            pthread_mutex_lock(&io_threads_mutex[id]);
            if (pthread_create(&handle,NULL,IOThreadMain,(void*)(long)id) != 0) {
                serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
                exit(1);
            }
            io_threads[id] = handle;
        }
    }
}
四、aeMain 主线程
/* Event loop of the main thread.
 *
 * Clears eventLoop->stop and then calls aeProcessEvents() over and over,
 * asking for all event types plus the before-sleep and after-sleep hooks,
 * until something sets eventLoop->stop to a non-zero value. */
void aeMain(aeEventLoop *eventLoop) {
    const int loop_flags =
        AE_ALL_EVENTS | AE_CALL_BEFORE_SLEEP | AE_CALL_AFTER_SLEEP;

    for (eventLoop->stop = 0; eventLoop->stop == 0; ) {
        aeProcessEvents(eventLoop, loop_flags);
    }
}
- aeMain是主线程事件循环el的具体实现:只要eventLoop->stop未被置位,就不断调用aeProcessEvents。每一轮循环以类似流水线的方式依次执行四个步骤:eventLoop->beforesleep(eventLoop)、aeApiPoll(eventLoop, tvp)、eventLoop->aftersleep(eventLoop),以及处理aeApiPoll返回的就绪事件。
- 这里主要介绍aeApiPoll(eventLoop, tvp)和处理aeApiPoll事件的功能:
- aeApiPoll和前面介绍的aeApiAddEvent一样,在不同的操作系统上有不同的实现。这里以linux系统为例:aeApiPoll用epoll_wait多路复用地等待监听socket的accept事件和客户端socket的read事件,与前面aeCreateFileEvent通过epoll_ctl完成的注册相对应。
/* epoll implementation of the poll step.
 *
 * Waits on the epoll instance kept in eventLoop->apidata for at most
 * eventLoop->setsize events. The timeout is `tvp` expressed in
 * milliseconds, with the microsecond part rounded up; a NULL `tvp` is
 * passed to epoll_wait() as -1.
 *
 * Every reported event is translated into an AE_READABLE / AE_WRITABLE
 * mask and stored, with its fd, in eventLoop->fired[]. EPOLLERR and
 * EPOLLHUP are reported as both readable and writable.
 *
 * Returns the number of entries written to eventLoop->fired[], which is 0
 * when epoll_wait() returns zero or a negative value. */
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int timeout_ms = -1;

    if (tvp) {
        timeout_ms = tvp->tv_sec*1000 + (tvp->tv_usec + 999)/1000;
    }

    int ready = epoll_wait(state->epfd,state->events,eventLoop->setsize,
                           timeout_ms);
    if (ready <= 0) {
        return 0;
    }

    for (int i = 0; i < ready; i++) {
        struct epoll_event *ev = &state->events[i];
        int mask = 0;

        if (ev->events & EPOLLIN) mask |= AE_READABLE;
        if (ev->events & EPOLLOUT) mask |= AE_WRITABLE;
        /* Error and hang-up conditions fire both directions. */
        if (ev->events & (EPOLLERR|EPOLLHUP)) mask |= AE_WRITABLE|AE_READABLE;

        eventLoop->fired[i].fd = ev->data.fd;
        eventLoop->fired[i].mask = mask;
    }
    return ready;
}
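- 处理aeApiPoll返回的事件:根据事件的文件描述符fd找到对应的aeFileEvent,调用其中的回调函数fe->rfileProc(eventLoop,fd,fe->clientData,mask)。rfileProc就是前面通过aeCreateFileEvent注册的处理函数:监听socket对应acceptTcpHandler;客户端连接注册的是conn->type->ae_handler(即connSocketEventHandler),由它再调用连接上的read_handler,也就是readQueryFromClient。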
/* Dispatch every event reported by the poll step: for each fired entry the
 * registered aeFileEvent is looked up by fd and its callbacks are invoked.
 * NOTE(review): this is an excerpt; `invert`, `numevents`, `processed` and
 * `j` are defined outside the lines shown here. */
for (j = 0; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int fired = 0; /* Number of events fired for current fd. */
/* Fire the readable event first unless the call order is inverted: the
 * read callback runs only when AE_READABLE is set both in the registered
 * mask and in the fired mask. */
if (!invert && fe->mask & mask & AE_READABLE) {
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
}
/* Fire the writable event. */
if (fe->mask & mask & AE_WRITABLE) {
/* Skip the call when the same function was already invoked for the
 * readable event of this fd. */
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
/* If we have to invert the call, fire the readable event now
 * after the writable one. */
if (invert) {
fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
if ((fe->mask & mask & AE_READABLE) &&
(!fired || fe->wfileProc != fe->rfileProc))
{
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
/* One more fired entry handled, whether or not a callback ran for it. */
processed++;
}