在线观看不卡亚洲电影_亚洲妓女99综合网_91青青青亚洲娱乐在线观看_日韩无码高清综合久久

鍍金池/ 教程/ 大數(shù)據(jù)/ Redis 事件驅(qū)動(dòng)詳解
Redis 數(shù)據(jù)淘汰機(jī)制
積分排行榜
小剖 Memcache
Redis 數(shù)據(jù)結(jié)構(gòu) intset
分布式鎖
從哪里開始讀起,怎么讀
Redis 數(shù)據(jù)結(jié)構(gòu) dict
不在浮沙筑高臺(tái)
Redis 集群(上)
Redis 監(jiān)視器
源碼閱讀工具
Redis 日志和斷言
內(nèi)存數(shù)據(jù)管理
Redis 數(shù)據(jù)結(jié)構(gòu)綜述
源碼日志
Web 服務(wù)器存儲(chǔ) session
消息中間件
Redis 與 Lua 腳本
什么樣的源代碼適合閱讀
Redis 數(shù)據(jù)結(jié)構(gòu) sds
Memcached slab 分配策略
訂閱發(fā)布機(jī)制
Redis 是如何提供服務(wù)的
Redis 事務(wù)機(jī)制
Redis 集群(下)
主從復(fù)制
Redis 應(yīng)用
RDB 持久化策略
Redis 數(shù)據(jù)遷移
Redis 事件驅(qū)動(dòng)詳解
初探 Redis
Redis 與 Memcache
AOF 持久化策略
Redis 數(shù)據(jù)結(jié)構(gòu) redisOb
作者簡介
Redis 數(shù)據(jù)結(jié)構(gòu) ziplist
Redis 數(shù)據(jù)結(jié)構(gòu) skiplist
Redis 哨兵機(jī)制

Redis 事件驅(qū)動(dòng)詳解

概述

在講述 Redis 如何提供服務(wù)之前,有必要介紹 Redis 的事件驅(qū)動(dòng)模型。

我們知道,進(jìn)程能夠進(jìn)行網(wǎng)絡(luò)的讀寫操作,但有些時(shí)候這些讀寫操作是不可行的,譬如因?yàn)閮?nèi)核的網(wǎng)絡(luò)發(fā)送緩沖區(qū)滿了導(dǎo)致不可寫;網(wǎng)絡(luò)收取緩存中無數(shù)據(jù)可讀,導(dǎo)致不可讀。那如果有一種機(jī)制,可以在一個(gè)事件(可讀或者可寫)發(fā)生的時(shí)候,才告知到進(jìn)程,這樣就避免了進(jìn)程在一個(gè)事件出現(xiàn)等待阻塞的情況,提高了進(jìn)程的吞吐能力。 Redis 內(nèi)部有一個(gè)小型的事件驅(qū)動(dòng),它和 Libevent 網(wǎng)絡(luò)庫的事件驅(qū)動(dòng)一樣,都是依托操作系統(tǒng)的 I/O 多路復(fù)用技術(shù)支撐起來的,這種 IO 驅(qū)動(dòng)模型有個(gè)經(jīng)典的名字:Reactor 模型,反應(yīng)爐。

利用 I/O 多路復(fù)用技術(shù),監(jiān)聽感興趣的 I/O 事件,例如讀事件,寫事件等,同時(shí)也要維護(hù)一個(gè)以文件描述符為主鍵,數(shù)據(jù)為某個(gè)預(yù)設(shè)函數(shù)的事件表,這里其實(shí)就是一個(gè)數(shù)組或者鏈表。當(dāng)事件觸發(fā)時(shí),比如某個(gè)文件描述符可讀,系統(tǒng)會(huì)返回文件描述符值,用這個(gè)值在事件表中找到相應(yīng)的數(shù)據(jù)項(xiàng)(包括回調(diào)函數(shù)等),從而實(shí)現(xiàn)回調(diào)。同樣的,定時(shí)事件也是可以實(shí)現(xiàn)的,因?yàn)橄到y(tǒng)提供的 I/O 多路復(fù)用技術(shù)中的函數(shù)允許我們?cè)O(shè)置等待超時(shí)的時(shí)間,預(yù)設(shè)定時(shí)間內(nèi)沒有事件發(fā)生時(shí),會(huì)返回。

上面一段話比較綜合,可能需要一些 Linux 系統(tǒng)編程和網(wǎng)絡(luò)編程的基礎(chǔ),但你會(huì)看到多數(shù) Reactor 事件驅(qū)動(dòng)程序都是這么實(shí)現(xiàn)的。

事件驅(qū)動(dòng)數(shù)據(jù)結(jié)構(gòu)

Redis 事件驅(qū)動(dòng)內(nèi)部有四個(gè)主要的數(shù)據(jù)結(jié)構(gòu),分別是:事件循環(huán)結(jié)構(gòu)體,文件事件結(jié)構(gòu)體,時(shí)間事件結(jié)構(gòu)體和觸發(fā)事件結(jié)構(gòu)體。

// 文件事件結(jié)構(gòu)體
/* File event structure */
typedef struct aeFileEvent {
    int mask; /* one of AE_(READABLE|WRITABLE) */
  // 回調(diào)函數(shù)指針
   aeFileProc *rfileProc;
   aeFileProc *wfileProc;
  // clientData 參數(shù)一般是指向redisClient 的指針
   void *clientData;
} aeFileEvent;
// 時(shí)間事件結(jié)構(gòu)體
/* Time event structure */
typedef struct aeTimeEvent {
    long long id; /* time event identifier. */
    long when_sec; /* seconds */
    long when_ms; /* milliseconds */
    // 定時(shí)回調(diào)函數(shù)指針
    aeTimeProc *timeProc;
    // 定時(shí)事件清理函數(shù),當(dāng)刪除定時(shí)事件的時(shí)候會(huì)被調(diào)用
    aeEventFinalizerProc *finalizerProc;
    // clientData 參數(shù)一般是指向redisClient 的指針
    void *clientData;
    // 定時(shí)事件表采用鏈表來維護(hù)
    struct aeTimeEvent *next;
} aeTimeEvent;
// 觸發(fā)事件
/* A fired event */
typedef struct aeFiredEvent {
    int fd;
    int mask;
} aeFiredEvent;
// 事件循環(huán)結(jié)構(gòu)體
/* State of an event based program */
typedef struct aeEventLoop {
    int maxfd; /* highest file descriptor currently registered */
    int setsize; /* max number of file descriptors tracked */
    // 記錄最大的定時(shí)事件id + 1
    long long timeEventNextId;
    // 用于系統(tǒng)時(shí)間的矯正
    time_t lastTime; /* Used to detect system clock skew */
    // I/O 事件表
    aeFileEvent *events; /* Registered events */
    // 被觸發(fā)的事件
    aeFiredEvent *fired; /* Fired events */
    // 定時(shí)事件表
    aeTimeEvent *timeEventHead;
    // 事件循環(huán)結(jié)束標(biāo)識(shí)
    int stop;
    // 對(duì)于不同的I/O 多路復(fù)用技術(shù),有不同的數(shù)據(jù),詳見各自實(shí)現(xiàn)
    void *apidata; /* This is used for polling API specific data */
    // 新的循環(huán)前需要執(zhí)行的操作
    aeBeforeSleepProc *beforesleep;
} aeEventLoop;

上面的數(shù)據(jù)結(jié)構(gòu)能給我們很好的提示:事件循環(huán)結(jié)構(gòu)體維護(hù) I/O 事件表,定時(shí)事件表和觸發(fā)事件表。

事件循環(huán)中心

Redis 的主函數(shù)中調(diào)用 initServer() 函數(shù)從而初始化事件循環(huán)中心(EventLoop),它的主要工作是在 aeCreateEventLoop() 中完成的。

aeEventLoop *aeCreateEventLoop(int setsize) {
    aeEventLoop *eventLoop;
    int i;
    // 分配空間
if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
    // 分配文件事件結(jié)構(gòu)體空間
    eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
    // 分配已觸發(fā)事件結(jié)構(gòu)體空間
    eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
    eventLoop->setsize = setsize;
    eventLoop->lastTime = time(NULL);
    // 時(shí)間事件鏈表頭
    eventLoop->timeEventHead = NULL;
    // 后續(xù)提到
    eventLoop->timeEventNextId = 0;
    eventLoop->stop = 0;
    eventLoop->maxfd = -1;
    // 進(jìn)入事件循環(huán)前需要執(zhí)行的操作,此項(xiàng)會(huì)在redis main() 函數(shù)中設(shè)置
    eventLoop->beforesleep = NULL;
    // 在這里,aeApiCreate() 函數(shù)對(duì)于每個(gè)IO 多路復(fù)用模型的實(shí)現(xiàn)都有不同,
    // 具體參見源代碼,因?yàn)槊糠NIO 多路復(fù)用模型的初始化都不同
if (aeApiCreate(eventLoop) == -1) goto err;
/* Events with mask == AE_NONE are not set. So let's initialize the
* vector with it. */
    // 初始化事件類型掩碼為無事件狀態(tài)
for (i = 0; i < setsize; i++)
    eventLoop->events[i].mask = AE_NONE;
return eventLoop;
    err:
if (eventLoop) {
     zfree(eventLoop->events);
     zfree(eventLoop->fired);
     zfree(eventLoop);
   }
   return NULL;
}

有上面初始化工作只是完成了一個(gè)空的事件中心而已,并沒有注冊(cè)一些感興趣的事件。要想驅(qū)動(dòng)事件循環(huán),還需要下面的工作。

Redis 事件驅(qū)動(dòng)原理

事件注冊(cè)詳解

文件 I/O 事件注冊(cè)主要操作在 aeCreateFileEvent() 中完成。aeCreateFileEvent() 會(huì)根據(jù)文件描述符的數(shù)值大小在事件循環(huán)結(jié)構(gòu)體的 I/O 事件表中取一個(gè)數(shù)據(jù)空間,利用系統(tǒng)提供的 I/O 多路復(fù)用技術(shù)監(jiān)聽感興趣的 I/O 事件,并設(shè)置回調(diào)函數(shù)。

http://wiki.jikexueyuan.com/project/redis/images/redis3.png" alt="" />

int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
       aeFileProc *proc, void *clientData)
{
   if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
   }
   // 在I/O 事件表中選擇一個(gè)空間
   aeFileEvent *fe = &eventLoop->events[fd];
   // aeApiAddEvent() 只在此函數(shù)中調(diào)用,對(duì)于不同IO 多路復(fù)用實(shí)現(xiàn),會(huì)有所不同
  if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
        fe->mask |= mask;
   // 設(shè)置回調(diào)函數(shù)
   if (mask & AE_READABLE) fe->rfileProc = proc;
   if (mask & AE_WRITABLE) fe->wfileProc = proc;
        fe->clientData = clientData;
   if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
        return AE_OK;
}

對(duì)于不同版本的 I/O 多路復(fù)用,比如 epoll,select,kqueue 等,Redis 有各自的版本,但接口統(tǒng)一,譬如 aeApiAddEvent(),會(huì)有多個(gè)版本的實(shí)現(xiàn)。

http://wiki.jikexueyuan.com/project/redis/images/redis4.png" alt="" />

準(zhǔn)備監(jiān)聽工作

initServer() 中調(diào)用了 aeCreateEventLoop() 完成了事件中心的初始化,initServer() 還做了監(jiān)聽的準(zhǔn)備。

/* Open the TCP listening socket for the user commands. */
// listenToPort() 中有調(diào)用listen()
if (server.port != 0 &&
    listenToPort(server.port,server.ipfd,&server.ipfd_count) == REDIS_ERR)
     exit(1);
// UNIX 域套接字
/* Open the listening Unix domain socket. */
if (server.unixsocket != NULL) {
    unlink(server.unixsocket); /* don't care if this fails */
    server.sofd = anetUnixServer(server.neterr,server.unixsocket,
    server.unixsocketperm);
if (server.sofd == ANET_ERR) {
    redisLog(REDIS_WARNING, "Opening socket: %s", server.neterr);
    exit(1);
   }
}

從上面可以看出,Redis 提供了 TCP 和 UNIX 域套接字兩種工作方式。以 TCP 工作方式為例,listenPort() 創(chuàng)建綁定了套接字并啟動(dòng)了監(jiān)聽,這是網(wǎng)絡(luò)編程的基礎(chǔ)部分了。

為監(jiān)聽套接字注冊(cè)事件

在進(jìn)入事件循環(huán)前還需要做一些準(zhǔn)備工作。緊接著,initServer() 為所有的監(jiān)聽套接字注冊(cè)了讀事件(讀事件表示有新的連接到來),響應(yīng)函數(shù)為 acceptTcpHandler() 或者 acceptUnixHandler()。

// 創(chuàng)建接收TCP 或者UNIX 域套接字的事件處理
// TCP
/* Create an event handler for accepting new connections in TCP and Unix
* domain sockets. */
for (j = 0; j < server.ipfd_count; j++) {
    // acceptTcpHandler() tcp 連接接受處理函數(shù)
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
    acceptTcpHandler,NULL) == AE_ERR)
   {
       redisPanic(
         "Unrecoverable error creating server.ipfd file event.");
       }
   }
// UNIX 域套接字
if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
    acceptUnixHandler,NULL) == AE_ERR)
    redisPanic("Unrecoverable error creating server.sofd file event.");

來看看 acceptTcpHandler() 做了什么:

void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd;
    char cip[REDIS_IP_STR_LEN];
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);
    REDIS_NOTUSED(privdata);
    // 接收客戶端請(qǐng)求
    cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
    // 出錯(cuò)
    if (cfd == AE_ERR) {
        redisLog(REDIS_WARNING,"Accepting client connection: %s", server.neterr);
    return;
    }
    // 記錄
    redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
    // 真正有意思的地方
    acceptCommonHandler(cfd,0);
}

接收套接字與客戶端建立連接后,調(diào)用 acceptCommonHandler()。acceptCommonHandler()主要工作就是:

  1. 建立并保存服務(wù)端與客戶端的連接信息,這些信息保存在一個(gè) struct redisClient 結(jié)構(gòu) 體中;
  2. 為與客戶端連接的套接字注冊(cè)讀事件,相應(yīng)的回調(diào)函數(shù)為 readQueryFromClient(),readQueryFromClient() 作用是從套接字讀取數(shù)據(jù),執(zhí)行相應(yīng)操作并回復(fù)客戶端。

簡而言之,就是接收一個(gè) TCP 請(qǐng)求。

事件循環(huán)

以上做好了準(zhǔn)備工作,可以進(jìn)入事件循環(huán)。跳出 initServer() 回到 main() 中,main() 會(huì)調(diào)用 aeMain()。進(jìn)入事件循環(huán)發(fā)生在 aeProcessEvents() 中:

  1. 根據(jù)定時(shí)事件表計(jì)算需要等待的最短時(shí)間;
  2. 調(diào)用 redis api aeApiPoll() 進(jìn)入監(jiān)聽輪詢,如果沒有事件發(fā)生就會(huì)進(jìn)入睡眠狀態(tài),其實(shí) 就是 I/O 多路復(fù)用 select() epoll() 等的調(diào)用;
  3. 有事件發(fā)生會(huì)被喚醒,處理已觸發(fā)的 I/O 事件和定時(shí)事件。

來看看 aeMain() 的具體實(shí)現(xiàn):

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
    // 進(jìn)入事件循環(huán)可能會(huì)進(jìn)入睡眠狀態(tài)。在睡眠之前,執(zhí)行預(yù)設(shè)置的函數(shù)
    // aeSetBeforeSleepProc()。
    if (eventLoop->beforesleep != NULL)
        eventLoop->beforesleep(eventLoop);
     // AE_ALL_EVENTS 表示處理所有的事件
     aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
}

事件觸發(fā)

這里以 select 版本的 redis api 實(shí)現(xiàn)作為講解,aeApiPoll() 調(diào)用了 select() 進(jìn)入了監(jiān)聽輪 詢。aeApiPoll() 的 tvp 參數(shù)是最小等待時(shí)間,它會(huì)被預(yù)先計(jì)算出來,它主要完成:

  1. 拷貝讀寫的 fdset。select() 的調(diào)用會(huì)破壞傳入的fdset,實(shí)際上有兩份 fdset,一份作為 備份,另一份用作調(diào)用。每次調(diào)用 select() 之前都從備份中直接拷貝一份;
  2. 調(diào)用 select();
  3. 被喚醒后,檢查 fdset 中的每一個(gè)文件描述符,并將可讀或者可寫的描述符記錄到觸 發(fā)表當(dāng)中。

接下來的操作便是執(zhí)行相應(yīng)的回調(diào)函數(shù),代碼在上一段中已經(jīng)貼出:先處理 I/O 事件,再 處理定時(shí)事件。

static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
   aeApiState *state = eventLoop->apidata;
   int retval, j, numevents = 0;
   /*
   真有意思,在aeApiState 結(jié)構(gòu)中:
   typedef struct aeApiState {
   fd_set rfds, wfds;
   fd_set _rfds, _wfds;
   } aeApiState;
   在調(diào)用select() 的時(shí)候傳入的是_rfds 和_wfds,所有監(jiān)聽的數(shù)據(jù)
   在rfds 和wfds 中。
   在下次需要調(diào)用selec() 的時(shí)候,會(huì)將rfds 和wfds 中的數(shù)據(jù)拷貝
   進(jìn)_rfds 和_wfds 中。*/
   memcpy(&state->_rfds,&state->rfds,sizeof(fd_set));
   memcpy(&state->_wfds,&state->wfds,sizeof(fd_set));
   retval = select(eventLoop->maxfd+1,
   &state->_rfds,&state->_wfds,NULL,tvp);
   if (retval > 0) {
   // 輪詢
   for (j = 0; j <= eventLoop->maxfd; j++) {
        int mask = 0;
        aeFileEvent *fe = &eventLoop->events[j];
   if (fe->mask == AE_NONE) continue;
   if (fe->mask & AE_READABLE && FD_ISSET(j,&state->_rfds))
        mask |= AE_READABLE;
   if (fe->mask & AE_WRITABLE && FD_ISSET(j,&state->_wfds))
        mask |= AE_WRITABLE;
    // 添加到觸發(fā)事件表中
       eventLoop->fired[numevents].fd = j;
       eventLoop->fired[numevents].mask = mask;
       numevents++;
    }
  }
  return numevents;
}

總結(jié)

http://wiki.jikexueyuan.com/project/redis/images/redis5.png" alt="" />

Redis 的事件驅(qū)動(dòng)總結(jié)如下:

  • 初始化事件循環(huán)結(jié)構(gòu)體
  • 注冊(cè)監(jiān)聽套接字的讀事件
  • 注冊(cè)定時(shí)事件
  • 進(jìn)入事件循環(huán)
  • 如果監(jiān)聽套接字變?yōu)榭勺x,會(huì)接收客戶端請(qǐng)求,并為對(duì)應(yīng)的套接字注冊(cè)讀事件
  • 如果與客戶端連接的套接字變?yōu)榭勺x,執(zhí)行相應(yīng)的操作
上一篇:初探 Redis下一篇:主從復(fù)制