(Python客戶端 —— 使用 pika 0.9.8)
在第二篇教程中我們介紹了如何使用工作隊列(work queue)在多個工作者(woker)中間分發(fā)耗時的任務。
可是如果我們需要將一個函數(shù)運行在遠程計算機上并且等待從那兒獲取結果時,該怎么辦呢?這就是另外的故事了。這種模式通常被稱為遠程過程調(diào)用(Remote Procedure Call)或者 RPC。
這篇教程中,我們會使用 RabbitMQ 來構建一個 RPC 系統(tǒng):包含一個客戶端和一個 RPC 服務器?,F(xiàn)在的情況是,我們沒有一個值得被分發(fā)的足夠耗時的任務,所以接下來,我們會創(chuàng)建一個模擬RPC服務來返回斐波那契數(shù)列。
為了展示 RPC 服務如何使用,我們創(chuàng)建了一個簡單的客戶端類。它會暴露出一個名為 “call” 的方法用來發(fā)送一個 RPC 請求,并且在收到回應前保持阻塞。
fibonacci_rpc = FibonacciRpcClient()
result = fibonacci_rpc.call(4)
print "fib(4) is %r" % (result,)
盡管 RPC 在計算領域是一個常用模式,但它也經(jīng)常被詬病。當一個問題被拋出的時候,程序員往往意識不到這到底是由本地調(diào)用還是由較慢的 RPC 調(diào)用引起的。同樣的困惑還來自于系統(tǒng)的不可預測性和給調(diào)試工作帶來的不必要的復雜性。跟軟件精簡不同的是,濫用 RPC 會導致不可維護的面條代碼.
考慮到這一點,牢記以下建議:
確保能夠明確的搞清楚哪個函數(shù)是本地調(diào)用的,哪個函數(shù)是遠程調(diào)用的。給你的系統(tǒng)編寫文檔。保持各個組件間的依賴明確。處理錯誤案例。明了客戶端改如何處理 RPC 服務器的宕機和長時間無響應情況。
當對避免使用 RPC 有疑問的時候。如果可以的話,你應該盡量使用異步管道來代替RPC 類的阻塞。結果被異步地推送到下一個計算場景。
一般來說通過 RabbitMQ 來實現(xiàn) RPC 是很容易的。一個客戶端發(fā)送請求信息,服務器端將其應用到一個回復信息中。為了接收到回復信息,客戶端需要在發(fā)送請求的時候同時發(fā)送一個回調(diào)隊列(callback queue)的地址。我們試試看:
result = channel.queue_declare(exclusive=True)
callback_queue = result.method.queue
channel.basic_publish(exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(
reply_to = callback_queue,
),
body=request)
# ... and some code to read a response message from the callback_queue ...
AMQP 協(xié)議給消息預定義了一系列的14個屬性。大多數(shù)屬性很少會用到,除了以下幾個:
上邊介紹的方法中,我們建議給每一個 RPC 請求新建一個回調(diào)隊列。這不是一個高效的做法,幸好這兒有一個更好的辦法 —— 我們可以為每個客戶端只建立一個獨立的回調(diào)隊列。
這就帶來一個新問題,當此隊列接收到一個響應的時候它無法辨別出這個響應是屬于哪個請求的。correlation_id 就是為了解決這個問題而來的。我們給每個請求設置一個獨一無二的值。稍后,當我們從回調(diào)隊列中接收到一個消息的時候,我們就可以查看這條屬性從而將響應和請求匹配起來。如果我們接手到的消息的correlation_id 是未知的,那就直接銷毀掉它,因為它不屬于我們的任何一條請求。
你也許會問,為什么我們接收到未知消息的時候不拋出一個錯誤,而是要將它忽略掉?這是為了解決服務器端有可能發(fā)生的競爭情況。盡管可能性不大,但RPC服務器還是有可能在已將應答發(fā)送給我們但還未將確認消息發(fā)送給請求的情況下死掉。如果這種情況發(fā)生,RPC 在重啟后會重新處理請求。這就是為什么我們必須在客戶端優(yōu)雅的處理重復響應,同時RPC也需要盡可能保持冪等性。
http://wiki.jikexueyuan.com/project/rabbitmq/images/16.png" alt="" />
我們的 RPC 如此工作:
rpc_server.py代碼:
\#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='rpc_queue')
def fib(n):
if n == 0:
return 0
elif n == 1:
return 1
else:
return fib(n-1) + fib(n-2)
def on_request(ch, method, props, body):
n = int(body)
print " [.] fib(%s)" % (n,)
response = fib(n)
ch.basic_publish(exchange='',
routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id = \
props.correlation_id),
body=str(response))
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='rpc_queue')
print " [x] Awaiting RPC requests"
channel.start_consuming()
服務器端代碼相當簡單:
\#!/usr/bin/env python
import pika
import uuid
class FibonacciRpcClient(object):
def __init__(self):
self.connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
self.channel = self.connection.channel()
result = self.channel.queue_declare(exclusive=True)
self.callback_queue = result.method.queue
self.channel.basic_consume(self.on_response, no_ack=True,
queue=self.callback_queue)
def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:
self.response = body
def call(self, n):
self.response = None
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(
reply_to = self.callback_queue,
correlation_id = self.corr_id,
),
body=str(n))
while self.response is None:
self.connection.process_data_events()
return int(self.response)
fibonacci_rpc = FibonacciRpcClient()
print " [x] Requesting fib(30)"
response = fibonacci_rpc.call(30)
print " [.] Got %r" % (response,)
客戶端代碼稍微有點難懂:
我們的 RPC 服務已經(jīng)準備就緒了,現(xiàn)在啟動服務器端:
$ python rpc_server.py
[x] Awaiting RPC requests
運行客戶端,請求一個fibonacci隊列。
$ python rpc_client.py
[x] Requesting fib(30)
此處呈現(xiàn)的設計并不是實現(xiàn) RPC 服務的唯一方式,但是他有一些重要的優(yōu)勢:
我們的代碼依舊非常簡單,而且沒有試圖去解決一些復雜(但是重要)的問題,如:
如果你想做一些實驗,你會發(fā)現(xiàn) rabbitmq-management plugin 在觀測隊列方面是很有用處的。
(完整的 rpc_client.py 和 rpc_server.py 代碼)