在线观看不卡亚洲电影_亚洲妓女99综合网_91青青青亚洲娱乐在线观看_日韩无码高清综合久久

鍍金池/ 教程/ 大數(shù)據(jù)/ 遠程過程調(diào)用(RPC)
介紹
路由(Routing)
為什么需要主題交換機?
發(fā)布/訂閱
工作隊列
遠程過程調(diào)用(RPC)

遠程過程調(diào)用(RPC)

(Python客戶端 —— 使用 pika 0.9.8)

在第二篇教程中我們介紹了如何使用工作隊列(work queue)在多個工作者(woker)中間分發(fā)耗時的任務。

可是如果我們需要將一個函數(shù)運行在遠程計算機上并且等待從那兒獲取結果時,該怎么辦呢?這就是另外的故事了。這種模式通常被稱為遠程過程調(diào)用(Remote Procedure Call)或者 RPC。

這篇教程中,我們會使用 RabbitMQ 來構建一個 RPC 系統(tǒng):包含一個客戶端和一個 RPC 服務器?,F(xiàn)在的情況是,我們沒有一個值得被分發(fā)的足夠耗時的任務,所以接下來,我們會創(chuàng)建一個模擬RPC服務來返回斐波那契數(shù)列。

客戶端接口

為了展示 RPC 服務如何使用,我們創(chuàng)建了一個簡單的客戶端類。它會暴露出一個名為 “call” 的方法用來發(fā)送一個 RPC 請求,并且在收到回應前保持阻塞。

fibonacci_rpc = FibonacciRpcClient()
result = fibonacci_rpc.call(4)
print "fib(4) is %r" % (result,)  

關于 RPC 的注意事項

盡管 RPC 在計算領域是一個常用模式,但它也經(jīng)常被詬病。當一個問題被拋出的時候,程序員往往意識不到這到底是由本地調(diào)用還是由較慢的 RPC 調(diào)用引起的。同樣的困惑還來自于系統(tǒng)的不可預測性和給調(diào)試工作帶來的不必要的復雜性。跟軟件精簡不同的是,濫用 RPC 會導致不可維護的面條代碼.

考慮到這一點,牢記以下建議:

確保能夠明確的搞清楚哪個函數(shù)是本地調(diào)用的,哪個函數(shù)是遠程調(diào)用的。給你的系統(tǒng)編寫文檔。保持各個組件間的依賴明確。處理錯誤案例。明了客戶端改如何處理 RPC 服務器的宕機和長時間無響應情況。

當對避免使用 RPC 有疑問的時候。如果可以的話,你應該盡量使用異步管道來代替RPC 類的阻塞。結果被異步地推送到下一個計算場景。

回調(diào)隊列

一般來說通過 RabbitMQ 來實現(xiàn) RPC 是很容易的。一個客戶端發(fā)送請求信息,服務器端將其應用到一個回復信息中。為了接收到回復信息,客戶端需要在發(fā)送請求的時候同時發(fā)送一個回調(diào)隊列(callback queue)的地址。我們試試看:

result = channel.queue_declare(exclusive=True)
callback_queue = result.method.queue

channel.basic_publish(exchange='',
                      routing_key='rpc_queue',
                      properties=pika.BasicProperties(
                            reply_to = callback_queue,
                            ),
                      body=request)

# ... and some code to read a response message from the callback_queue ...  

消息屬性

AMQP 協(xié)議給消息預定義了一系列的14個屬性。大多數(shù)屬性很少會用到,除了以下幾個:

  • delivery_mode(投遞模式):將消息標記為持久的(值為2)或暫存的(除了2之外的其他任何值)。第二篇教程里接觸過這個屬性,記得吧?
  • content_type(內(nèi)容類型):用來描述編碼的 mime-type。例如在實際使用中常常使用 application/json 來描述 JOSN 編碼類型。
  • reply_to(回復目標):通常用來命名回調(diào)隊列。
  • correlation_id(關聯(lián)標識):用來將RPC的響應和請求關聯(lián)起來。

關聯(lián)標識

上邊介紹的方法中,我們建議給每一個 RPC 請求新建一個回調(diào)隊列。這不是一個高效的做法,幸好這兒有一個更好的辦法 —— 我們可以為每個客戶端只建立一個獨立的回調(diào)隊列。

這就帶來一個新問題,當此隊列接收到一個響應的時候它無法辨別出這個響應是屬于哪個請求的。correlation_id 就是為了解決這個問題而來的。我們給每個請求設置一個獨一無二的值。稍后,當我們從回調(diào)隊列中接收到一個消息的時候,我們就可以查看這條屬性從而將響應和請求匹配起來。如果我們接手到的消息的correlation_id 是未知的,那就直接銷毀掉它,因為它不屬于我們的任何一條請求。

你也許會問,為什么我們接收到未知消息的時候不拋出一個錯誤,而是要將它忽略掉?這是為了解決服務器端有可能發(fā)生的競爭情況。盡管可能性不大,但RPC服務器還是有可能在已將應答發(fā)送給我們但還未將確認消息發(fā)送給請求的情況下死掉。如果這種情況發(fā)生,RPC 在重啟后會重新處理請求。這就是為什么我們必須在客戶端優(yōu)雅的處理重復響應,同時RPC也需要盡可能保持冪等性。

總結

http://wiki.jikexueyuan.com/project/rabbitmq/images/16.png" alt="" />

我們的 RPC 如此工作:

  • 當客戶端啟動的時候,它創(chuàng)建一個匿名獨享的回調(diào)隊列。
  • 在 RPC 請求中,客戶端發(fā)送帶有兩個屬性的消息:一個是設置回調(diào)隊列的 reply_to 屬性,另一個是設置唯一值的 correlation_id 屬性。
  • 將請求發(fā)送到一個 rpc_queue 隊列中。
  • RPC 工作者(又名:服務器)等待請求發(fā)送到這個隊列中來。當請求出現(xiàn)的時候,它執(zhí)行他的工作并且將帶有執(zhí)行結果的消息發(fā)送給 reply_to 字段指定的隊列。
  • 客戶端等待回調(diào)隊列里的數(shù)據(jù)。當有消息出現(xiàn)的時候,它會檢查 correlation_id 屬性。如果此屬性的值與請求匹配,將它返回給應用。 整合到一起

rpc_server.py代碼:

\#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='rpc_queue')

def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n-1) + fib(n-2)

def on_request(ch, method, props, body):
    n = int(body)

    print " [.] fib(%s)"  % (n,)
    response = fib(n)

    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id = \
                                                     props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='rpc_queue')

print " [x] Awaiting RPC requests"
channel.start_consuming()   

服務器端代碼相當簡單:

  • (4)像往常一樣,我們建立連接,聲明隊列
  • (11)我們聲明我們的 fibonacci 函數(shù),它假設只有合法的正整數(shù)當作輸入。(別指望這個函數(shù)能處理很大的數(shù)值,函數(shù)遞歸你們都懂得...)
  • (19)我們?yōu)?basic_consume 聲明了一個回調(diào)函數(shù),這是 RPC 服務器端的核心。它執(zhí)行實際的操作并且作出響應。
  • (32)或許我們希望能在服務器上多開幾個線程。為了能將負載平均地分攤到多個服務器,我們需要將 prefetch_count 設置好。 rpc_client.py 代碼:
\#!/usr/bin/env python
import pika
import uuid

class FibonacciRpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
                host='localhost'))

        self.channel = self.connection.channel()

        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(self.on_response, no_ack=True,
                                   queue=self.callback_queue)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                         reply_to = self.callback_queue,
                                         correlation_id = self.corr_id,
                                         ),
                                   body=str(n))
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)

fibonacci_rpc = FibonacciRpcClient()

print " [x] Requesting fib(30)"
response = fibonacci_rpc.call(30)
print " [.] Got %r" % (response,)  

客戶端代碼稍微有點難懂:

  • (7)建立連接、通道并且為回復(replies)聲明獨享的回調(diào)隊列。
  • (16)我們訂閱這個回調(diào)隊列,以便接收 RPC 的響應。
  • (18)“on_response” 回調(diào)函數(shù)對每一個響應執(zhí)行一個非常簡單的操作,檢查每一個響應消息的 correlation_id 屬性是否與我們期待的一致,如果一致,將響應結果賦給 self.response,然后跳出 consuming 循環(huán)。
  • (23)接下來,我們定義我們的主要方法 call 方法。它執(zhí)行真正的 RPC 請求。
  • (24)在這個方法中,首先我們生成一個唯一的 correlation_id 值并且保存起來,'on_response' 回調(diào)函數(shù)會用它來獲取符合要求的響應。
  • (25)接下來,我們將帶有 reply_to 和 correlation_id 屬性的消息發(fā)布出去。
  • (32)現(xiàn)在我們可以坐下來,等待正確的響應到來。
  • (33)最后,我們將響應返回給用戶。

我們的 RPC 服務已經(jīng)準備就緒了,現(xiàn)在啟動服務器端:

$ python rpc_server.py
 [x] Awaiting RPC requests  

運行客戶端,請求一個fibonacci隊列。

$ python rpc_client.py
 [x] Requesting fib(30)  

此處呈現(xiàn)的設計并不是實現(xiàn) RPC 服務的唯一方式,但是他有一些重要的優(yōu)勢:

  • 如果 RPC 服務器運行的過慢的時候,你可以通過運行另外一個服務器端輕松擴展它。試試在控制臺中運行第二個 rpc_server.py 。
  • 在客戶端,RPC 請求只發(fā)送或接收一條消息。不需要像 queue_declare 這樣的異步調(diào)用。所以RPC客戶端的單個請求只需要一個網(wǎng)絡往返。

我們的代碼依舊非常簡單,而且沒有試圖去解決一些復雜(但是重要)的問題,如:

  • 當沒有服務器運行時,客戶端如何作出反映。
  • 客戶端是否需要實現(xiàn)類似RPC超時的東西。
  • 如果服務器發(fā)生故障,并且拋出異常,應該被轉發(fā)到客戶端嗎?
  • 在處理前,防止混入無效的信息(例如檢查邊界)

如果你想做一些實驗,你會發(fā)現(xiàn) rabbitmq-management plugin 在觀測隊列方面是很有用處的。

(完整的 rpc_client.pyrpc_server.py 代碼)

上一篇:介紹下一篇:工作隊列