接觸 Linux 系統(tǒng)編程的時(shí)候,曾經(jīng)學(xué)到消息隊(duì)列是 IPC 的一種方式,這種通訊方式通常只用于本地的進(jìn)程,基于共享內(nèi)存的《無(wú)鎖消息隊(duì)列》即是一個(gè)很好的中間件,詳見這里。但這篇提到的消息隊(duì)列,也被稱為消息中間件,通常在分布式系統(tǒng)中用到。
提及消息中間件的時(shí)候,還會(huì)涉及生產(chǎn)者和消費(fèi)者兩個(gè)概念。消息中間件是負(fù)責(zé)接收來(lái)自生產(chǎn)者的消息,并存儲(chǔ)并轉(zhuǎn)發(fā)給對(duì)應(yīng)的消費(fèi)者,生產(chǎn)者可以按 topic 發(fā)布各樣消息,消費(fèi)者也可以按 topic 訂閱各樣消息。生產(chǎn)者只管往消息隊(duì)列里推送消息,不用等待消費(fèi)者的回應(yīng);消費(fèi)者只管從消息隊(duì)列中取出數(shù)據(jù)并處理,可用可靠性等問題都交由消息中間件來(lái)負(fù)責(zé)。
http://wiki.jikexueyuan.com/project/redis/images/w.png" alt="" />
說(shuō)白了,這種分布式的消息中間件即是網(wǎng)絡(luò)上一個(gè)服務(wù)器,我們可以往里面扔數(shù)據(jù),里面的數(shù)據(jù)會(huì)被消息中間件推送或者被別人拉取,消息中間件取到一個(gè)數(shù)據(jù)中轉(zhuǎn)的作用。生產(chǎn)者和消費(fèi)者通常有兩種對(duì)應(yīng)關(guān)系,一個(gè)生產(chǎn)者對(duì)應(yīng)一個(gè)消費(fèi)者,以及一個(gè)生產(chǎn)者對(duì)應(yīng)多個(gè)消費(fèi)者。在這篇文章中,介紹了消息中間件的三個(gè)特點(diǎn):解耦,異步和并行。讀者可以自行理解。一些不需要及時(shí)可靠響應(yīng)的業(yè)務(wù)場(chǎng)景,消息中間件可以大大提高業(yè)務(wù)上層的 吞吐量。
目前消息中間件一族里邊有一些優(yōu)秀的作品,RabbitMQ, Jafka/Kafka。redis 也可以作為一個(gè)入門級(jí)的消息隊(duì)列。上面提到的一個(gè)生產(chǎn)者對(duì)應(yīng)一個(gè)消費(fèi)者,Redis 的 blist 可以實(shí)現(xiàn);一個(gè)生產(chǎn)者對(duì)應(yīng)多個(gè)消費(fèi)者,Redis 的pub/sub 模式可以實(shí)現(xiàn)。值得注意的是,使用 Redis 作為消息中間件,假如消費(fèi)者有一段時(shí)間斷開了與 Redis 的連接,它將不會(huì)收到這段時(shí)間內(nèi) Redis 內(nèi)的數(shù)據(jù),這一點(diǎn)從 pub/sub 的實(shí)現(xiàn)可以知道。嚴(yán)格意義上的消息中間件,需要保證數(shù)據(jù)的可靠性。
在平時(shí)的開發(fā)當(dāng)中,消息隊(duì)列算是最常見的應(yīng)用了。在本機(jī)的時(shí)候,可以使用系統(tǒng)提供的消息隊(duì)列,或者基于共享內(nèi)存的循環(huán)消息隊(duì)列,來(lái)實(shí)現(xiàn)本機(jī)進(jìn)程以及進(jìn)程之間的通信。對(duì)于異機(jī)部署的多個(gè)進(jìn)程,就需要用到分布式的消息隊(duì)列了,來(lái)看看這個(gè)場(chǎng)景:
http://wiki.jikexueyuan.com/project/redis/images/w1.png" alt="" />
生產(chǎn)者,基于 Redis 的消息隊(duì)列,3 個(gè) worker 組都分別部署在不同的機(jī)器上,生產(chǎn)者會(huì)快速將產(chǎn)出內(nèi)容(如需要存儲(chǔ)的數(shù)據(jù)或者日志等)推送到消息隊(duì)列服務(wù)器上,這是 worker group 就能消費(fèi)了。
這種實(shí)現(xiàn)可以借助 Redis 中的 blist 實(shí)現(xiàn)。在這里用 C 實(shí)現(xiàn)了一個(gè)生產(chǎn)者和 worker group 的示例代碼:
// comm.h
#ifndef COMM_H__
#define COMM_H__
#include <inttypes.h>
typedef struct {
char ip[32];
uint16_t port;
char queue_name[256];
} config_t ;
void Usage(char *program) {
printf("Usage: %s -h ip -p port -l test\n",program);
abort();
}
const size_t max_cmd_len = 512;
#endif
生產(chǎn)者的代碼:
// producer.cc
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include "comm.h"
#include "hiredis/hiredis.h"
void test_redis_client()
{
redisContext *rc = redisConnect("127.0.0.1",6379);
if(NULL == rc || rc != NULL && rc->err) {
fprintf(stderr,"error: %s\n",rc->errstr);
return;
}
// set name
redisReply *reply = (redisReply *)redisCommand(rc,"set name dylan");
printf("%s\n",reply->str);
// get name
reply = (redisReply *)redisCommand(rc,"get name");
printf("%s\n",reply->str);
}
int main(int argc, char *argv[]) {
if (argc < 7)
Usage(argv[0]);
config_t config;
for (int i = EOF;
(i = getopt(argc, argv, "h:p:l:")) != EOF;) {
switch (i) {
case 'h': snprintf(config.ip,sizeof(config.ip),"%s",optarg); break;
case 'p': config.port = atoi(optarg); break;
case 'l': snprintf(config.queue_name,sizeof(config.queue_name),"%s",
optarg); break;
default: Usage(argv[0]); break;
}
}
redisContext *rc = redisConnect(config.ip,config.port);
if (NULL == rc || rc != NULL && rc->err) {
fprintf(stderr,"error: %s\n",rc->errstr);
return -1;
}
redisReply *reply = NULL;
char cmd[max_cmd_len];
snprintf(cmd,sizeof(cmd),"LPUSH %s task",config.queue_name);
printf("cmd=%s\n",cmd);
int count = 100;
while (count--) {
reply = (redisReply *)redisCommand(rc,cmd);
if (reply && reply->type == REDIS_REPLY_INTEGER) {
} else {
printf("BLPUSH error\n");
}
}
return 0;
}
消費(fèi)者的代碼:
// consumer.cc
#include "comm.h"
#include "hiredis/hiredis.h"
int DoLogic(char *data, size_t len);
int main(int argc, char *argv[]) {
if (argc < 7)
Usage(argv[0]);
config_t config;
for (int i = EOF;
(i = getopt(argc, argv, "h:p:l:")) != EOF;) {
switch (i) {
case 'h': snprintf(config.ip,sizeof(config.ip),"%s",optarg); break;
case 'p': config.port = atoi(optarg); break;
case 'l': snprintf(config.queue_name,sizeof(config.queue_name),"%s",
optarg); break;
default: Usage(argv[0]); break;
}
}
redisContext *rc = redisConnect(config.ip,config.port);
if (NULL == rc || rc != NULL && rc->err) {
fprintf(stderr,"error: %s\n",rc->errstr);
return -1;
}
redisReply *reply = NULL;
char cmd[max_cmd_len];
snprintf(cmd,sizeof(cmd),"BRPOP %s task 30",config.queue_name);
int seq = 0;
while (true) {
reply = (redisReply *)redisCommand(rc,cmd);
if (reply && reply->type == REDIS_REPLY_STRING) {
DoLogic(reply->str,reply->len);
} else if (reply && reply->type == REDIS_REPLY_ARRAY) {
for (size_t i=0; i<reply->elements; i+=2) {
printf("%d->%s\n",seq++,reply->element[i]->str);
}
} else {
printf("BRPOP error, reply->type=%d\n",reply->type);
break;
}
}
return 0;
}
int DoLogic(char *data, size_t len) {
printf("reply=%s\n",data);
return 0;
}