(使用pika 0.9.5 Python客戶端)
在上篇教程中,我們搭建了一個(gè)工作隊(duì)列,每個(gè)任務(wù)只分發(fā)給一個(gè)工作者(worker)。在本篇教程中,我們要做的跟之前完全不一樣 —— 分發(fā)一個(gè)消息給多個(gè)消費(fèi)者(consumers)。這種模式被稱為“發(fā)布/訂閱”。
為了描述這種模式,我們將會(huì)構(gòu)建一個(gè)簡(jiǎn)單的日志系統(tǒng)。它包括兩個(gè)程序——第一個(gè)程序負(fù)責(zé)發(fā)送日志消息,第二個(gè)程序負(fù)責(zé)獲取消息并輸出內(nèi)容。
在我們的這個(gè)日志系統(tǒng)中,所有正在運(yùn)行的接收方程序都會(huì)接受消息。我們用其中一個(gè)接收者(receiver)把日志寫入硬盤中,另外一個(gè)接受者(receiver)把日志輸出到屏幕上。
最終,日志消息被廣播給所有的接受者(receivers)。
前面的教程中,我們發(fā)送消息到隊(duì)列并從中取出消息?,F(xiàn)在是時(shí)候介紹 RabbitMQ 中完整的消息模型了。
讓我們簡(jiǎn)單的概括一下之前的教程:
RabbitMQ 消息模型的核心理念是:發(fā)布者(producer)不會(huì)直接發(fā)送任何消息給隊(duì)列。事實(shí)上,發(fā)布者(producer)甚至不知道消息是否已經(jīng)被投遞到隊(duì)列。
發(fā)布者(producer)只需要把消息發(fā)送給一個(gè)交換機(jī)(exchange)。交換機(jī)非常簡(jiǎn)單,它一邊從發(fā)布者方接收消息,一邊把消息推送到隊(duì)列。交換機(jī)必須知道如何處理它接收到的消息,是應(yīng)該推送到指定的隊(duì)列還是是多個(gè)隊(duì)列,或者是直接忽略消息。這些規(guī)則是通過(guò)交換機(jī)類型(exchange type)來(lái)定義的。
http://wiki.jikexueyuan.com/project/rabbitmq/images/9.png" alt="" />
有幾個(gè)可供選擇的交換機(jī)類型:直連交換機(jī)(direct), 主題交換機(jī)(topic), (頭交換機(jī))headers和 扇型交換機(jī)(fanout)。我們?cè)谶@里主要說(shuō)明最后一個(gè) —— 扇型交換機(jī)(fanout)。先創(chuàng)建一個(gè) fanout 類型的交換機(jī),命名為 logs:
channel.exchange_declare(exchange='logs',
type='fanout')
扇型交換機(jī)(fanout)很簡(jiǎn)單,你可能從名字上就能猜測(cè)出來(lái),它把消息發(fā)送給它所知道的所有隊(duì)列。這正是我們的日志系統(tǒng)所需要的。
rabbitmqctl 能夠列出服務(wù)器上所有的交換器:
$ sudo rabbitmqctl list_exchanges
Listing exchanges ...
logs fanout
amq.direct direct
amq.topic topic
amq.fanout fanout
amq.headers headers
...done.
這個(gè)列表中有一些叫做 amq.* 的交換器。這些都是默認(rèn)創(chuàng)建的,不過(guò)這時(shí)候你還不需要使用他們。
前面的教程中我們對(duì)交換機(jī)一無(wú)所知,但仍然能夠發(fā)送消息到隊(duì)列中。因?yàn)槲覀兪褂昧嗣麨榭兆址?("") 默認(rèn)的交換機(jī)。
回想我們之前是如何發(fā)布一則消息:
channel.basic_publish(exchange='',
routing_key='hello',
body=message)
exchange 參數(shù)就是交換機(jī)的名稱??兆址砟J(rèn)或者匿名交換機(jī):消息將會(huì)根據(jù)指定的 routing_key 分發(fā)到指定的隊(duì)列。
現(xiàn)在,我們就可以發(fā)送消息到一個(gè)具名交換機(jī)了:
channel.basic_publish(exchange='logs',
routing_key='',
body=message)
你還記得之前我們使用的隊(duì)列名嗎( hello 和 task_queue)?給一個(gè)隊(duì)列命名是很重要的——我們需要把工作者(workers)指向正確的隊(duì)列。如果你打算在發(fā)布者(producers)和消費(fèi)者(consumers)之間共享同隊(duì)列的話,給隊(duì)列命名是十分重要的。
但是這并不適用于我們的日志系統(tǒng)。我們打算接收所有的日志消息,而不僅僅是一小部分。我們關(guān)心的是最新的消息而不是舊的。為了解決這個(gè)問(wèn)題,我們需要做兩件事情。
首先,當(dāng)我們連接上 RabbitMQ 的時(shí)候,我們需要一個(gè)全新的、空的隊(duì)列。我們可以手動(dòng)創(chuàng)建一個(gè)隨機(jī)的隊(duì)列名,或者讓服務(wù)器為我們選擇一個(gè)隨機(jī)的隊(duì)列名(推薦)。我們只需要在調(diào)用 queue_declare 方法的時(shí)候,不提供 queue 參數(shù)就可以了:
result = channel.queue_declare()
這時(shí)候我們可以通過(guò) result.method.queue 獲得已經(jīng)生成的隨機(jī)隊(duì)列名。它可能是這樣子的:amq.gen-U0srCoW8TsaXjNh73pnVAw==。
第二步,當(dāng)與消費(fèi)者(consumer)斷開連接的時(shí)候,這個(gè)隊(duì)列應(yīng)當(dāng)被立即刪除。exclusive 標(biāo)識(shí)符即可達(dá)到此目的。
result = channel.queue_declare(exclusive=True)
http://wiki.jikexueyuan.com/project/rabbitmq/images/10.png" alt="" />
我們已經(jīng)創(chuàng)建了一個(gè)扇型交換機(jī)(fanout)和一個(gè)隊(duì)列?,F(xiàn)在我們需要告訴交換機(jī)如何發(fā)送消息給我們的隊(duì)列。交換器和隊(duì)列之間的聯(lián)系我們稱之為綁定(binding)。
channel.queue_bind(exchange='logs',
queue=result.method.queue)
現(xiàn)在,logs 交換機(jī)將會(huì)把消息添加到我們的隊(duì)列中。
你可以使用 rabbitmqctl list_bindings 列出所有現(xiàn)存的綁定。
http://wiki.jikexueyuan.com/project/rabbitmq/images/11.png" alt="" />
發(fā)布日志消息的程序看起來(lái)和之前的沒(méi)有太大區(qū)別。最重要的改變就是我們把消息發(fā)送給 logs 交換機(jī)而不是匿名交換機(jī)。在發(fā)送的時(shí)候我們需要提供 routing_key參數(shù),但是它的值會(huì)被扇型交換機(jī)(fanout exchange)忽略。以下是emit_log.py 腳本:
\#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs',
type='fanout')
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
routing_key='',
body=message)
print " [x] Sent %r" % (message,)
connection.close()
(emit_log.py 源文件)
正如你看到的那樣,在連接成功之后,我們聲明了一個(gè)交換器,這一個(gè)是很重要的,因?yàn)椴辉试S發(fā)布消息到不存在的交換器。
如果沒(méi)有綁定隊(duì)列到交換器,消息將會(huì)丟失。但這個(gè)沒(méi)有所謂,如果沒(méi)有消費(fèi)者監(jiān)聽,那么消息就會(huì)被忽略。
receive_logs.py 的代碼:
\#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs',
type='fanout')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs',
queue=queue_name)
print ' [*] Waiting for logs. To exit press CTRL+C'
def callback(ch, method, properties, body):
print " [x] %r" % (body,)
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
(receive_logs.py source)
這樣我們就完成了。如果你想把日志保存到文件中,只需要打開控制臺(tái)輸入:
$ python receive_logs.py > logs_from_rabbit.log
如果你想在屏幕中查看日志,那么打開一個(gè)新的終端然后運(yùn)行:
$ python receive_logs.py
當(dāng)然還要發(fā)送日志:
$ python emit_log.py
使用 rabbitmqctl list_bindings 你可確認(rèn)已經(jīng)創(chuàng)建的隊(duì)列綁定。你可以看到運(yùn)行中的兩個(gè) receive_logs.py 程序:
$ sudo rabbitmqctl list_bindings
Listing bindings ...
...
logs amq.gen-TJWkez28YpImbWdRKMa8sg== []
logs amq.gen-x0kymA4yPzAT6BoC/YP+zw== []
...done.
顯示結(jié)果很直觀:logs 交換器把數(shù)據(jù)發(fā)送給兩個(gè)系統(tǒng)命名的隊(duì)列。這就是我們所期望的。
如何監(jiān)聽消息的子集呢?讓我們移步教程4