在线观看不卡亚洲电影_亚洲妓女99综合网_91青青青亚洲娱乐在线观看_日韩无码高清综合久久

鍍金池/ 教程/ 大數據/ 消息中間件
Redis 數據淘汰機制
積分排行榜
小剖 Memcache
Redis 數據結構 intset
分布式鎖
從哪里開始讀起,怎么讀
Redis 數據結構 dict
不在浮沙筑高臺
Redis 集群(上)
Redis 監(jiān)視器
源碼閱讀工具
Redis 日志和斷言
內存數據管理
Redis 數據結構綜述
源碼日志
Web 服務器存儲 session
消息中間件
Redis 與 Lua 腳本
什么樣的源代碼適合閱讀
Redis 數據結構 sds
Memcached slab 分配策略
訂閱發(fā)布機制
Redis 是如何提供服務的
Redis 事務機制
Redis 集群(下)
主從復制
Redis 應用
RDB 持久化策略
Redis 數據遷移
Redis 事件驅動詳解
初探 Redis
Redis 與 Memcache
AOF 持久化策略
Redis 數據結構 redisOb
作者簡介
Redis 數據結構 ziplist
Redis 數據結構 skiplist
Redis 哨兵機制

消息中間件

消息隊列簡介

接觸 Linux 系統(tǒng)編程的時候,曾經學到消息隊列是 IPC 的一種方式,這種通訊方式通常只用于本地的進程,基于共享內存的《無鎖消息隊列》即是一個很好的中間件,詳見這里。但這篇提到的消息隊列,也被稱為消息中間件,通常在分布式系統(tǒng)中用到。

提及消息中間件的時候,還會涉及生產者和消費者兩個概念。消息中間件是負責接收來自生產者的消息,并存儲并轉發(fā)給對應的消費者,生產者可以按 topic 發(fā)布各樣消息,消費者也可以按 topic 訂閱各樣消息。生產者只管往消息隊列里推送消息,不用等待消費者的回應;消費者只管從消息隊列中取出數據并處理,可用可靠性等問題都交由消息中間件來負責。

http://wiki.jikexueyuan.com/project/redis/images/w.png" alt="" />

說白了,這種分布式的消息中間件即是網絡上一個服務器,我們可以往里面扔數據,里面的數據會被消息中間件推送或者被別人拉取,消息中間件取到一個數據中轉的作用。生產者和消費者通常有兩種對應關系,一個生產者對應一個消費者,以及一個生產者對應多個消費者。在這篇文章中,介紹了消息中間件的三個特點:解耦,異步和并行。讀者可以自行理解。一些不需要及時可靠響應的業(yè)務場景,消息中間件可以大大提高業(yè)務上層的 吞吐量。

目前消息中間件一族里邊有一些優(yōu)秀的作品,RabbitMQ, Jafka/Kafka。redis 也可以作為一個入門級的消息隊列。上面提到的一個生產者對應一個消費者,Redis 的 blist 可以實現;一個生產者對應多個消費者,Redis 的pub/sub 模式可以實現。值得注意的是,使用 Redis 作為消息中間件,假如消費者有一段時間斷開了與 Redis 的連接,它將不會收到這段時間內 Redis 內的數據,這一點從 pub/sub 的實現可以知道。嚴格意義上的消息中間件,需要保證數據的可靠性。

分布式的消息隊列

在平時的開發(fā)當中,消息隊列算是最常見的應用了。在本機的時候,可以使用系統(tǒng)提供的消息隊列,或者基于共享內存的循環(huán)消息隊列,來實現本機進程以及進程之間的通信。對于異機部署的多個進程,就需要用到分布式的消息隊列了,來看看這個場景:

http://wiki.jikexueyuan.com/project/redis/images/w1.png" alt="" />

生產者,基于 Redis 的消息隊列,3 個 worker 組都分別部署在不同的機器上,生產者會快速將產出內容(如需要存儲的數據或者日志等)推送到消息隊列服務器上,這是 worker group 就能消費了。

這種實現可以借助 Redis 中的 blist 實現。在這里用 C 實現了一個生產者和 worker group 的示例代碼:

// comm.h
#ifndef COMM_H__
#define COMM_H__
#include <inttypes.h>
typedef struct {
    char ip[32];
    uint16_t port;
    char queue_name[256];
} config_t ;
void Usage(char *program) {
    printf("Usage: %s -h ip -p port -l test\n",program);
    abort();
}
const size_t max_cmd_len = 512;
#endif

生產者的代碼:
// producer.cc
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>

#include "comm.h"

#include "hiredis/hiredis.h"

void test_redis_client()
{
    redisContext *rc = redisConnect("127.0.0.1",6379);
    if(NULL == rc || rc != NULL && rc->err) {
        fprintf(stderr,"error: %s\n",rc->errstr);
        return;
    }
    // set name
    redisReply *reply = (redisReply *)redisCommand(rc,"set name dylan");
    printf("%s\n",reply->str);
    // get name
    reply = (redisReply *)redisCommand(rc,"get name");
    printf("%s\n",reply->str);
    }
    int main(int argc, char *argv[]) {
    if (argc < 7)
        Usage(argv[0]);
    config_t config;
    for (int i = EOF;
            (i = getopt(argc, argv, "h:p:l:")) != EOF;) {
        switch (i) {
            case 'h': snprintf(config.ip,sizeof(config.ip),"%s",optarg); break;
            case 'p': config.port = atoi(optarg); break;
            case 'l': snprintf(config.queue_name,sizeof(config.queue_name),"%s",
            optarg); break;
            default: Usage(argv[0]); break;
        }
    }
    redisContext *rc = redisConnect(config.ip,config.port);
    if (NULL == rc || rc != NULL && rc->err) {
        fprintf(stderr,"error: %s\n",rc->errstr);
        return -1;
    }
    redisReply *reply = NULL;
    char cmd[max_cmd_len];
    snprintf(cmd,sizeof(cmd),"LPUSH %s task",config.queue_name);
    printf("cmd=%s\n",cmd);
    int count = 100;
    while (count--) {
        reply = (redisReply *)redisCommand(rc,cmd);
    if (reply && reply->type == REDIS_REPLY_INTEGER) {
    } else {
        printf("BLPUSH error\n");
        }
    }
return 0;
}

消費者的代碼:
// consumer.cc
#include "comm.h"
#include "hiredis/hiredis.h"
int DoLogic(char *data, size_t len);
int main(int argc, char *argv[]) {
    if (argc < 7)
        Usage(argv[0]);
    config_t config;
        for (int i = EOF;
            (i = getopt(argc, argv, "h:p:l:")) != EOF;) {
        switch (i) {
            case 'h': snprintf(config.ip,sizeof(config.ip),"%s",optarg); break;
            case 'p': config.port = atoi(optarg); break;
            case 'l': snprintf(config.queue_name,sizeof(config.queue_name),"%s",
            optarg); break;
            default: Usage(argv[0]); break;
        }
    }
    redisContext *rc = redisConnect(config.ip,config.port);
    if (NULL == rc || rc != NULL && rc->err) {
        fprintf(stderr,"error: %s\n",rc->errstr);
        return -1;
    }
    redisReply *reply = NULL;
    char cmd[max_cmd_len];
    snprintf(cmd,sizeof(cmd),"BRPOP %s task 30",config.queue_name);
    int seq = 0;
    while (true) {
        reply = (redisReply *)redisCommand(rc,cmd);
    if (reply && reply->type == REDIS_REPLY_STRING) {
        DoLogic(reply->str,reply->len);
    } else if (reply && reply->type == REDIS_REPLY_ARRAY) {
        for (size_t i=0; i<reply->elements; i+=2) {
        printf("%d->%s\n",seq++,reply->element[i]->str);
    }
    } else {
    printf("BRPOP error, reply->type=%d\n",reply->type);
    break;
        }
    }
    return 0;
}
    int DoLogic(char *data, size_t len) {
        printf("reply=%s\n",data);
return 0;
}