在线观看不卡亚洲电影_亚洲妓女99综合网_91青青青亚洲娱乐在线观看_日韩无码高清综合久久

鍍金池/ 教程/ 大數(shù)據(jù)/ 介紹
介紹
路由(Routing)
為什么需要主題交換機?
發(fā)布/訂閱
工作隊列
遠程過程調(diào)用(RPC)

介紹

介紹

RabbitMQ 是一個消息代理。它的核心原理非常簡單:接收和發(fā)送消息。你可以把它想像成一個郵局:你把信件放入郵箱,郵遞員就會把信件投遞到你的收件人處。在這個比喻中,RabbitMQ 就扮演著郵箱、郵局以及郵遞員的角色。

RabbitMQ 和郵局的主要區(qū)別是,它不是用來處理紙張的,它是用來接收、存儲和發(fā)送消息(message)這種二進制數(shù)據(jù)的。

一般提到 RabbitMQ 和消息,都會用到一些專有名詞。

  • 生產(chǎn)(Producing)意思就是發(fā)送。發(fā)送消息的程序就是一個生產(chǎn)者(producer)。我們一般用 "P" 來表示:

http://wiki.jikexueyuan.com/project/rabbitmq/images/1.png" alt="" />

  • 隊列(queue)就是郵箱的名稱。消息通過你的應(yīng)用程序和 RabbitMQ 進行傳輸,它們能夠只存儲在一個隊列(queue)中。 隊列(queue)沒有任何限制,你要存儲多少消息都可以——基本上是一個無限的緩沖。多個生產(chǎn)者(producers)能夠把消息發(fā)送給同一個隊列,同樣,多個消費者(consumers)也能夠從同一個隊列(queue)中獲取數(shù)據(jù)。隊列可以繪制成這樣(圖上是隊列的名稱):

http://wiki.jikexueyuan.com/project/rabbitmq/images/2.png" alt="" />

  • 消費(Consuming)和獲取消息是一樣的意思。一個消費者(consumer)就是一個等待獲取消息的程序。我們把它繪制為 "C":

需要指出的是生產(chǎn)者、消費者、代理需不要待在同一個設(shè)備上;事實上大多數(shù)應(yīng)用也確實不在會將他們放在一臺機器上。

Hello World!

(使用pika 0.9.5 Python客戶端)

我們的 “Hello world” 不會很復(fù)雜——僅僅發(fā)送一個消息,然后獲取它并輸出到屏幕。這樣以來我們需要兩個程序,一個用作發(fā)送消息,另一個接受消息并打印消息內(nèi)容

我們的大致的設(shè)計是這樣的:

http://wiki.jikexueyuan.com/project/rabbitmq/images/3.png" alt="" />

生產(chǎn)者(producer)把消息發(fā)送到一個名為 “hello” 的隊列中。消費者(consumer)從這個隊列中獲取消息。

RabbitMQ 庫

RabbitMQ 使用的是 AMQP 協(xié)議。要使用她你就必須需要一個使用同樣協(xié)議的庫。幾乎所有的編程語言都有可選擇的庫。python 也是一樣,可以從以下幾個庫中選擇:

py-amqplib
txAMQP
pika

在這一系列教程中,我們打算使用 pika。要安裝 pika,你可以使用 pip 這個包管理工具:

$ sudo pip install pika==0.9.5  

安裝過程依賴于 pip 和 git-core 兩個包,你需要先安裝它們。

  • Ubuntu平臺
    $ sudo apt-get install python-pip git-core
  • Debian平臺
    $ sudo apt-get install python-setuptools git-core$ sudo easy_install pip
  • Windows平臺 運行easy_install的安裝程序setuptools即可,安裝后運行以下命令

    easy_install pip
    pip install pika==0.9.5

發(fā)送消息

http://wiki.jikexueyuan.com/project/rabbitmq/images/5.png" alt="" />

我們第一個程序 send.py 會發(fā)送一個消息到隊列中。首先要做的事情就是建立一個到 RabbitMQ 服務(wù)器的連接。

\#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
               'localhost'))
channel = connection.channel()  

現(xiàn)在我們已經(jīng)連接上服務(wù)器了,那么,在發(fā)送消息之前我們需要確認隊列是存在的。如果我們把消息發(fā)送到一個不存在的隊列,RabbitMQ 會丟棄這條消息。我門先創(chuàng)建一個名為 hello 的隊列,然后把消息發(fā)送到這個隊列中。

channel.queue_declare(queue='hello')  

這時候我們就可以發(fā)送消息了,我們第一條消息只包含了 Hello World! 字符串,我們打算把它發(fā)送到我們的 hello 隊列。

在 RabbitMQ 中,消息是不能直接發(fā)送到隊列,它需要發(fā)送到交換機(exchange)中。我們不打算在這里深入討論它——你可以通過教程的第三部分了解更多?,F(xiàn)在我們所需要了解的是如何使用默認的交換機(exchange),它使用一個空字符串來標識。交換機允許我們指定某條消息需要投遞到哪個隊列,routing_key 參數(shù)必須指定為隊列的名稱:

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
print " [x] Sent 'Hello World!'"  

在退出程序之前,我們需要確認網(wǎng)絡(luò)緩沖已經(jīng)被刷寫、消息已經(jīng)投遞到 RabbitMQ。完成這些事情(正確的關(guān)閉連接)是很簡單的。

connection.close()  

發(fā)送不成功!

如果這是你第一次使用 RabbitMQ,并且沒有看到 “Sent” 消息出現(xiàn)在屏幕上,你可能會抓耳撓腮不知所以。這也許是因為沒有足夠的磁盤空間給代理使用所造成的(代理默認需要 1Gb 的空閑空間),所以它才會拒絕接收消息。查看一下代理的日志確定并且減少必要的限制。配置文件文檔會告訴你如何更改磁盤空間限制(disk_free_limit)。

獲取數(shù)據(jù)

http://wiki.jikexueyuan.com/project/rabbitmq/images/6.png" alt="" />

我們的第二個程序 receive.py,將會從隊列中獲取消息并打印消息。

這次我們還是先要連接到 RabbitMQ 服務(wù)器。連接服務(wù)器的代碼和之前是一樣的。

下一步也和之前一樣,我們需要確認隊列是存在的。使用 queue_declare 創(chuàng)建一個隊列——我們可以運行這個命令很多次,但是只有一個隊列會被創(chuàng)建。

channel.queue_declare(queue='hello')  

你也許要問: 為什么要重復(fù)聲明隊列呢 —— 我們已經(jīng)在前面的代碼中聲明過它了。如果我們確定了隊列是已經(jīng)存在的,那么我們可以不這么做,比如此前預(yù)先運行了send.py 程序??墒俏覀儾⒉淮_定哪個程序會首先運行。這種情況下,在程序中重復(fù)將隊列重復(fù)聲明一下是種值得推薦的做法。

列出所有隊列

你也許希望查看 RabbitMQ 中有哪些隊列、有多少消息在隊列中。此時你可以使用rabbitmqctl 工具(使用有權(quán)限的用戶):

bash
   $ sudo rabbitmqctl list_queues
   Listing queues ...
   hello    0
   ...done.

(在 Windows 中不需要 sudo 命令)

從隊列中獲取消息相對來說稍顯復(fù)雜。需要為隊列定義一個回調(diào)(callback)函數(shù)。當我們獲取到消息的時候,Pika 庫就會調(diào)用此回調(diào)函數(shù)。這個回調(diào)函數(shù)會將接收到的消息內(nèi)容輸出到屏幕上。

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)  

下一步,我們需要告訴 RabbitMQ 這個回調(diào)函數(shù)將會從名為 "hello" 的隊列中接收消息:

channel.basic_consume(callback,
                      queue='hello',
                      no_ack=True)  

要成功運行這些命令,我們必須保證隊列是存在的,我們的確可以確保它的存在——因為我們之前已經(jīng)使用 queue_declare 將其聲明過了。

no_ack 參數(shù)稍后會進行介紹。

最后,我們輸入一個用來等待消息數(shù)據(jù)并且在需要的時候運行回調(diào)函數(shù)的無限循環(huán)。

print ' [*] Waiting for messages. To exit press CTRL+C'
channel.start_consuming()  

整合

send.py 的全部代碼:

\#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
print " [x] Sent 'Hello World!'"
connection.close()  

(send.py 源碼)

receive.py的全部代碼:

\#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

print ' [*] Waiting for messages. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)

channel.basic_consume(callback,
                      queue='hello',
                      no_ack=True)

channel.start_consuming()  

(receive.py source)

現(xiàn)在就可以在終端中運行我們的程序了。首先,用 send.py 重續(xù)發(fā)送一條消息:

$ python send.py
[x] Sent 'Hello World!' 

生產(chǎn)者(producer)程序 send.py 每次運行之后就會停止?,F(xiàn)在我們就來接收消息:

$ python receive.py
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'Hello World!'  

成功了!我們已經(jīng)通過 RabbitMQ 發(fā)送第一條消息。你也許已經(jīng)注意到了,receive.py 程序并沒有退出。它一直在準備獲取消息,你可以通過 Ctrl-C 來中止它。

試下在新的終端中再次運行 send.py。

我們已經(jīng)學(xué)會如何發(fā)送消息到一個已知隊列中并接收消息。是時候移步到第二部分了,我們將會建立一個簡單的工作隊列(work queue)。