(使用pika 0.9.5 Python客戶端)
在前面的教程中,我們實(shí)現(xiàn)了一個(gè)簡單的日志系統(tǒng)??梢园讶罩鞠V播給多個(gè)接收者。
本篇教程中我們打算新增一個(gè)功能 —— 使得它能夠只訂閱消息的一個(gè)字集。例如,我們只需要把嚴(yán)重的錯(cuò)誤日志信息寫入日志文件(存儲到磁盤),但同時(shí)仍然把所有的日志信息輸出到控制臺中
前面的例子,我們已經(jīng)創(chuàng)建過綁定(bindings),代碼如下:
channel.queue_bind(exchange=exchange_name,
queue=queue_name)
綁定(binding)是指交換機(jī)(exchange)和隊(duì)列(queue)的關(guān)系。可以簡單理解為:這個(gè)隊(duì)列(queue)對這個(gè)交換機(jī)(exchange)的消息感興趣。
綁定的時(shí)候可以帶上一個(gè)額外的 routing_key 參數(shù)。為了避免與basic_publish的參數(shù)混淆,我們把它叫做綁定鍵(binding key)。以下是如何創(chuàng)建一個(gè)帶綁定鍵的綁定。
channel.queue_bind(exchange=exchange_name,
queue=queue_name,
routing_key='black')
綁定鍵的意義取決于交換機(jī)(exchange)的類型。我們之前使用過的扇型交換機(jī)(fanout exchanges)會忽略這個(gè)值。
我們的日志系統(tǒng)廣播所有的消息給所有的消費(fèi)者(consumers)。我們打算擴(kuò)展它,使其基于日志的嚴(yán)重程度進(jìn)行消息過濾。例如我們也許只是希望將比較嚴(yán)重的錯(cuò)誤(error)日志寫入磁盤,以免在警告(warning)或者信息(info)日志上浪費(fèi)磁盤空間。
我們使用的扇型交換機(jī)(fanout exchange)沒有足夠的靈活性 —— 它能做的僅僅是廣播。
我們將會使用直連交換機(jī)(direct exchange)來代替。路由的算法很簡單 —— 交換機(jī)將會對綁定鍵(binding key)和路由鍵(routing key)進(jìn)行精確匹配,從而確定消息該分發(fā)到哪個(gè)隊(duì)列。
下圖能夠很好的描述這個(gè)場景:
http://wiki.jikexueyuan.com/project/rabbitmq/images/12.png" alt="" />
在這個(gè)場景中,我們可以看到直連交換機(jī) X 和兩個(gè)隊(duì)列進(jìn)行了綁定。第一個(gè)隊(duì)列使用 orange 作為綁定鍵,第二個(gè)隊(duì)列有兩個(gè)綁定,一個(gè)使用 black 作為綁定鍵,另外一個(gè)使用 green。
這樣以來,當(dāng)路由鍵為 orange 的消息發(fā)布到交換機(jī),就會被路由到隊(duì)列 Q1。路由鍵為 black 或者 green 的消息就會路由到 Q2。其他的所有消息都將會被丟棄。
http://wiki.jikexueyuan.com/project/rabbitmq/images/13.png" alt="" />
多個(gè)隊(duì)列使用相同的綁定鍵是合法的。這個(gè)例子中,我們可以添加一個(gè) X 和 Q1 之間的綁定,使用 black 綁定鍵。這樣一來,直連交換機(jī)就和扇型交換機(jī)的行為一樣,會將消息廣播到所有匹配的隊(duì)列。帶有 black 路由鍵的消息會同時(shí)發(fā)送到 Q1 和 Q2。
我們將會發(fā)送消息到一個(gè)直連交換機(jī),把日志級別作為路由鍵。這樣接收日志的腳本就可以根據(jù)嚴(yán)重級別來選擇它想要處理的日志。我們先看看發(fā)送日志。
我們需要創(chuàng)建一個(gè)交換機(jī)(exchange):
channel.exchange_declare(exchange='direct_logs',
type='direct')
然后我們發(fā)送一則消息:
channel.basic_publish(exchange='direct_logs',
routing_key=severity,
body=message)
我們先假設(shè) “severity” 的值是 info、warning、error 中的一個(gè)。
處理接收消息的方式和之前差不多,只有一個(gè)例外,我們將會為我們感興趣的每個(gè)嚴(yán)重級別分別創(chuàng)建一個(gè)新的綁定。
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
for severity in severities:
channel.queue_bind(exchange='direct_logs',
queue=queue_name,
routing_key=severity)
http://wiki.jikexueyuan.com/project/rabbitmq/images/14.png" alt="" />
emit_log_direct.py 的代碼:
\#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
type='direct')
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
routing_key=severity,
body=message)
print " [x] Sent %r:%r" % (severity, message)
connection.close()
receive_logs_direct.py 的代碼:
\#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
type='direct')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
severities = sys.argv[1:]
if not severities:
print >> sys.stderr, "Usage: %s [info] [warning] [error]" % \
(sys.argv[0],)
sys.exit(1)
for severity in severities:
channel.queue_bind(exchange='direct_logs',
queue=queue_name,
routing_key=severity)
print ' [*] Waiting for logs. To exit press CTRL+C'
def callback(ch, method, properties, body):
print " [x] %r:%r" % (method.routing_key, body,)
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
如果你希望只是保存 warning 和 error 級別的日志到磁盤,只需要打開控制臺并輸入:
$ python receive_logs_direct.py warning error > logs_from_rabbit.log
如果你希望所有的日志信息都輸出到屏幕中,打開一個(gè)新的終端,然后輸入:
$ python receive_logs_direct.py info warning error
[*] Waiting for logs. To exit press CTRL+C
如果要觸發(fā)一個(gè) error 級別的日志,只需要輸入:
$ python emit_log_direct.py error "Run. Run. Or it will explode."
[x] Sent 'error':'Run. Run. Or it will explode.'
這里是完整的代碼:(emit_log_direct.py 和 receive_logs_direct.py)