在线观看不卡亚洲电影_亚洲妓女99综合网_91青青青亚洲娱乐在线观看_日韩无码高清综合久久

鍍金池/ 教程/ 大數(shù)據(jù)/ 路由(Routing)
介紹
路由(Routing)
為什么需要主題交換機(jī)?
發(fā)布/訂閱
工作隊(duì)列
遠(yuǎn)程過程調(diào)用(RPC)

路由(Routing)

(使用pika 0.9.5 Python客戶端)

在前面的教程中,我們實(shí)現(xiàn)了一個(gè)簡單的日志系統(tǒng)??梢园讶罩鞠V播給多個(gè)接收者。

本篇教程中我們打算新增一個(gè)功能 —— 使得它能夠只訂閱消息的一個(gè)字集。例如,我們只需要把嚴(yán)重的錯(cuò)誤日志信息寫入日志文件(存儲到磁盤),但同時(shí)仍然把所有的日志信息輸出到控制臺中

綁定(Bindings)

前面的例子,我們已經(jīng)創(chuàng)建過綁定(bindings),代碼如下:

channel.queue_bind(exchange=exchange_name,
                   queue=queue_name)  

綁定(binding)是指交換機(jī)(exchange)和隊(duì)列(queue)的關(guān)系。可以簡單理解為:這個(gè)隊(duì)列(queue)對這個(gè)交換機(jī)(exchange)的消息感興趣。

綁定的時(shí)候可以帶上一個(gè)額外的 routing_key 參數(shù)。為了避免與basic_publish的參數(shù)混淆,我們把它叫做綁定鍵(binding key)。以下是如何創(chuàng)建一個(gè)帶綁定鍵的綁定。

channel.queue_bind(exchange=exchange_name,
                   queue=queue_name,
                   routing_key='black')  

綁定鍵的意義取決于交換機(jī)(exchange)的類型。我們之前使用過的扇型交換機(jī)(fanout exchanges)會忽略這個(gè)值。

直連交換機(jī)(Direct exchange)

我們的日志系統(tǒng)廣播所有的消息給所有的消費(fèi)者(consumers)。我們打算擴(kuò)展它,使其基于日志的嚴(yán)重程度進(jìn)行消息過濾。例如我們也許只是希望將比較嚴(yán)重的錯(cuò)誤(error)日志寫入磁盤,以免在警告(warning)或者信息(info)日志上浪費(fèi)磁盤空間。

我們使用的扇型交換機(jī)(fanout exchange)沒有足夠的靈活性 —— 它能做的僅僅是廣播。

我們將會使用直連交換機(jī)(direct exchange)來代替。路由的算法很簡單 —— 交換機(jī)將會對綁定鍵(binding key)和路由鍵(routing key)進(jìn)行精確匹配,從而確定消息該分發(fā)到哪個(gè)隊(duì)列。

下圖能夠很好的描述這個(gè)場景:

http://wiki.jikexueyuan.com/project/rabbitmq/images/12.png" alt="" />

在這個(gè)場景中,我們可以看到直連交換機(jī) X 和兩個(gè)隊(duì)列進(jìn)行了綁定。第一個(gè)隊(duì)列使用 orange 作為綁定鍵,第二個(gè)隊(duì)列有兩個(gè)綁定,一個(gè)使用 black 作為綁定鍵,另外一個(gè)使用 green。

這樣以來,當(dāng)路由鍵為 orange 的消息發(fā)布到交換機(jī),就會被路由到隊(duì)列 Q1。路由鍵為 black 或者 green 的消息就會路由到 Q2。其他的所有消息都將會被丟棄。

多個(gè)綁定(Multiple bindings)

http://wiki.jikexueyuan.com/project/rabbitmq/images/13.png" alt="" />

多個(gè)隊(duì)列使用相同的綁定鍵是合法的。這個(gè)例子中,我們可以添加一個(gè) X 和 Q1 之間的綁定,使用 black 綁定鍵。這樣一來,直連交換機(jī)就和扇型交換機(jī)的行為一樣,會將消息廣播到所有匹配的隊(duì)列。帶有 black 路由鍵的消息會同時(shí)發(fā)送到 Q1 和 Q2。

發(fā)送日志

我們將會發(fā)送消息到一個(gè)直連交換機(jī),把日志級別作為路由鍵。這樣接收日志的腳本就可以根據(jù)嚴(yán)重級別來選擇它想要處理的日志。我們先看看發(fā)送日志。

我們需要創(chuàng)建一個(gè)交換機(jī)(exchange):

channel.exchange_declare(exchange='direct_logs',
                         type='direct')  

然后我們發(fā)送一則消息:

channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)  

我們先假設(shè) “severity” 的值是 info、warning、error 中的一個(gè)。

訂閱

處理接收消息的方式和之前差不多,只有一個(gè)例外,我們將會為我們感興趣的每個(gè)嚴(yán)重級別分別創(chuàng)建一個(gè)新的綁定。

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)  

代碼整合

http://wiki.jikexueyuan.com/project/rabbitmq/images/14.png" alt="" />

emit_log_direct.py 的代碼:

\#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print " [x] Sent %r:%r" % (severity, message)
connection.close()  

receive_logs_direct.py 的代碼:

\#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    print >> sys.stderr, "Usage: %s [info] [warning] [error]" % \
                         (sys.argv[0],)
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

print ' [*] Waiting for logs. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] %r:%r" % (method.routing_key, body,)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()  

如果你希望只是保存 warning 和 error 級別的日志到磁盤,只需要打開控制臺并輸入:

$ python receive_logs_direct.py warning error > logs_from_rabbit.log  

如果你希望所有的日志信息都輸出到屏幕中,打開一個(gè)新的終端,然后輸入:

$ python receive_logs_direct.py info warning error
 [*] Waiting for logs. To exit press CTRL+C  

如果要觸發(fā)一個(gè) error 級別的日志,只需要輸入:

$ python emit_log_direct.py error "Run. Run. Or it will explode."
 [x] Sent 'error':'Run. Run. Or it will explode.'  

這里是完整的代碼:(emit_log_direct.pyreceive_logs_direct.py)