在线观看不卡亚洲电影_亚洲妓女99综合网_91青青青亚洲娱乐在线观看_日韩无码高清综合久久

鍍金池/ 教程/ 大數(shù)據(jù)/ 發(fā)布/訂閱
介紹
路由(Routing)
為什么需要主題交換機(jī)?
發(fā)布/訂閱
工作隊(duì)列
遠(yuǎn)程過(guò)程調(diào)用(RPC)

發(fā)布/訂閱

(使用pika 0.9.5 Python客戶端)

在上篇教程中,我們搭建了一個(gè)工作隊(duì)列,每個(gè)任務(wù)只分發(fā)給一個(gè)工作者(worker)。在本篇教程中,我們要做的跟之前完全不一樣 —— 分發(fā)一個(gè)消息給多個(gè)消費(fèi)者(consumers)。這種模式被稱為“發(fā)布/訂閱”。

為了描述這種模式,我們將會(huì)構(gòu)建一個(gè)簡(jiǎn)單的日志系統(tǒng)。它包括兩個(gè)程序——第一個(gè)程序負(fù)責(zé)發(fā)送日志消息,第二個(gè)程序負(fù)責(zé)獲取消息并輸出內(nèi)容。

在我們的這個(gè)日志系統(tǒng)中,所有正在運(yùn)行的接收方程序都會(huì)接受消息。我們用其中一個(gè)接收者(receiver)把日志寫入硬盤中,另外一個(gè)接受者(receiver)把日志輸出到屏幕上。

最終,日志消息被廣播給所有的接受者(receivers)。

交換機(jī)(Exchanges)

前面的教程中,我們發(fā)送消息到隊(duì)列并從中取出消息?,F(xiàn)在是時(shí)候介紹 RabbitMQ 中完整的消息模型了。

讓我們簡(jiǎn)單的概括一下之前的教程:

  • 發(fā)布者(producer)是發(fā)布消息的應(yīng)用程序。
  • 隊(duì)列(queue)用于消息存儲(chǔ)的緩沖。
  • 消費(fèi)者(consumer)是接收消息的應(yīng)用程序。

RabbitMQ 消息模型的核心理念是:發(fā)布者(producer)不會(huì)直接發(fā)送任何消息給隊(duì)列。事實(shí)上,發(fā)布者(producer)甚至不知道消息是否已經(jīng)被投遞到隊(duì)列。

發(fā)布者(producer)只需要把消息發(fā)送給一個(gè)交換機(jī)(exchange)。交換機(jī)非常簡(jiǎn)單,它一邊從發(fā)布者方接收消息,一邊把消息推送到隊(duì)列。交換機(jī)必須知道如何處理它接收到的消息,是應(yīng)該推送到指定的隊(duì)列還是是多個(gè)隊(duì)列,或者是直接忽略消息。這些規(guī)則是通過(guò)交換機(jī)類型(exchange type)來(lái)定義的。

http://wiki.jikexueyuan.com/project/rabbitmq/images/9.png" alt="" />

有幾個(gè)可供選擇的交換機(jī)類型:直連交換機(jī)(direct), 主題交換機(jī)(topic), (頭交換機(jī))headers和 扇型交換機(jī)(fanout)。我們?cè)谶@里主要說(shuō)明最后一個(gè) —— 扇型交換機(jī)(fanout)。先創(chuàng)建一個(gè) fanout 類型的交換機(jī),命名為 logs:

channel.exchange_declare(exchange='logs',
                         type='fanout')  

扇型交換機(jī)(fanout)很簡(jiǎn)單,你可能從名字上就能猜測(cè)出來(lái),它把消息發(fā)送給它所知道的所有隊(duì)列。這正是我們的日志系統(tǒng)所需要的。

交換器列表

rabbitmqctl 能夠列出服務(wù)器上所有的交換器:

$ sudo rabbitmqctl list_exchanges
Listing exchanges ...
logs      fanout
amq.direct      direct
amq.topic       topic
amq.fanout      fanout
amq.headers     headers
...done. 

這個(gè)列表中有一些叫做 amq.* 的交換器。這些都是默認(rèn)創(chuàng)建的,不過(guò)這時(shí)候你還不需要使用他們。

匿名的交換器

前面的教程中我們對(duì)交換機(jī)一無(wú)所知,但仍然能夠發(fā)送消息到隊(duì)列中。因?yàn)槲覀兪褂昧嗣麨榭兆址?("") 默認(rèn)的交換機(jī)。

回想我們之前是如何發(fā)布一則消息:

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body=message)  

exchange 參數(shù)就是交換機(jī)的名稱??兆址砟J(rèn)或者匿名交換機(jī):消息將會(huì)根據(jù)指定的 routing_key 分發(fā)到指定的隊(duì)列。

現(xiàn)在,我們就可以發(fā)送消息到一個(gè)具名交換機(jī)了:

channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message) 

臨時(shí)隊(duì)列

你還記得之前我們使用的隊(duì)列名嗎( hello 和 task_queue)?給一個(gè)隊(duì)列命名是很重要的——我們需要把工作者(workers)指向正確的隊(duì)列。如果你打算在發(fā)布者(producers)和消費(fèi)者(consumers)之間共享同隊(duì)列的話,給隊(duì)列命名是十分重要的。

但是這并不適用于我們的日志系統(tǒng)。我們打算接收所有的日志消息,而不僅僅是一小部分。我們關(guān)心的是最新的消息而不是舊的。為了解決這個(gè)問(wèn)題,我們需要做兩件事情。

首先,當(dāng)我們連接上 RabbitMQ 的時(shí)候,我們需要一個(gè)全新的、空的隊(duì)列。我們可以手動(dòng)創(chuàng)建一個(gè)隨機(jī)的隊(duì)列名,或者讓服務(wù)器為我們選擇一個(gè)隨機(jī)的隊(duì)列名(推薦)。我們只需要在調(diào)用 queue_declare 方法的時(shí)候,不提供 queue 參數(shù)就可以了:

result = channel.queue_declare()  

這時(shí)候我們可以通過(guò) result.method.queue 獲得已經(jīng)生成的隨機(jī)隊(duì)列名。它可能是這樣子的:amq.gen-U0srCoW8TsaXjNh73pnVAw==。

第二步,當(dāng)與消費(fèi)者(consumer)斷開連接的時(shí)候,這個(gè)隊(duì)列應(yīng)當(dāng)被立即刪除。exclusive 標(biāo)識(shí)符即可達(dá)到此目的。

result = channel.queue_declare(exclusive=True)  

綁定(Bindings)

http://wiki.jikexueyuan.com/project/rabbitmq/images/10.png" alt="" />

我們已經(jīng)創(chuàng)建了一個(gè)扇型交換機(jī)(fanout)和一個(gè)隊(duì)列?,F(xiàn)在我們需要告訴交換機(jī)如何發(fā)送消息給我們的隊(duì)列。交換器和隊(duì)列之間的聯(lián)系我們稱之為綁定(binding)。

channel.queue_bind(exchange='logs',
                   queue=result.method.queue)  

現(xiàn)在,logs 交換機(jī)將會(huì)把消息添加到我們的隊(duì)列中。

綁定(binding)列表

你可以使用 rabbitmqctl list_bindings 列出所有現(xiàn)存的綁定。

代碼整合

http://wiki.jikexueyuan.com/project/rabbitmq/images/11.png" alt="" />

發(fā)布日志消息的程序看起來(lái)和之前的沒(méi)有太大區(qū)別。最重要的改變就是我們把消息發(fā)送給 logs 交換機(jī)而不是匿名交換機(jī)。在發(fā)送的時(shí)候我們需要提供 routing_key參數(shù),但是它的值會(huì)被扇型交換機(jī)(fanout exchange)忽略。以下是emit_log.py 腳本:

\#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)
print " [x] Sent %r" % (message,)
connection.close()  

(emit_log.py 源文件)

正如你看到的那樣,在連接成功之后,我們聲明了一個(gè)交換器,這一個(gè)是很重要的,因?yàn)椴辉试S發(fā)布消息到不存在的交換器。

如果沒(méi)有綁定隊(duì)列到交換器,消息將會(huì)丟失。但這個(gè)沒(méi)有所謂,如果沒(méi)有消費(fèi)者監(jiān)聽,那么消息就會(huì)被忽略。

receive_logs.py 的代碼:

\#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='logs',
                   queue=queue_name)

print ' [*] Waiting for logs. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] %r" % (body,)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()  

(receive_logs.py source)

這樣我們就完成了。如果你想把日志保存到文件中,只需要打開控制臺(tái)輸入:

$ python receive_logs.py > logs_from_rabbit.log  

如果你想在屏幕中查看日志,那么打開一個(gè)新的終端然后運(yùn)行:

$ python receive_logs.py  

當(dāng)然還要發(fā)送日志:

$ python emit_log.py  

使用 rabbitmqctl list_bindings 你可確認(rèn)已經(jīng)創(chuàng)建的隊(duì)列綁定。你可以看到運(yùn)行中的兩個(gè) receive_logs.py 程序:

$ sudo rabbitmqctl list_bindings
Listing bindings ...
 ...
logs    amq.gen-TJWkez28YpImbWdRKMa8sg==                []
logs    amq.gen-x0kymA4yPzAT6BoC/YP+zw==                []
...done.  

顯示結(jié)果很直觀:logs 交換器把數(shù)據(jù)發(fā)送給兩個(gè)系統(tǒng)命名的隊(duì)列。這就是我們所期望的。

如何監(jiān)聽消息的子集呢?讓我們移步教程4

上一篇:工作隊(duì)列