在线观看不卡亚洲电影_亚洲妓女99综合网_91青青青亚洲娱乐在线观看_日韩无码高清综合久久

鍍金池/ 教程/ 大數(shù)據(jù)/ 訂閱發(fā)布機(jī)制
Redis 數(shù)據(jù)淘汰機(jī)制
積分排行榜
小剖 Memcache
Redis 數(shù)據(jù)結(jié)構(gòu) intset
分布式鎖
從哪里開始讀起,怎么讀
Redis 數(shù)據(jù)結(jié)構(gòu) dict
不在浮沙筑高臺(tái)
Redis 集群(上)
Redis 監(jiān)視器
源碼閱讀工具
Redis 日志和斷言
內(nèi)存數(shù)據(jù)管理
Redis 數(shù)據(jù)結(jié)構(gòu)綜述
源碼日志
Web 服務(wù)器存儲(chǔ) session
消息中間件
Redis 與 Lua 腳本
什么樣的源代碼適合閱讀
Redis 數(shù)據(jù)結(jié)構(gòu) sds
Memcached slab 分配策略
訂閱發(fā)布機(jī)制
Redis 是如何提供服務(wù)的
Redis 事務(wù)機(jī)制
Redis 集群(下)
主從復(fù)制
Redis 應(yīng)用
RDB 持久化策略
Redis 數(shù)據(jù)遷移
Redis 事件驅(qū)動(dòng)詳解
初探 Redis
Redis 與 Memcache
AOF 持久化策略
Redis 數(shù)據(jù)結(jié)構(gòu) redisOb
作者簡(jiǎn)介
Redis 數(shù)據(jù)結(jié)構(gòu) ziplist
Redis 數(shù)據(jù)結(jié)構(gòu) skiplist
Redis 哨兵機(jī)制

訂閱發(fā)布機(jī)制

兩種訂閱

Redis 提供兩個(gè)訂閱模式:頻道(channel)訂閱和 glob-style 模式(pattern)頻道訂閱。頻道訂閱容易理解,即CA(client A)向服務(wù)器訂閱了頻道 news,當(dāng) CB 向 news 發(fā)布消息的時(shí)候,CA 便能收到。

glob-style 模式(pattern)頻道訂閱,需要先解釋什么是 glob-style?舉一個(gè)簡(jiǎn)單的例子,rm *.jpg linux 下這條命令刪除當(dāng)前目錄下所有 jpg 圖片,所用到的是 glob-style 模式匹配,你可以將他理解為某種 style 的正則表達(dá)式;)

舉例,CA(client A)向服務(wù)器訂閱了頻道*.news

  • 當(dāng) CB 向 China.news 發(fā)布消息的時(shí)候,CA 能收到,
  • 當(dāng) CB 向 America.news 發(fā)布消息的時(shí)候,CA 能收到,
  • 當(dāng) CB 向 AV.news 發(fā)布消息的時(shí)候,CA 便能收到。

訂閱相關(guān)數(shù)據(jù)結(jié)構(gòu)

struct redisServer 和 struct redisClient 都維護(hù)了頻道和模式頻道,前者維護(hù)了所有頻道和訂閱頻道的客戶端,后者維護(hù)了客戶端自己訂閱的頻道。

struct redisServer {
    ......
    /* Pubsub */
    dict *pubsub_channels; /* Map channels to list of subscribed clients */
    list *pubsub_patterns; /* A list of pubsub_patterns */
    ......
}
typedef struct redisClient {
    ......
    // 用戶感興趣的頻道
    dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */
    // 用戶感興趣的模式
    list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */
    ......
} redisClient;
    // 模式頻道數(shù)據(jù)結(jié)構(gòu),list *pubsub_patterns 里的每個(gè)節(jié)點(diǎn)數(shù)據(jù)都是struct
    // pubsubPattern。
typedef struct pubsubPattern {
    redisClient *client;
    robj *pattern;
} pubsubPattern;

頻道訂閱是一個(gè) dict,每個(gè) channel 被哈希進(jìn)相應(yīng)的桶,每個(gè) channel 對(duì)應(yīng)一個(gè) clients,clients 都訂閱了此 channel。當(dāng)有消息發(fā)布的時(shí)候,檢索 channel,遍歷 clients,發(fā)布消息。

http://wiki.jikexueyuan.com/project/redis/images/redis20.png" alt="" />

模式頻道訂閱是一個(gè) list。當(dāng)有消息發(fā)布的時(shí)候,channel 與 glob-style pattern 匹配,發(fā)布消息。

http://wiki.jikexueyuan.com/project/redis/images/redis21.png" alt="" />

訂閱過(guò)程

兩種訂閱模式是維護(hù)上述兩種數(shù)據(jù)結(jié)構(gòu)的過(guò)程,

// 訂閱頻道
/* Subscribe a client to a channel. Returns 1 if the operation succeeded, or
* 0 if the client was already subscribed to that channel. */
int pubsubSubscribeChannel(redisClient *c, robj *channel) {
    struct dictEntry *de;
    list *clients = NULL;
    int retval = 0;
    // redisClient.pubsub_channels 中保存客戶端訂閱的所有頻道,可以查看客戶端
    // 訂閱了多少頻道以及客戶端是否訂閱某個(gè)頻道
    // server.pubsub_channels 中保存所有的頻道和每個(gè)頻道的訂閱客戶端,可以將
    // 消息發(fā)布到訂閱客戶端
    // 將頻道加入redisClient.pubsub_channels
    /* Add the channel to the client -> channels hash table */
    if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
        retval = 1;
        incrRefCount(channel);
        // 在服務(wù)器負(fù)責(zé)維護(hù)的channel->clients 哈希表中尋找指定的頻道
        /* Add the client to the channel -> list of clients hash table */
        de = dictFind(server.pubsub_channels,channel);
        // 未找到客戶端指定的頻道,需要?jiǎng)?chuàng)建
    if (de == NULL) {
        clients = listCreate();
        // 將頻道加入server.pubsub_channels
        dictAdd(server.pubsub_channels,channel,clients);
        incrRefCount(channel);
        // 找到客戶端指定的頻道,直接獲取這個(gè)頻道
    } else {
        clients = dictGetVal(de);
    }
        // 將客戶端添加到鏈表的尾部
        listAddNodeTail(clients,c);
    }
    // 通知客戶端
    /* Notify the client */
    addReply(c,shared.mbulkhdr[3]);
    addReply(c,shared.subscribebulk);
    addReplyBulk(c,channel);
    addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(
    c->pubsub_patterns));
    return retval;
}
// 訂閱模式頻道
/* Subscribe a client to a pattern. Returns 1 if the operation succeeded,
or 0 if the client was already subscribed to that pattern. */
int pubsubSubscribePattern(redisClient *c, robj *pattern) {
    int retval = 0;
    // redisClient.pubsub_patterns 中保存客戶端訂閱的所有模式頻道,可以查看
    // 客戶端訂閱了多少頻道以及客戶端是否訂閱某個(gè)頻道
    // server.pubsub_patterns 中保存所有的模式頻道和每個(gè)模式頻道的訂閱客戶端
    // ,可以將消息發(fā)布到訂閱客戶端
    // 未訂閱模式頻道,插入
    if (listSearchKey(c->pubsub_patterns,pattern) == NULL) {
        retval = 1;
        pubsubPattern *pat;
        // 將模式頻道加入redisClient.pubsub_patterns
        listAddNodeTail(c->pubsub_patterns,pattern);
        incrRefCount(pattern);
        // 將模式頻道加入server.pubsub_patterns
        pat = zmalloc(sizeof(*pat));
        pat->pattern = getDecodedObject(pattern);
        pat->client = c;
        listAddNodeTail(server.pubsub_patterns,pat);
    }
    // 通知客戶端
    /* Notify the client */
    addReply(c,shared.mbulkhdr[3]);
    addReply(c,shared.psubscribebulk);
    addReplyBulk(c,pattern);
    addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(
    c->pubsub_patterns));
    return retval;
}

取消訂閱的過(guò)程則相反。

消息發(fā)布

發(fā)布消息的過(guò)程則遍歷上述兩個(gè)數(shù)據(jù)結(jié)構(gòu)(dict 和list),并將消息發(fā)布到匹配頻道的所有客戶端。

// 發(fā)布消息
/* Publish a message */
int pubsubPublishMessage(robj *channel, robj *message) {
    int receivers = 0;
    struct dictEntry *de;
    listNode *ln;
    listIter li;
    // 發(fā)布消息有兩個(gè)步驟,
    // 指定頻道的所有訂閱者發(fā)布消息
    // 指定模式頻道的所有訂閱者發(fā)布消息
    // 
    // 尋找頻道
    /* Send to clients listening for that channel */
    de = dictFind(server.pubsub_channels,channel);
    // 向頻道所有訂閱者發(fā)布信息
    if (de) {
        list *list = dictGetVal(de);
        listNode *ln;
        listIter li;
        listRewind(list,&li);
    while ((ln = listNext(&li)) != NULL) {
        redisClient *c = ln->value;
        addReply(c,shared.mbulkhdr[3]);
        addReply(c,shared.messagebulk);
        addReplyBulk(c,channel);
        addReplyBulk(c,message);
        receivers++;
    }
}
// 
// 進(jìn)行g(shù)lob-style 模式匹配
/* Send to clients listening to matching channels */
if (listLength(server.pubsub_patterns)) {
    listRewind(server.pubsub_patterns,&li);
    channel = getDecodedObject(channel);
    while ((ln = listNext(&li)) != NULL) {
        pubsubPattern *pat = ln->value;
        // 匹配成功,向訂閱者發(fā)布消息
        if (stringmatchlen((char*)pat->pattern->ptr,
                sdslen(pat->pattern->ptr),
                (char*)channel->ptr,
                sdslen(channel->ptr),0)) {
        addReply(pat->client,shared.mbulkhdr[4]);
        addReply(pat->client,shared.pmessagebulk);
        addReplyBulk(pat->client,pat->pattern);
        addReplyBulk(pat->client,channel);
        addReplyBulk(pat->client,message);
        receivers++;
        }
      }
   decrRefCount(channel);
  }
return receivers;
}

注意, 只要客戶端訂閱了頻道, 除了SUBCRIBE,UNSUBCRIBE,PSUBCRIBE,PSUBCRIBE,就不能執(zhí)行其他命令。

int processCommand(redisClient *c) {
    ......
    // 在訂閱發(fā)布模式下,只允許處理SUBSCRIBE 或者UNSUBSCRIBE 命令
    // 從下面的檢測(cè)條件可以看出:只要存在redisClient.pubsub_channels 或者
    // redisClient.pubsub_patterns,就代表處于訂閱發(fā)布模式下
    /* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */
    if ((dictSize(c->pubsub_channels) > 0 || listLength(c->pubsub_patterns) > 0)
        &&
        c->cmd->proc != subscribeCommand &&
        c->cmd->proc != unsubscribeCommand &&
        c->cmd->proc != psubscribeCommand &&
        c->cmd->proc != punsubscribeCommand) {
        addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed
                "in this context");
        return REDIS_OK;
        }
    ......
}