(使用Python 客戶端 —— pika 0.9.8)
上一篇教程里,我們改進了我們的日志系統(tǒng)。我們使用直連交換機替代了扇型交換機,從只能盲目的廣播消息改進為有可能選擇性的接收日志。
盡管直連交換機能夠改善我們的系統(tǒng),但是它也有它的限制 —— 沒辦法基于多個標(biāo)準(zhǔn)執(zhí)行路由操作。
在我們的日志系統(tǒng)中,我們不只希望訂閱基于嚴重程度的日志,同時還希望訂閱基于發(fā)送來源的日志。Unix 工具 syslog 就是同時基于嚴重程度 -severity (info/warn/crit...) 和 設(shè)備 -facility (auth/cron/kern...) 來路由日志的。
如果這樣的話,將會給予我們非常大的靈活性,我們既可以監(jiān)聽來源于 “cron” 的嚴重程度為 “critical errors” 的日志,也可以監(jiān)聽來源于 “kern” 的所有日志。
為了實現(xiàn)這個目的,接下來我們學(xué)習(xí)如何使用另一種更復(fù)雜的交換機 —— 主題交換機。
發(fā)送到主題交換機(topic exchange)的消息不可以攜帶隨意什么樣子的路由鍵(routing_key),它的路由鍵必須是一個由.分隔開的詞語列表。這些單詞隨便是什么都可以,但是最好是跟攜帶它們的消息有關(guān)系的詞匯。以下是幾個推薦的例子:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"。詞語的個數(shù)可以隨意,但是不要超過 255 字節(jié)。
綁定鍵也必須擁有同樣的格式。主題交換機背后的邏輯跟直連交換機很相似 —— 一個攜帶著特定路由鍵的消息會被主題交換機投遞給綁定鍵與之想匹配的隊列。但是它的綁定鍵和路由鍵有兩個特殊應(yīng)用方式:
下邊用圖說明:
http://wiki.jikexueyuan.com/project/rabbitmq/images/15.png" alt="" />
這個例子里,我們發(fā)送的所有消息都是用來描述小動物的。發(fā)送的消息所攜帶的路由鍵是由三個單詞所組成的,這三個單詞被兩個.分割開。路由鍵里的第一個單詞描述的是動物的手腳的利索程度,第二個單詞是動物的顏色,第三個是動物的種類。所以它看起來是這樣的:
我們創(chuàng)建了三個綁定:Q1 的綁定鍵為 .orange.,Q2 的綁定鍵為 ..rabbit 和 lazy.# 。
這三個綁定鍵被可以總結(jié)為:
一個攜帶有 quick.orange.rabbit 的消息將會被分別投遞給這兩個隊列。攜帶著 lazy.orange.elephant 的消息同樣也會給兩個隊列都投遞過去。另一方面攜帶有 quick.orange.fox 的消息會投遞給第一個隊列,攜帶有 lazy.brown.fox 的消息會投遞給第二個隊列。攜帶有 lazy.pink.rabbit 的消息只會被投遞給第二個隊列一次,即使它同時匹配第二個隊列的兩個綁定。攜帶著 quick.brown.fox 的消息不會投遞給任何一個隊列。
如果我們違反約定,發(fā)送了一個攜帶有一個單詞或者四個單詞("orange" or "quick.orange.male.rabbit")的消息時,發(fā)送的消息不會投遞給任何一個隊列,而且會丟失掉。
但是另一方面,即使 "lazy.orange.male.rabbit" 有四個單詞,他還是會匹配最后一個綁定,并且被投遞到第二個隊列中。
主題交換機是很強大的,它可以表現(xiàn)出跟其他交換機類似的行為
當(dāng)一個隊列的綁定鍵為 "#"(井號) 的時候,這個隊列將會無視消息的路由鍵,接收所有的消息。
當(dāng) * (星號) 和 # (井號) 這兩個特殊字符都未在綁定鍵中出現(xiàn)的時候,此時主題交換機就擁有的直連交換機的行為。
接下來我們會將主題交換機應(yīng)用到我們的日志系統(tǒng)中。在開始工作前,我們假設(shè)日志的路由鍵由兩個單詞組成,路由鍵看起來是這樣的:
代碼跟上一篇教程差不多。
emit_log_topic.py 的代碼:
\#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',
type='topic')
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
routing_key=routing_key,
body=message)
print " [x] Sent %r:%r" % (routing_key, message)
connection.close()
receive_logs_topic.py 的代碼:
\#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',
type='topic')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
binding_keys = sys.argv[1:]
if not binding_keys:
print >> sys.stderr, "Usage: %s [binding_key]..." % (sys.argv[0],)
sys.exit(1)
for binding_key in binding_keys:
channel.queue_bind(exchange='topic_logs',
queue=queue_name,
routing_key=binding_key)
print ' [*] Waiting for logs. To exit press CTRL+C'
def callback(ch, method, properties, body):
print " [x] %r:%r" % (method.routing_key, body,)
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
執(zhí)行下邊命令 接收所有日志:
python receive_logs_topic.py "#"
執(zhí)行下邊命令 接收來自 ”kern“ 設(shè)備的日志:
python receive_logs_topic.py "kern.*"
執(zhí)行下邊命令 只接收嚴重程度為 ”critical“ 的日志:
python receive_logs_topic.py "*.critical"
執(zhí)行下邊命令 建立多個綁定:
python receive_logs_topic.py "kern.*" "*.critical"
執(zhí)行下邊命令 發(fā)送路由鍵為 "kern.critical" 的日志:
python emit_log_topic.py "kern.critical" "A critical kernel error"
執(zhí)行上邊命令試試看效果吧。另外,上邊代碼不會對路由鍵和綁定鍵做任何假設(shè),所以你可以在命令中使用超過兩個路由鍵參數(shù)。
(完整代碼參見 emit_logs_topic.py and receive_logs_topic.py)