(Python客戶端 —— 使用 pika 0.9.8)
在第二篇教程中我們介紹了如何使用工作隊(duì)列(work queue)在多個(gè)工作者(woker)中間分發(fā)耗時(shí)的任務(wù)。
可是如果我們需要將一個(gè)函數(shù)運(yùn)行在遠(yuǎn)程計(jì)算機(jī)上并且等待從那兒獲取結(jié)果時(shí),該怎么辦呢?這就是另外的故事了。這種模式通常被稱為遠(yuǎn)程過程調(diào)用(Remote Procedure Call)或者 RPC。
這篇教程中,我們會(huì)使用 RabbitMQ 來構(gòu)建一個(gè) RPC 系統(tǒng):包含一個(gè)客戶端和一個(gè) RPC 服務(wù)器。現(xiàn)在的情況是,我們沒有一個(gè)值得被分發(fā)的足夠耗時(shí)的任務(wù),所以接下來,我們會(huì)創(chuàng)建一個(gè)模擬RPC服務(wù)來返回斐波那契數(shù)列。
為了展示 RPC 服務(wù)如何使用,我們創(chuàng)建了一個(gè)簡(jiǎn)單的客戶端類。它會(huì)暴露出一個(gè)名為 “call” 的方法用來發(fā)送一個(gè) RPC 請(qǐng)求,并且在收到回應(yīng)前保持阻塞。
fibonacci_rpc = FibonacciRpcClient()
result = fibonacci_rpc.call(4)
print "fib(4) is %r" % (result,)
盡管 RPC 在計(jì)算領(lǐng)域是一個(gè)常用模式,但它也經(jīng)常被詬病。當(dāng)一個(gè)問題被拋出的時(shí)候,程序員往往意識(shí)不到這到底是由本地調(diào)用還是由較慢的 RPC 調(diào)用引起的。同樣的困惑還來自于系統(tǒng)的不可預(yù)測(cè)性和給調(diào)試工作帶來的不必要的復(fù)雜性。跟軟件精簡(jiǎn)不同的是,濫用 RPC 會(huì)導(dǎo)致不可維護(hù)的面條代碼.
考慮到這一點(diǎn),牢記以下建議:
確保能夠明確的搞清楚哪個(gè)函數(shù)是本地調(diào)用的,哪個(gè)函數(shù)是遠(yuǎn)程調(diào)用的。給你的系統(tǒng)編寫文檔。保持各個(gè)組件間的依賴明確。處理錯(cuò)誤案例。明了客戶端改如何處理 RPC 服務(wù)器的宕機(jī)和長(zhǎng)時(shí)間無響應(yīng)情況。
當(dāng)對(duì)避免使用 RPC 有疑問的時(shí)候。如果可以的話,你應(yīng)該盡量使用異步管道來代替RPC 類的阻塞。結(jié)果被異步地推送到下一個(gè)計(jì)算場(chǎng)景。
一般來說通過 RabbitMQ 來實(shí)現(xiàn) RPC 是很容易的。一個(gè)客戶端發(fā)送請(qǐng)求信息,服務(wù)器端將其應(yīng)用到一個(gè)回復(fù)信息中。為了接收到回復(fù)信息,客戶端需要在發(fā)送請(qǐng)求的時(shí)候同時(shí)發(fā)送一個(gè)回調(diào)隊(duì)列(callback queue)的地址。我們?cè)囋嚳矗?/p>
result = channel.queue_declare(exclusive=True)
callback_queue = result.method.queue
channel.basic_publish(exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(
reply_to = callback_queue,
),
body=request)
# ... and some code to read a response message from the callback_queue ...
AMQP 協(xié)議給消息預(yù)定義了一系列的14個(gè)屬性。大多數(shù)屬性很少會(huì)用到,除了以下幾個(gè):
上邊介紹的方法中,我們建議給每一個(gè) RPC 請(qǐng)求新建一個(gè)回調(diào)隊(duì)列。這不是一個(gè)高效的做法,幸好這兒有一個(gè)更好的辦法 —— 我們可以為每個(gè)客戶端只建立一個(gè)獨(dú)立的回調(diào)隊(duì)列。
這就帶來一個(gè)新問題,當(dāng)此隊(duì)列接收到一個(gè)響應(yīng)的時(shí)候它無法辨別出這個(gè)響應(yīng)是屬于哪個(gè)請(qǐng)求的。correlation_id 就是為了解決這個(gè)問題而來的。我們給每個(gè)請(qǐng)求設(shè)置一個(gè)獨(dú)一無二的值。稍后,當(dāng)我們從回調(diào)隊(duì)列中接收到一個(gè)消息的時(shí)候,我們就可以查看這條屬性從而將響應(yīng)和請(qǐng)求匹配起來。如果我們接手到的消息的correlation_id 是未知的,那就直接銷毀掉它,因?yàn)樗粚儆谖覀兊娜魏我粭l請(qǐng)求。
你也許會(huì)問,為什么我們接收到未知消息的時(shí)候不拋出一個(gè)錯(cuò)誤,而是要將它忽略掉?這是為了解決服務(wù)器端有可能發(fā)生的競(jìng)爭(zhēng)情況。盡管可能性不大,但RPC服務(wù)器還是有可能在已將應(yīng)答發(fā)送給我們但還未將確認(rèn)消息發(fā)送給請(qǐng)求的情況下死掉。如果這種情況發(fā)生,RPC 在重啟后會(huì)重新處理請(qǐng)求。這就是為什么我們必須在客戶端優(yōu)雅的處理重復(fù)響應(yīng),同時(shí)RPC也需要盡可能保持冪等性。
http://wiki.jikexueyuan.com/project/rabbitmq/images/16.png" alt="" />
我們的 RPC 如此工作:
rpc_server.py代碼:
\#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='rpc_queue')
def fib(n):
if n == 0:
return 0
elif n == 1:
return 1
else:
return fib(n-1) + fib(n-2)
def on_request(ch, method, props, body):
n = int(body)
print " [.] fib(%s)" % (n,)
response = fib(n)
ch.basic_publish(exchange='',
routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id = \
props.correlation_id),
body=str(response))
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='rpc_queue')
print " [x] Awaiting RPC requests"
channel.start_consuming()
服務(wù)器端代碼相當(dāng)簡(jiǎn)單:
\#!/usr/bin/env python
import pika
import uuid
class FibonacciRpcClient(object):
def __init__(self):
self.connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
self.channel = self.connection.channel()
result = self.channel.queue_declare(exclusive=True)
self.callback_queue = result.method.queue
self.channel.basic_consume(self.on_response, no_ack=True,
queue=self.callback_queue)
def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:
self.response = body
def call(self, n):
self.response = None
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(
reply_to = self.callback_queue,
correlation_id = self.corr_id,
),
body=str(n))
while self.response is None:
self.connection.process_data_events()
return int(self.response)
fibonacci_rpc = FibonacciRpcClient()
print " [x] Requesting fib(30)"
response = fibonacci_rpc.call(30)
print " [.] Got %r" % (response,)
客戶端代碼稍微有點(diǎn)難懂:
我們的 RPC 服務(wù)已經(jīng)準(zhǔn)備就緒了,現(xiàn)在啟動(dòng)服務(wù)器端:
$ python rpc_server.py
[x] Awaiting RPC requests
運(yùn)行客戶端,請(qǐng)求一個(gè)fibonacci隊(duì)列。
$ python rpc_client.py
[x] Requesting fib(30)
此處呈現(xiàn)的設(shè)計(jì)并不是實(shí)現(xiàn) RPC 服務(wù)的唯一方式,但是他有一些重要的優(yōu)勢(shì):
我們的代碼依舊非常簡(jiǎn)單,而且沒有試圖去解決一些復(fù)雜(但是重要)的問題,如:
如果你想做一些實(shí)驗(yàn),你會(huì)發(fā)現(xiàn) rabbitmq-management plugin 在觀測(cè)隊(duì)列方面是很有用處的。
(完整的 rpc_client.py 和 rpc_server.py 代碼)