RabbitMQ 是一個消息代理。它的核心原理非常簡單:接收和發(fā)送消息。你可以把它想像成一個郵局:你把信件放入郵箱,郵遞員就會把信件投遞到你的收件人處。在這個比喻中,RabbitMQ 就扮演著郵箱、郵局以及郵遞員的角色。
RabbitMQ 和郵局的主要區(qū)別是,它不是用來處理紙張的,它是用來接收、存儲和發(fā)送消息(message)這種二進制數(shù)據(jù)的。
一般提到 RabbitMQ 和消息,都會用到一些專有名詞。
http://wiki.jikexueyuan.com/project/rabbitmq/images/1.png" alt="" />
http://wiki.jikexueyuan.com/project/rabbitmq/images/2.png" alt="" />
需要指出的是生產(chǎn)者、消費者、代理需不要待在同一個設(shè)備上;事實上大多數(shù)應(yīng)用也確實不在會將他們放在一臺機器上。
(使用pika 0.9.5 Python客戶端)
我們的 “Hello world” 不會很復(fù)雜——僅僅發(fā)送一個消息,然后獲取它并輸出到屏幕。這樣以來我們需要兩個程序,一個用作發(fā)送消息,另一個接受消息并打印消息內(nèi)容
我們的大致的設(shè)計是這樣的:
http://wiki.jikexueyuan.com/project/rabbitmq/images/3.png" alt="" />
生產(chǎn)者(producer)把消息發(fā)送到一個名為 “hello” 的隊列中。消費者(consumer)從這個隊列中獲取消息。
RabbitMQ 使用的是 AMQP 協(xié)議。要使用她你就必須需要一個使用同樣協(xié)議的庫。幾乎所有的編程語言都有可選擇的庫。python 也是一樣,可以從以下幾個庫中選擇:
在這一系列教程中,我們打算使用 pika。要安裝 pika,你可以使用 pip 這個包管理工具:
$ sudo pip install pika==0.9.5
安裝過程依賴于 pip 和 git-core 兩個包,你需要先安裝它們。
easy_install pip
pip install pika==0.9.5
http://wiki.jikexueyuan.com/project/rabbitmq/images/5.png" alt="" />
我們第一個程序 send.py 會發(fā)送一個消息到隊列中。首先要做的事情就是建立一個到 RabbitMQ 服務(wù)器的連接。
\#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost'))
channel = connection.channel()
現(xiàn)在我們已經(jīng)連接上服務(wù)器了,那么,在發(fā)送消息之前我們需要確認隊列是存在的。如果我們把消息發(fā)送到一個不存在的隊列,RabbitMQ 會丟棄這條消息。我門先創(chuàng)建一個名為 hello 的隊列,然后把消息發(fā)送到這個隊列中。
channel.queue_declare(queue='hello')
這時候我們就可以發(fā)送消息了,我們第一條消息只包含了 Hello World! 字符串,我們打算把它發(fā)送到我們的 hello 隊列。
在 RabbitMQ 中,消息是不能直接發(fā)送到隊列,它需要發(fā)送到交換機(exchange)中。我們不打算在這里深入討論它——你可以通過教程的第三部分了解更多?,F(xiàn)在我們所需要了解的是如何使用默認的交換機(exchange),它使用一個空字符串來標識。交換機允許我們指定某條消息需要投遞到哪個隊列,routing_key 參數(shù)必須指定為隊列的名稱:
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!')
print " [x] Sent 'Hello World!'"
在退出程序之前,我們需要確認網(wǎng)絡(luò)緩沖已經(jīng)被刷寫、消息已經(jīng)投遞到 RabbitMQ。完成這些事情(正確的關(guān)閉連接)是很簡單的。
connection.close()
發(fā)送不成功!
如果這是你第一次使用 RabbitMQ,并且沒有看到 “Sent” 消息出現(xiàn)在屏幕上,你可能會抓耳撓腮不知所以。這也許是因為沒有足夠的磁盤空間給代理使用所造成的(代理默認需要 1Gb 的空閑空間),所以它才會拒絕接收消息。查看一下代理的日志確定并且減少必要的限制。配置文件文檔會告訴你如何更改磁盤空間限制(disk_free_limit)。
http://wiki.jikexueyuan.com/project/rabbitmq/images/6.png" alt="" />
我們的第二個程序 receive.py,將會從隊列中獲取消息并打印消息。
這次我們還是先要連接到 RabbitMQ 服務(wù)器。連接服務(wù)器的代碼和之前是一樣的。
下一步也和之前一樣,我們需要確認隊列是存在的。使用 queue_declare 創(chuàng)建一個隊列——我們可以運行這個命令很多次,但是只有一個隊列會被創(chuàng)建。
channel.queue_declare(queue='hello')
你也許要問: 為什么要重復(fù)聲明隊列呢 —— 我們已經(jīng)在前面的代碼中聲明過它了。如果我們確定了隊列是已經(jīng)存在的,那么我們可以不這么做,比如此前預(yù)先運行了send.py 程序??墒俏覀儾⒉淮_定哪個程序會首先運行。這種情況下,在程序中重復(fù)將隊列重復(fù)聲明一下是種值得推薦的做法。
你也許希望查看 RabbitMQ 中有哪些隊列、有多少消息在隊列中。此時你可以使用rabbitmqctl 工具(使用有權(quán)限的用戶):
bash
$ sudo rabbitmqctl list_queues
Listing queues ...
hello 0
...done.
(在 Windows 中不需要 sudo 命令)
從隊列中獲取消息相對來說稍顯復(fù)雜。需要為隊列定義一個回調(diào)(callback)函數(shù)。當我們獲取到消息的時候,Pika 庫就會調(diào)用此回調(diào)函數(shù)。這個回調(diào)函數(shù)會將接收到的消息內(nèi)容輸出到屏幕上。
def callback(ch, method, properties, body):
print " [x] Received %r" % (body,)
下一步,我們需要告訴 RabbitMQ 這個回調(diào)函數(shù)將會從名為 "hello" 的隊列中接收消息:
channel.basic_consume(callback,
queue='hello',
no_ack=True)
要成功運行這些命令,我們必須保證隊列是存在的,我們的確可以確保它的存在——因為我們之前已經(jīng)使用 queue_declare 將其聲明過了。
no_ack 參數(shù)稍后會進行介紹。
最后,我們輸入一個用來等待消息數(shù)據(jù)并且在需要的時候運行回調(diào)函數(shù)的無限循環(huán)。
print ' [*] Waiting for messages. To exit press CTRL+C'
channel.start_consuming()
send.py 的全部代碼:
\#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!')
print " [x] Sent 'Hello World!'"
connection.close()
(send.py 源碼)
receive.py的全部代碼:
\#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
print ' [*] Waiting for messages. To exit press CTRL+C'
def callback(ch, method, properties, body):
print " [x] Received %r" % (body,)
channel.basic_consume(callback,
queue='hello',
no_ack=True)
channel.start_consuming()
(receive.py source)
現(xiàn)在就可以在終端中運行我們的程序了。首先,用 send.py 重續(xù)發(fā)送一條消息:
$ python send.py
[x] Sent 'Hello World!'
生產(chǎn)者(producer)程序 send.py 每次運行之后就會停止?,F(xiàn)在我們就來接收消息:
$ python receive.py
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'Hello World!'
成功了!我們已經(jīng)通過 RabbitMQ 發(fā)送第一條消息。你也許已經(jīng)注意到了,receive.py 程序并沒有退出。它一直在準備獲取消息,你可以通過 Ctrl-C 來中止它。
試下在新的終端中再次運行 send.py。
我們已經(jīng)學(xué)會如何發(fā)送消息到一個已知隊列中并接收消息。是時候移步到第二部分了,我們將會建立一個簡單的工作隊列(work queue)。