在线观看不卡亚洲电影_亚洲妓女99综合网_91青青青亚洲娱乐在线观看_日韩无码高清综合久久

鍍金池/ 教程/ 數(shù)據(jù)庫/ 編譯客戶端庫
nsqadmin
常見問題
安裝
編譯客戶端庫
特性和擔(dān)保
工具
拓撲模式
設(shè)計
Docker
nsqd
內(nèi)幕
性能
TCP 協(xié)議規(guī)范
nsqlookupd
介紹
產(chǎn)品配置
客戶端庫
快速開始

編譯客戶端庫

NSQ 將一些功能集成到客戶端庫中,以便維持集群的健壯性和性能。

這篇文章試圖列出客戶端庫通常需要完成的功能。因為發(fā)布到 nsqd 非常的瑣碎(僅用 HTTP POST /put 節(jié)點就可),這個文檔主要關(guān)注消費者。

通過規(guī)范,我們希望各種語言實現(xiàn)的時候都能保持一致性。

配置

從高層看,配置相關(guān)的設(shè)計理念是希望系統(tǒng)能支持不同的工作負載,使用相同的默認值能立即可用,并且能將撥號數(shù)最小化。

消費者通過 TCP 連接到 nsqd 實例,訂閱通道(channel) 上的 話題(topic)。每個連接只能訂閱一個話題(topic),因此消費多個話題(topic),必須響應(yīng)的結(jié)構(gòu)化。

使用 nsqlookupd 來發(fā)現(xiàn)是方案之一,所以客戶端庫必須支持消費者直接連接一個或多個 nsqd 實例,或者它可用輪詢一個或多個 nsqlookupd 實例。當(dāng)消費者輪詢 nsqlookupd 的時候,時間間隔必須是可配置的。另外,因為 NSQ 的標(biāo)準(zhǔn)部署是分布式環(huán)境,包含很多消費者和生產(chǎn)者,客戶端庫必須根據(jù)配置值得隨機性自動添加抖動。更多細節(jié)參考發(fā)現(xiàn).

對于消費者來說,在 nsqd 響應(yīng)前能接收到多少消息是非常重要的指標(biāo)。這個管道促進緩存,批處理,異步消息處理。這個值稱為 max_in_flight ,并且它影響了 RDY 狀態(tài)。更多細節(jié)參見 RDY 狀態(tài)。

設(shè)計系統(tǒng)時通常會考慮優(yōu)雅處理失敗,客戶端庫希望能實現(xiàn)失敗消息的重試,并提供邊界參數(shù)來處理每個消息嘗試次數(shù)。更多細節(jié)參見消息處理

當(dāng)消息處理失敗的時候,客戶端庫能自動將消息重新隊列。NSQ 支持使用 REQ 命令發(fā)送延遲。客戶端庫需要能提供延遲的初始化值(第一次失敗時),以及重新隊列失敗該如何改變。更多細節(jié)參見 Backoff.

最重要的時,客戶端庫必須支持消息處理的回調(diào)函數(shù)配置。這些回調(diào)函數(shù)必須簡單,通常都支持一個參數(shù)(消息對象的實例)。

發(fā)現(xiàn)

nsqlookupd 是 NSQ 的重要組成部分,它為消費者發(fā)現(xiàn)服務(wù)提供來定位 nsqd ,它在運行時提供一個指定話題(topic)。

雖然使用 nsqlookupd 能大幅減少配置數(shù)目,但是需要維持并放大一個巨大的分布式 NSQ 集群。

當(dāng)消費者使用 nsqlookupd 來發(fā)現(xiàn)時,客戶端庫必須管理輪詢所有 nsqlookupd 實例的進程,最新的 nsqd 組合以問題形式提供了話題(topic),并且管理到這些 nsqd 的連接。

查詢一個 nsqlookupd 實例非常的簡單。執(zhí)行一個 HTTP 請求,使用消費者試圖發(fā)現(xiàn)的話題(topic) 作為查詢參數(shù)來查找節(jié)點(例如/lookup?topic=clicks). 響應(yīng)體是 JSON:

{
    "status_code": 200,
    "status_txt": "OK",
    "data": {
        "channels": ["archive", "science", "metrics"],
        "producers": [
            {
                "broadcast_address": "clicksapi01.routable.domain.net",
                "hostname": "clicksapi01.domain.net",
                "tcp_port": 4150,
                "http_port": 4151,
                "version": "0.2.18"
            },
            {
                "broadcast_address": "clicksapi02.routable.domain.net",
                "hostname": "clicksapi02.domain.net",
                "tcp_port": 4150,
                "http_port": 4151,
                "version": "0.2.18"
            }
        ]
    }
}

broadcast_addresstcp_port 必須用來連接 nsqd。 因為從設(shè)計上來說 nsqlookupd 實例不會分享或協(xié)調(diào)他們的數(shù)據(jù),客戶端庫必須聯(lián)合它接收到得所有 nsqlookupd 查詢列表來建立 nsqd 最終列表。使用 broadcast_address:tcp_port 作為這個聯(lián)合的唯一 KEY。

必須用周期性的計時器來重復(fù)的輪詢 nsqlookupd 的配置,這樣消費者能自動的發(fā)現(xiàn)新的 nsqd。客戶端庫必須自動的初始化到所有新發(fā)現(xiàn)的實例的連接。

當(dāng)客戶端庫開始執(zhí)行的時候,它必須通過踢開配置 nsqlookupd 實例的一組請求,來引導(dǎo)輪詢。

連接處理

一旦消費者有一個 nsqd 可以連接(通過發(fā)現(xiàn)或手工配置), 它就必須打開一個 TCP 連接到 broadcast_address:port。一個單獨的 TCP 連接必須能讓消費者可以訂閱到每個 nsqd 的 話題(topic)。

當(dāng)連接到一個 nsqd 實例時,客戶端庫必須發(fā)送以下數(shù)據(jù),順序是:

  1. 魔術(shù)標(biāo)識符
  2. 一個 IDENTIFY 命令 (和負載) 和讀/驗證響應(yīng)
  3. 一個 SUB 命令 (指定需要的話題(topic)) 和讀/驗證響應(yīng)
  4. 一個初始化 RDY 值 1

(低級別的細節(jié)參見 spec)

重新連接

客戶端庫必須通過以下方法自動重新連接:

  • 如果消費者通過特定的 nsqd 列表指定,重新連接必須通過延遲重試來處理。(列如,8s, 16s, 32s, 等等, 到最大值后重試)。

  • 如果消費者通過 nsqlookupd 來發(fā)現(xiàn)實例,必須通過輪詢間隔來自動處理重新連接(例如,如果消費者斷開和 nsqd 的連接,客戶端庫僅在隨后的 nsqlookupd 輪詢發(fā)現(xiàn)的實例后重新連接)。這能保證消費者了解 nsqd。

特性協(xié)商

IDENTIFY 命令可以用來設(shè)置 nsqd 端的元數(shù)據(jù),修改客戶端設(shè)置,并特性協(xié)商,它滿足亮點:

  1. 某些情況下,客戶端可能會修改 nsqd 的交互方式(比如,修改客戶端的心跳間隔,并允許壓縮,TLS,輸出緩存,等等-完整列表參見 spec
  2. nsqd 使用 JSON payload 來響應(yīng) IDENTIFY 命令,它包含了重要的服務(wù)端配置值,客戶端和之交互時必須遵守。

連接后,根據(jù)用戶的配置, 客戶端庫必須發(fā)送一個 IDENTIFY命令, 它的內(nèi)容是 JSON payload:

{
    "client_id": "metrics_increment",
    "hostname": "app01.bitly.net",
    "heartbeat_interval": 30000,
    "feature_negotiation": true
}

feature_negotiation 位表示客戶端可以接受返回值是 JSON payload。 client_idhostname 是隨意的文本字段,nsqd (和 nsqadmin)會用來區(qū)別客戶端. heartbeat_interval 配置每個客戶端的心跳間隔。

nsqd 必須響應(yīng) OK,如果它不支持特性協(xié)商 (nsqd``v0.2.20+引入), 否則:

{
    "max_rdy_count": 2500,
    "version": "0.2.20-alpha"
}

數(shù)據(jù)流和心跳

一旦消費者處于訂閱狀態(tài),NSQ 協(xié)議里的數(shù)據(jù)流時異步的。對于消費者來說,這就是說如果想建立一個健壯并高效的客戶端庫,就必須使用異步的網(wǎng)絡(luò) IO 循環(huán)和/或“線程”(線程表示 OS 級別的線程和用戶空間(userland)的進程,比如協(xié)同程序(coroutines))。

另外,期望客戶端能響應(yīng)它們連接到的 nsqd 實例的周期性心跳。通常這個周期是 30 秒。客戶端可以使用任何命令響應(yīng),不過通常方便起見,使用 NOP 響應(yīng)心跳。更多細節(jié)參見 protocol spec。

“進程”必須專注于讀取 TCP socket 的數(shù)據(jù),解包幀數(shù)據(jù),并執(zhí)行多路邏輯來傳輸。這也是處理心跳最佳點。從最低級別看,讀取協(xié)議包括以下步驟:

  1. 讀取 4 字節(jié) big endian uint32 大小
  2. 讀取字節(jié)大小數(shù)據(jù)
  3. 解包數(shù)據(jù)
  4. ...
  5. profit
  6. goto 1

一個和錯誤相關(guān)小插曲

根據(jù)系統(tǒng)的異步特性,會采用更多的狀態(tài)來追蹤相關(guān)協(xié)議的由命令產(chǎn)生的錯誤。我們會采用“快速錯誤”("fail fast")方法,所以大量協(xié)議級別錯誤處理都是致命的。這意味著如果客戶端發(fā)送一個無效命令(或者自己是無效狀態(tài)),通過強制關(guān)閉連接(如果可能,發(fā)送一個錯誤給客戶端),它連接到的 nsqd 實例將會保護自己(和系統(tǒng))。和之前提到的連接處理相配合,使得系統(tǒng)更加健壯和穩(wěn)定。

僅有的幾個非致命錯誤是:

  • E_FIN_FAILED - FIN 命令, 無效的消息 ID
  • E_REQ_FAILED - REQ 命令 無效的消息 ID
  • E_TOUCH_FAILED - TOUCH 命令 無效的消息 ID

因為這些錯誤通常和時間有關(guān),所以不當(dāng)做致命錯誤。這些錯誤通常發(fā)生在 nsqd 端消息超時,重新隊列時,和投遞到其他消費者時。原先的接受者不再允許響應(yīng)這個消息。

消息處理

當(dāng) IO 循環(huán)解包包含消息的幀數(shù)據(jù)時,它必須路由這個消息給配置處理函數(shù)來處理。

發(fā)送 nsqd,在配置消息超時時希望收到回復(fù)(默認:60秒)??赡苡幸韵聢鼍埃?/p>

  1. 處理函數(shù)表示消息已經(jīng)成功處理
  2. 處理函數(shù)表示消息正處理成功
  3. 處理函數(shù)表示需要更多的時間來處理消息
  4. in-flight 超時,并且 nsqd 自動重新隊列消息

前 3 個情況,客戶端庫必須發(fā)送合適消費者方面的命令 (FINREQ,和 TOUCH)。

FIN 命令最簡單。它告訴 nsqd 它能安全的拋棄消息。FIN 也能拋棄那些你不想處理或重試的消息。

REQ 命令告訴 nsqd,消息必須重新隊列(可選參數(shù)指定了重試的次數(shù))。如果消費者沒有指定可選參數(shù),客戶端庫必須自動算出相關(guān)聯(lián)的消息處理的時長(通常設(shè)置為多倍,這樣效率更高)??蛻舳藥毂仨殥仐壋^最多重試次數(shù)的消息。當(dāng)它發(fā)生的時候,必須執(zhí)行用戶提供的回調(diào)來通知,并運行特定的回調(diào)。

如果消息處理函數(shù)需要的時間超過配置的超時時間,可以用 TOUCH 命令來重置 nsqd 端的計時器??梢灾貜?fù)這個動作,直到消息 FINREQ,或發(fā)送 nsqd 的配置屬性 max_msg_timeout??蛻舳藥觳荒茏詣?TOUCH 代表消費者。

如果發(fā)送 nsqd 實例沒有接收到響應(yīng),消息將會超時,并會自動重新隊列來投遞到可用的消費者。

最后,每個消息的屬性是嘗試次數(shù)??蛻舳藥毂仨毐容^這個值和配置的最大值,并且拋棄已經(jīng)超過這個值得消息。當(dāng)消息已經(jīng)拋棄的時候,需要觸發(fā)回調(diào)。通常這個回調(diào)的實現(xiàn)必須包括寫入磁盤,日志等等。用戶必須能重寫默認的處理函數(shù)。

RDY 狀態(tài)

因為消息是從 nsqd 推送到消費者那,我們必須擁有一個方法來管理數(shù)據(jù)流,而不僅依賴于低級別的 TCP 語法。消費者的 RDY 狀態(tài)是 NSQ 的流控制機制。

配置中列出的內(nèi)容,通過 max_in_flight 配置消費者。這是并行的并且性能 knob。比如一些 downstream 系統(tǒng)可以更加容易進行消息批處理,并對更高級的 max-in-flight 有利。

當(dāng)消費者連接到nsqd (并且訂閱) ,RDY 初始化狀態(tài)為 0。不會投遞任何消息。

客戶端庫擁有很少的責(zé)任:

  1. 引導(dǎo)并最終分布配置 max_in_flight 到所有的連接。
  2. 永遠不允許匯集所有連接 RDY 的和(total_rdy_count),為超過 max_in_flight 的配置。
  3. 永遠不要超過每個連接 nsqd 配置的 max_rdy_count。
  4. 暴露一個 API 方法給值得信賴的消息流。

1. 引導(dǎo)和分布

為連接選擇 RDY 值,需要考慮的因素很少(最終分布為 max_in_flight):

  • 連接 # 是動態(tài)的,通常并不知道次數(shù)(例如,當(dāng)通過 nsqlookupd 發(fā)現(xiàn) nsqd)。
  • max_in_flight 可能會小于你的連接數(shù)

為了開始消息流,客戶端庫必須發(fā)送一個初始的 RDY 值。因為最終的連接數(shù)并不知道(通常從 '1' 開始),所以客戶端庫必能公平對待每個連接。

另外,每個消息處理后,客戶端庫必須評估什么時候更新 RDY 狀態(tài)。如果當(dāng)前值是 '0',或者低于最后發(fā)送的值的 25% 必須觸發(fā)更新。

客戶端庫必須一直嘗試最終分布 RDY 值到所有的連接。

通常來說,它可以通過 max_in_flight / num_conns 實現(xiàn)。

然而,當(dāng) max_in_flight < num_conns 這個簡單的公式無效的時候??蛻舳藥毂仨殘?zhí)行一個動態(tài)的運行評估,自從通過之前的連接接收到得消息后,連接的 nsqd '活躍度'的時間。當(dāng)配置到期后,他必須重新分布,不論 RDY 值是否對于新的 nsqd 有效。這么做,你能保證你可以通過消息找到 nsqd。這些會有延遲的影響。

2. 維護 max_in_flight

客戶端庫必須維護指定消費者的消息 in flight 的最大值。尤其,匯集每個連接的 RDY 值永遠不能超過配置的 max_in_flight 值。

底下的 Python 代碼,它指出 RDY 值是否對于指定的連接有效。

def send_ready(reader, conn, count):
    if (reader.total_ready_count + count) > reader.max_in_flight:
        return

    conn.send_ready(count)
    conn.rdy_count = count
    reader.total_ready_count += count

3. nsqd 最大 RDY 值

每個 nsqd 通過 --max-rdy-count 配置,如果消費者發(fā)送的 RDY 值超過了可接受的范圍,它的連接將強制關(guān)閉。為了向后兼容,這個值必須假設(shè)為 2500 ,如果 nsqd 實例不能支持特性協(xié)商。

4. 消息流 Starvation

最終,客戶端庫必須提供一個 API 方法,來表示消息流 starvation。對于消費者(消費者處理函數(shù))來說,簡單比較 in-flight 的消息數(shù)和 max_in_flight 值,來決定是否”批處理“不太合適。有兩種情況有問題:

  1. 當(dāng)消費者配置 max_in_flight > 1, 根據(jù)變量 num_conns,max_in_flightnum_conns 除不盡。因為你永遠不能超過max_in_flight,你必須降低,并且在 RDY 值少于 max_in_flight 時結(jié)束。
  2. 如果僅僅 nsqd 的子集有消息,因為even distribution 的 RDY 預(yù)期值, 這些活躍 nsqd 僅有 max_in_flight 的片段。

以上兩種情況,消費者實際上永遠不會接受消息的 max_in_flight。因此,客戶端庫必須暴露一個方法 is_starved,表示任何連接是否 starved,如下:

def is_starved(conns):
    for c in conns:
        # the constant 0.85 is designed to *anticipate* starvation rather than wait for it
        if c.in_flight > 0 and c.in_flight >= (c.last_ready * 0.85):
            return True
    return False

is_starved 方法必須由消息處理函數(shù)使用,來發(fā)現(xiàn)什么時候處理批量消息。

Backoff

消息處理失敗的時候如何處理是一個非常復(fù)雜的問題。消息處理章節(jié)介紹了客戶端庫動作,它會處理和時間相關(guān)的失敗的消息。其他的問題是是否減少吞吐量。這兩個功能對于整個系統(tǒng)的穩(wěn)定性至關(guān)重要。

通過減慢處理的速率,或者 "backing off",消費者允許 downstream 系統(tǒng)回收傳輸失敗。然而這個行為必須是可配置的,因為不是什么時候都能稱心如意,這種情況下延遲必須優(yōu)先處理。

Backoff 必須通過發(fā)送 RDY 0 到合適的 nsqd 來實現(xiàn),停止消息流。這個狀態(tài)的時長通過重試的失敗來計算。處理成功會減少這個時長,直到 reader 不再是 backoff 狀態(tài)。

當(dāng) reader 是 backoff 狀態(tài)時,超時后,客戶端庫必須僅發(fā)送過 RDY 1 ,而不是 max_in_flight。 在返回完整的 throttle 前,這是有效的 "tests the waters"。另外,backoff 超時時,客戶端庫必須忽略任何和計算 backoff 時間成功或者失敗結(jié)果。(比如,每次超時時它僅信任一個結(jié)果)

http://wiki.jikexueyuan.com/project/nsq-guide/images/tumblr_inline_mmjev3stkE1qz4rgp.png" alt="nsq_客戶端_flow" />

加密/壓縮

NSQ 支持加密和/或壓縮特性協(xié)商,通過IDENTIFY 命令。 TLS 用來加密。 Snappy 和 DEFLATE 都支持壓縮。Snappy 可作為第三方庫使用,但是基本所有的語言都支持 DEFLATE。

收到 IDENTIFY 響應(yīng)時,并且你通過 tls_v1 標(biāo)志位請求 TLS,你得到的東西和以下內(nèi)容類似:

{
    "deflate": false,
    "deflate_level": 0,
    "max_deflate_level": 6,
    "max_msg_timeout": 900000,
    "max_rdy_count": 2500,
    "msg_timeout": 60000,
    "sample_rate": 0,
    "snappy": true,
    "tls_v1": true,
    "version": "0.2.28"
}

確認 tls_v1true 后(意味著服務(wù)器支持 TLS),在接受和發(fā)送任何消息前,你需要初始化 TLS 握手(例如,Python 使用 ssl.wrap_socket 表示完成)。TLS 握手成功后,你必須立即讀取一個 NSQ 加密的 OK 響應(yīng)。

如果你想壓縮,可以設(shè)置 snappydeflatetrue ,并且使用合適壓縮(解壓縮)調(diào)用讀寫。同樣的你必須立即讀取一個 NSQ 壓縮的 OK 響應(yīng)。

這些壓縮特性是互斥的。

你不能阻止緩存直到加密/壓縮協(xié)商完成,或者確保小心的讀取到內(nèi)存。

匯總

分布式系統(tǒng)非常有意思。

不同的 NSQ 集群部門間交互在一個平臺上,它健壯,高性能,并且穩(wěn)定。希望您能這篇文章里了解到客戶端是多么重要。

這些細節(jié)的實現(xiàn),我們將 pynsqgo-nsq 作為代碼基礎(chǔ)。pynsq 可以切割為 2 個部分:

  • Message - 高級別的消息對象,它暴露了狀態(tài)方法,來響應(yīng)nsqd(FIN,REQTOUCH等等),同時元數(shù)據(jù)包含目的和時間戳。

  • Connection - 高級別的封裝,包含 TCP 連接到一個指定的 nsqd,它包含 flight 消息,RDY 狀態(tài),協(xié)商特性,和不同時間。

  • 消費者 - 和用戶打交道的 API,它處理發(fā)現(xiàn),創(chuàng)建連接(和訂閱),引導(dǎo)和管理 RDY 狀態(tài),解析收到的數(shù)據(jù),創(chuàng)建消息對象,和分發(fā)消息給處理函數(shù)。

  • Producer -和用戶打交道的 API,處理發(fā)布。

我們很高興能幫助任何對編寫客戶端庫有興趣的人。我們希望大家能加入到社區(qū),擴展目前已經(jīng)存在的庫。社區(qū)已經(jīng)開源很多客戶端庫。

上一篇:特性和擔(dān)保下一篇:客戶端庫