在线观看不卡亚洲电影_亚洲妓女99综合网_91青青青亚洲娱乐在线观看_日韩无码高清综合久久

鍍金池/ 教程/ 大數(shù)據(jù)/ 消息中間件
Redis 數(shù)據(jù)淘汰機(jī)制
積分排行榜
小剖 Memcache
Redis 數(shù)據(jù)結(jié)構(gòu) intset
分布式鎖
從哪里開始讀起,怎么讀
Redis 數(shù)據(jù)結(jié)構(gòu) dict
不在浮沙筑高臺(tái)
Redis 集群(上)
Redis 監(jiān)視器
源碼閱讀工具
Redis 日志和斷言
內(nèi)存數(shù)據(jù)管理
Redis 數(shù)據(jù)結(jié)構(gòu)綜述
源碼日志
Web 服務(wù)器存儲(chǔ) session
消息中間件
Redis 與 Lua 腳本
什么樣的源代碼適合閱讀
Redis 數(shù)據(jù)結(jié)構(gòu) sds
Memcached slab 分配策略
訂閱發(fā)布機(jī)制
Redis 是如何提供服務(wù)的
Redis 事務(wù)機(jī)制
Redis 集群(下)
主從復(fù)制
Redis 應(yīng)用
RDB 持久化策略
Redis 數(shù)據(jù)遷移
Redis 事件驅(qū)動(dòng)詳解
初探 Redis
Redis 與 Memcache
AOF 持久化策略
Redis 數(shù)據(jù)結(jié)構(gòu) redisOb
作者簡(jiǎn)介
Redis 數(shù)據(jù)結(jié)構(gòu) ziplist
Redis 數(shù)據(jù)結(jié)構(gòu) skiplist
Redis 哨兵機(jī)制

消息中間件

消息隊(duì)列簡(jiǎn)介

接觸 Linux 系統(tǒng)編程的時(shí)候,曾經(jīng)學(xué)到消息隊(duì)列是 IPC 的一種方式,這種通訊方式通常只用于本地的進(jìn)程,基于共享內(nèi)存的《無(wú)鎖消息隊(duì)列》即是一個(gè)很好的中間件,詳見這里。但這篇提到的消息隊(duì)列,也被稱為消息中間件,通常在分布式系統(tǒng)中用到。

提及消息中間件的時(shí)候,還會(huì)涉及生產(chǎn)者和消費(fèi)者兩個(gè)概念。消息中間件是負(fù)責(zé)接收來(lái)自生產(chǎn)者的消息,并存儲(chǔ)并轉(zhuǎn)發(fā)給對(duì)應(yīng)的消費(fèi)者,生產(chǎn)者可以按 topic 發(fā)布各樣消息,消費(fèi)者也可以按 topic 訂閱各樣消息。生產(chǎn)者只管往消息隊(duì)列里推送消息,不用等待消費(fèi)者的回應(yīng);消費(fèi)者只管從消息隊(duì)列中取出數(shù)據(jù)并處理,可用可靠性等問題都交由消息中間件來(lái)負(fù)責(zé)。

http://wiki.jikexueyuan.com/project/redis/images/w.png" alt="" />

說(shuō)白了,這種分布式的消息中間件即是網(wǎng)絡(luò)上一個(gè)服務(wù)器,我們可以往里面扔數(shù)據(jù),里面的數(shù)據(jù)會(huì)被消息中間件推送或者被別人拉取,消息中間件取到一個(gè)數(shù)據(jù)中轉(zhuǎn)的作用。生產(chǎn)者和消費(fèi)者通常有兩種對(duì)應(yīng)關(guān)系,一個(gè)生產(chǎn)者對(duì)應(yīng)一個(gè)消費(fèi)者,以及一個(gè)生產(chǎn)者對(duì)應(yīng)多個(gè)消費(fèi)者。在這篇文章中,介紹了消息中間件的三個(gè)特點(diǎn):解耦,異步和并行。讀者可以自行理解。一些不需要及時(shí)可靠響應(yīng)的業(yè)務(wù)場(chǎng)景,消息中間件可以大大提高業(yè)務(wù)上層的 吞吐量。

目前消息中間件一族里邊有一些優(yōu)秀的作品,RabbitMQ, Jafka/Kafka。redis 也可以作為一個(gè)入門級(jí)的消息隊(duì)列。上面提到的一個(gè)生產(chǎn)者對(duì)應(yīng)一個(gè)消費(fèi)者,Redis 的 blist 可以實(shí)現(xiàn);一個(gè)生產(chǎn)者對(duì)應(yīng)多個(gè)消費(fèi)者,Redis 的pub/sub 模式可以實(shí)現(xiàn)。值得注意的是,使用 Redis 作為消息中間件,假如消費(fèi)者有一段時(shí)間斷開了與 Redis 的連接,它將不會(huì)收到這段時(shí)間內(nèi) Redis 內(nèi)的數(shù)據(jù),這一點(diǎn)從 pub/sub 的實(shí)現(xiàn)可以知道。嚴(yán)格意義上的消息中間件,需要保證數(shù)據(jù)的可靠性。

分布式的消息隊(duì)列

在平時(shí)的開發(fā)當(dāng)中,消息隊(duì)列算是最常見的應(yīng)用了。在本機(jī)的時(shí)候,可以使用系統(tǒng)提供的消息隊(duì)列,或者基于共享內(nèi)存的循環(huán)消息隊(duì)列,來(lái)實(shí)現(xiàn)本機(jī)進(jìn)程以及進(jìn)程之間的通信。對(duì)于異機(jī)部署的多個(gè)進(jìn)程,就需要用到分布式的消息隊(duì)列了,來(lái)看看這個(gè)場(chǎng)景:

http://wiki.jikexueyuan.com/project/redis/images/w1.png" alt="" />

生產(chǎn)者,基于 Redis 的消息隊(duì)列,3 個(gè) worker 組都分別部署在不同的機(jī)器上,生產(chǎn)者會(huì)快速將產(chǎn)出內(nèi)容(如需要存儲(chǔ)的數(shù)據(jù)或者日志等)推送到消息隊(duì)列服務(wù)器上,這是 worker group 就能消費(fèi)了。

這種實(shí)現(xiàn)可以借助 Redis 中的 blist 實(shí)現(xiàn)。在這里用 C 實(shí)現(xiàn)了一個(gè)生產(chǎn)者和 worker group 的示例代碼:

// comm.h
#ifndef COMM_H__
#define COMM_H__
#include <inttypes.h>
typedef struct {
    char ip[32];
    uint16_t port;
    char queue_name[256];
} config_t ;
void Usage(char *program) {
    printf("Usage: %s -h ip -p port -l test\n",program);
    abort();
}
const size_t max_cmd_len = 512;
#endif

生產(chǎn)者的代碼:
// producer.cc
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>

#include "comm.h"

#include "hiredis/hiredis.h"

void test_redis_client()
{
    redisContext *rc = redisConnect("127.0.0.1",6379);
    if(NULL == rc || rc != NULL && rc->err) {
        fprintf(stderr,"error: %s\n",rc->errstr);
        return;
    }
    // set name
    redisReply *reply = (redisReply *)redisCommand(rc,"set name dylan");
    printf("%s\n",reply->str);
    // get name
    reply = (redisReply *)redisCommand(rc,"get name");
    printf("%s\n",reply->str);
    }
    int main(int argc, char *argv[]) {
    if (argc < 7)
        Usage(argv[0]);
    config_t config;
    for (int i = EOF;
            (i = getopt(argc, argv, "h:p:l:")) != EOF;) {
        switch (i) {
            case 'h': snprintf(config.ip,sizeof(config.ip),"%s",optarg); break;
            case 'p': config.port = atoi(optarg); break;
            case 'l': snprintf(config.queue_name,sizeof(config.queue_name),"%s",
            optarg); break;
            default: Usage(argv[0]); break;
        }
    }
    redisContext *rc = redisConnect(config.ip,config.port);
    if (NULL == rc || rc != NULL && rc->err) {
        fprintf(stderr,"error: %s\n",rc->errstr);
        return -1;
    }
    redisReply *reply = NULL;
    char cmd[max_cmd_len];
    snprintf(cmd,sizeof(cmd),"LPUSH %s task",config.queue_name);
    printf("cmd=%s\n",cmd);
    int count = 100;
    while (count--) {
        reply = (redisReply *)redisCommand(rc,cmd);
    if (reply && reply->type == REDIS_REPLY_INTEGER) {
    } else {
        printf("BLPUSH error\n");
        }
    }
return 0;
}

消費(fèi)者的代碼:
// consumer.cc
#include "comm.h"
#include "hiredis/hiredis.h"
int DoLogic(char *data, size_t len);
int main(int argc, char *argv[]) {
    if (argc < 7)
        Usage(argv[0]);
    config_t config;
        for (int i = EOF;
            (i = getopt(argc, argv, "h:p:l:")) != EOF;) {
        switch (i) {
            case 'h': snprintf(config.ip,sizeof(config.ip),"%s",optarg); break;
            case 'p': config.port = atoi(optarg); break;
            case 'l': snprintf(config.queue_name,sizeof(config.queue_name),"%s",
            optarg); break;
            default: Usage(argv[0]); break;
        }
    }
    redisContext *rc = redisConnect(config.ip,config.port);
    if (NULL == rc || rc != NULL && rc->err) {
        fprintf(stderr,"error: %s\n",rc->errstr);
        return -1;
    }
    redisReply *reply = NULL;
    char cmd[max_cmd_len];
    snprintf(cmd,sizeof(cmd),"BRPOP %s task 30",config.queue_name);
    int seq = 0;
    while (true) {
        reply = (redisReply *)redisCommand(rc,cmd);
    if (reply && reply->type == REDIS_REPLY_STRING) {
        DoLogic(reply->str,reply->len);
    } else if (reply && reply->type == REDIS_REPLY_ARRAY) {
        for (size_t i=0; i<reply->elements; i+=2) {
        printf("%d->%s\n",seq++,reply->element[i]->str);
    }
    } else {
    printf("BRPOP error, reply->type=%d\n",reply->type);
    break;
        }
    }
    return 0;
}
    int DoLogic(char *data, size_t len) {
        printf("reply=%s\n",data);
return 0;
}