在线观看不卡亚洲电影_亚洲妓女99综合网_91青青青亚洲娱乐在线观看_日韩无码高清综合久久

鍍金池/ 教程/ 大數(shù)據(jù)/ 遠(yuǎn)程過程調(diào)用(RPC)
介紹
路由(Routing)
為什么需要主題交換機(jī)?
發(fā)布/訂閱
工作隊(duì)列
遠(yuǎn)程過程調(diào)用(RPC)

遠(yuǎn)程過程調(diào)用(RPC)

(Python客戶端 —— 使用 pika 0.9.8)

在第二篇教程中我們介紹了如何使用工作隊(duì)列(work queue)在多個(gè)工作者(woker)中間分發(fā)耗時(shí)的任務(wù)。

可是如果我們需要將一個(gè)函數(shù)運(yùn)行在遠(yuǎn)程計(jì)算機(jī)上并且等待從那兒獲取結(jié)果時(shí),該怎么辦呢?這就是另外的故事了。這種模式通常被稱為遠(yuǎn)程過程調(diào)用(Remote Procedure Call)或者 RPC。

這篇教程中,我們會(huì)使用 RabbitMQ 來構(gòu)建一個(gè) RPC 系統(tǒng):包含一個(gè)客戶端和一個(gè) RPC 服務(wù)器。現(xiàn)在的情況是,我們沒有一個(gè)值得被分發(fā)的足夠耗時(shí)的任務(wù),所以接下來,我們會(huì)創(chuàng)建一個(gè)模擬RPC服務(wù)來返回斐波那契數(shù)列。

客戶端接口

為了展示 RPC 服務(wù)如何使用,我們創(chuàng)建了一個(gè)簡(jiǎn)單的客戶端類。它會(huì)暴露出一個(gè)名為 “call” 的方法用來發(fā)送一個(gè) RPC 請(qǐng)求,并且在收到回應(yīng)前保持阻塞。

fibonacci_rpc = FibonacciRpcClient()
result = fibonacci_rpc.call(4)
print "fib(4) is %r" % (result,)  

關(guān)于 RPC 的注意事項(xiàng)

盡管 RPC 在計(jì)算領(lǐng)域是一個(gè)常用模式,但它也經(jīng)常被詬病。當(dāng)一個(gè)問題被拋出的時(shí)候,程序員往往意識(shí)不到這到底是由本地調(diào)用還是由較慢的 RPC 調(diào)用引起的。同樣的困惑還來自于系統(tǒng)的不可預(yù)測(cè)性和給調(diào)試工作帶來的不必要的復(fù)雜性。跟軟件精簡(jiǎn)不同的是,濫用 RPC 會(huì)導(dǎo)致不可維護(hù)的面條代碼.

考慮到這一點(diǎn),牢記以下建議:

確保能夠明確的搞清楚哪個(gè)函數(shù)是本地調(diào)用的,哪個(gè)函數(shù)是遠(yuǎn)程調(diào)用的。給你的系統(tǒng)編寫文檔。保持各個(gè)組件間的依賴明確。處理錯(cuò)誤案例。明了客戶端改如何處理 RPC 服務(wù)器的宕機(jī)和長(zhǎng)時(shí)間無響應(yīng)情況。

當(dāng)對(duì)避免使用 RPC 有疑問的時(shí)候。如果可以的話,你應(yīng)該盡量使用異步管道來代替RPC 類的阻塞。結(jié)果被異步地推送到下一個(gè)計(jì)算場(chǎng)景。

回調(diào)隊(duì)列

一般來說通過 RabbitMQ 來實(shí)現(xiàn) RPC 是很容易的。一個(gè)客戶端發(fā)送請(qǐng)求信息,服務(wù)器端將其應(yīng)用到一個(gè)回復(fù)信息中。為了接收到回復(fù)信息,客戶端需要在發(fā)送請(qǐng)求的時(shí)候同時(shí)發(fā)送一個(gè)回調(diào)隊(duì)列(callback queue)的地址。我們?cè)囋嚳矗?/p>

result = channel.queue_declare(exclusive=True)
callback_queue = result.method.queue

channel.basic_publish(exchange='',
                      routing_key='rpc_queue',
                      properties=pika.BasicProperties(
                            reply_to = callback_queue,
                            ),
                      body=request)

# ... and some code to read a response message from the callback_queue ...  

消息屬性

AMQP 協(xié)議給消息預(yù)定義了一系列的14個(gè)屬性。大多數(shù)屬性很少會(huì)用到,除了以下幾個(gè):

  • delivery_mode(投遞模式):將消息標(biāo)記為持久的(值為2)或暫存的(除了2之外的其他任何值)。第二篇教程里接觸過這個(gè)屬性,記得吧?
  • content_type(內(nèi)容類型):用來描述編碼的 mime-type。例如在實(shí)際使用中常常使用 application/json 來描述 JOSN 編碼類型。
  • reply_to(回復(fù)目標(biāo)):通常用來命名回調(diào)隊(duì)列。
  • correlation_id(關(guān)聯(lián)標(biāo)識(shí)):用來將RPC的響應(yīng)和請(qǐng)求關(guān)聯(lián)起來。

關(guān)聯(lián)標(biāo)識(shí)

上邊介紹的方法中,我們建議給每一個(gè) RPC 請(qǐng)求新建一個(gè)回調(diào)隊(duì)列。這不是一個(gè)高效的做法,幸好這兒有一個(gè)更好的辦法 —— 我們可以為每個(gè)客戶端只建立一個(gè)獨(dú)立的回調(diào)隊(duì)列。

這就帶來一個(gè)新問題,當(dāng)此隊(duì)列接收到一個(gè)響應(yīng)的時(shí)候它無法辨別出這個(gè)響應(yīng)是屬于哪個(gè)請(qǐng)求的。correlation_id 就是為了解決這個(gè)問題而來的。我們給每個(gè)請(qǐng)求設(shè)置一個(gè)獨(dú)一無二的值。稍后,當(dāng)我們從回調(diào)隊(duì)列中接收到一個(gè)消息的時(shí)候,我們就可以查看這條屬性從而將響應(yīng)和請(qǐng)求匹配起來。如果我們接手到的消息的correlation_id 是未知的,那就直接銷毀掉它,因?yàn)樗粚儆谖覀兊娜魏我粭l請(qǐng)求。

你也許會(huì)問,為什么我們接收到未知消息的時(shí)候不拋出一個(gè)錯(cuò)誤,而是要將它忽略掉?這是為了解決服務(wù)器端有可能發(fā)生的競(jìng)爭(zhēng)情況。盡管可能性不大,但RPC服務(wù)器還是有可能在已將應(yīng)答發(fā)送給我們但還未將確認(rèn)消息發(fā)送給請(qǐng)求的情況下死掉。如果這種情況發(fā)生,RPC 在重啟后會(huì)重新處理請(qǐng)求。這就是為什么我們必須在客戶端優(yōu)雅的處理重復(fù)響應(yīng),同時(shí)RPC也需要盡可能保持冪等性。

總結(jié)

http://wiki.jikexueyuan.com/project/rabbitmq/images/16.png" alt="" />

我們的 RPC 如此工作:

  • 當(dāng)客戶端啟動(dòng)的時(shí)候,它創(chuàng)建一個(gè)匿名獨(dú)享的回調(diào)隊(duì)列。
  • 在 RPC 請(qǐng)求中,客戶端發(fā)送帶有兩個(gè)屬性的消息:一個(gè)是設(shè)置回調(diào)隊(duì)列的 reply_to 屬性,另一個(gè)是設(shè)置唯一值的 correlation_id 屬性。
  • 將請(qǐng)求發(fā)送到一個(gè) rpc_queue 隊(duì)列中。
  • RPC 工作者(又名:服務(wù)器)等待請(qǐng)求發(fā)送到這個(gè)隊(duì)列中來。當(dāng)請(qǐng)求出現(xiàn)的時(shí)候,它執(zhí)行他的工作并且將帶有執(zhí)行結(jié)果的消息發(fā)送給 reply_to 字段指定的隊(duì)列。
  • 客戶端等待回調(diào)隊(duì)列里的數(shù)據(jù)。當(dāng)有消息出現(xiàn)的時(shí)候,它會(huì)檢查 correlation_id 屬性。如果此屬性的值與請(qǐng)求匹配,將它返回給應(yīng)用。 整合到一起

rpc_server.py代碼:

\#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='rpc_queue')

def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n-1) + fib(n-2)

def on_request(ch, method, props, body):
    n = int(body)

    print " [.] fib(%s)"  % (n,)
    response = fib(n)

    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id = \
                                                     props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='rpc_queue')

print " [x] Awaiting RPC requests"
channel.start_consuming()   

服務(wù)器端代碼相當(dāng)簡(jiǎn)單:

  • (4)像往常一樣,我們建立連接,聲明隊(duì)列
  • (11)我們聲明我們的 fibonacci 函數(shù),它假設(shè)只有合法的正整數(shù)當(dāng)作輸入。(別指望這個(gè)函數(shù)能處理很大的數(shù)值,函數(shù)遞歸你們都懂得...)
  • (19)我們?yōu)?basic_consume 聲明了一個(gè)回調(diào)函數(shù),這是 RPC 服務(wù)器端的核心。它執(zhí)行實(shí)際的操作并且作出響應(yīng)。
  • (32)或許我們希望能在服務(wù)器上多開幾個(gè)線程。為了能將負(fù)載平均地分?jǐn)偟蕉鄠€(gè)服務(wù)器,我們需要將 prefetch_count 設(shè)置好。 rpc_client.py 代碼:
\#!/usr/bin/env python
import pika
import uuid

class FibonacciRpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
                host='localhost'))

        self.channel = self.connection.channel()

        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(self.on_response, no_ack=True,
                                   queue=self.callback_queue)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                         reply_to = self.callback_queue,
                                         correlation_id = self.corr_id,
                                         ),
                                   body=str(n))
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)

fibonacci_rpc = FibonacciRpcClient()

print " [x] Requesting fib(30)"
response = fibonacci_rpc.call(30)
print " [.] Got %r" % (response,)  

客戶端代碼稍微有點(diǎn)難懂:

  • (7)建立連接、通道并且為回復(fù)(replies)聲明獨(dú)享的回調(diào)隊(duì)列。
  • (16)我們訂閱這個(gè)回調(diào)隊(duì)列,以便接收 RPC 的響應(yīng)。
  • (18)“on_response” 回調(diào)函數(shù)對(duì)每一個(gè)響應(yīng)執(zhí)行一個(gè)非常簡(jiǎn)單的操作,檢查每一個(gè)響應(yīng)消息的 correlation_id 屬性是否與我們期待的一致,如果一致,將響應(yīng)結(jié)果賦給 self.response,然后跳出 consuming 循環(huán)。
  • (23)接下來,我們定義我們的主要方法 call 方法。它執(zhí)行真正的 RPC 請(qǐng)求。
  • (24)在這個(gè)方法中,首先我們生成一個(gè)唯一的 correlation_id 值并且保存起來,'on_response' 回調(diào)函數(shù)會(huì)用它來獲取符合要求的響應(yīng)。
  • (25)接下來,我們將帶有 reply_to 和 correlation_id 屬性的消息發(fā)布出去。
  • (32)現(xiàn)在我們可以坐下來,等待正確的響應(yīng)到來。
  • (33)最后,我們將響應(yīng)返回給用戶。

我們的 RPC 服務(wù)已經(jīng)準(zhǔn)備就緒了,現(xiàn)在啟動(dòng)服務(wù)器端:

$ python rpc_server.py
 [x] Awaiting RPC requests  

運(yùn)行客戶端,請(qǐng)求一個(gè)fibonacci隊(duì)列。

$ python rpc_client.py
 [x] Requesting fib(30)  

此處呈現(xiàn)的設(shè)計(jì)并不是實(shí)現(xiàn) RPC 服務(wù)的唯一方式,但是他有一些重要的優(yōu)勢(shì):

  • 如果 RPC 服務(wù)器運(yùn)行的過慢的時(shí)候,你可以通過運(yùn)行另外一個(gè)服務(wù)器端輕松擴(kuò)展它。試試在控制臺(tái)中運(yùn)行第二個(gè) rpc_server.py 。
  • 在客戶端,RPC 請(qǐng)求只發(fā)送或接收一條消息。不需要像 queue_declare 這樣的異步調(diào)用。所以RPC客戶端的單個(gè)請(qǐng)求只需要一個(gè)網(wǎng)絡(luò)往返。

我們的代碼依舊非常簡(jiǎn)單,而且沒有試圖去解決一些復(fù)雜(但是重要)的問題,如:

  • 當(dāng)沒有服務(wù)器運(yùn)行時(shí),客戶端如何作出反映。
  • 客戶端是否需要實(shí)現(xiàn)類似RPC超時(shí)的東西。
  • 如果服務(wù)器發(fā)生故障,并且拋出異常,應(yīng)該被轉(zhuǎn)發(fā)到客戶端嗎?
  • 在處理前,防止混入無效的信息(例如檢查邊界)

如果你想做一些實(shí)驗(yàn),你會(huì)發(fā)現(xiàn) rabbitmq-management plugin 在觀測(cè)隊(duì)列方面是很有用處的。

(完整的 rpc_client.pyrpc_server.py 代碼)

上一篇:介紹下一篇:工作隊(duì)列