NSQ 將一些功能集成到客戶端庫中,以便維持集群的健壯性和性能。
這篇文章試圖列出客戶端庫通常需要完成的功能。因為發(fā)布到 nsqd 非常的瑣碎(僅用 HTTP POST /put 節(jié)點就可),這個文檔主要關(guān)注消費者。
通過規(guī)范,我們希望各種語言實現(xiàn)的時候都能保持一致性。
從高層看,配置相關(guān)的設(shè)計理念是希望系統(tǒng)能支持不同的工作負載,使用相同的默認值能立即可用,并且能將撥號數(shù)最小化。
消費者通過 TCP 連接到 nsqd 實例,訂閱通道(channel) 上的 話題(topic)。每個連接只能訂閱一個話題(topic),因此消費多個話題(topic),必須響應(yīng)的結(jié)構(gòu)化。
使用 nsqlookupd 來發(fā)現(xiàn)是方案之一,所以客戶端庫必須支持消費者直接連接一個或多個 nsqd 實例,或者它可用輪詢一個或多個 nsqlookupd 實例。當(dāng)消費者輪詢 nsqlookupd 的時候,時間間隔必須是可配置的。另外,因為 NSQ 的標(biāo)準(zhǔn)部署是分布式環(huán)境,包含很多消費者和生產(chǎn)者,客戶端庫必須根據(jù)配置值得隨機性自動添加抖動。更多細節(jié)參考發(fā)現(xiàn).
對于消費者來說,在 nsqd 響應(yīng)前能接收到多少消息是非常重要的指標(biāo)。這個管道促進緩存,批處理,異步消息處理。這個值稱為 max_in_flight ,并且它影響了 RDY 狀態(tài)。更多細節(jié)參見 RDY 狀態(tài)。
設(shè)計系統(tǒng)時通常會考慮優(yōu)雅處理失敗,客戶端庫希望能實現(xiàn)失敗消息的重試,并提供邊界參數(shù)來處理每個消息嘗試次數(shù)。更多細節(jié)參見消息處理。
當(dāng)消息處理失敗的時候,客戶端庫能自動將消息重新隊列。NSQ 支持使用 REQ 命令發(fā)送延遲。客戶端庫需要能提供延遲的初始化值(第一次失敗時),以及重新隊列失敗該如何改變。更多細節(jié)參見 Backoff.
最重要的時,客戶端庫必須支持消息處理的回調(diào)函數(shù)配置。這些回調(diào)函數(shù)必須簡單,通常都支持一個參數(shù)(消息對象的實例)。
nsqlookupd 是 NSQ 的重要組成部分,它為消費者發(fā)現(xiàn)服務(wù)提供來定位 nsqd ,它在運行時提供一個指定話題(topic)。
雖然使用 nsqlookupd 能大幅減少配置數(shù)目,但是需要維持并放大一個巨大的分布式 NSQ 集群。
當(dāng)消費者使用 nsqlookupd 來發(fā)現(xiàn)時,客戶端庫必須管理輪詢所有 nsqlookupd 實例的進程,最新的 nsqd 組合以問題形式提供了話題(topic),并且管理到這些 nsqd 的連接。
查詢一個 nsqlookupd 實例非常的簡單。執(zhí)行一個 HTTP 請求,使用消費者試圖發(fā)現(xiàn)的話題(topic) 作為查詢參數(shù)來查找節(jié)點(例如/lookup?topic=clicks). 響應(yīng)體是 JSON:
{
"status_code": 200,
"status_txt": "OK",
"data": {
"channels": ["archive", "science", "metrics"],
"producers": [
{
"broadcast_address": "clicksapi01.routable.domain.net",
"hostname": "clicksapi01.domain.net",
"tcp_port": 4150,
"http_port": 4151,
"version": "0.2.18"
},
{
"broadcast_address": "clicksapi02.routable.domain.net",
"hostname": "clicksapi02.domain.net",
"tcp_port": 4150,
"http_port": 4151,
"version": "0.2.18"
}
]
}
}
broadcast_address 和 tcp_port 必須用來連接 nsqd。 因為從設(shè)計上來說 nsqlookupd 實例不會分享或協(xié)調(diào)他們的數(shù)據(jù),客戶端庫必須聯(lián)合它接收到得所有 nsqlookupd 查詢列表來建立 nsqd 最終列表。使用 broadcast_address:tcp_port 作為這個聯(lián)合的唯一 KEY。
必須用周期性的計時器來重復(fù)的輪詢 nsqlookupd 的配置,這樣消費者能自動的發(fā)現(xiàn)新的 nsqd。客戶端庫必須自動的初始化到所有新發(fā)現(xiàn)的實例的連接。
當(dāng)客戶端庫開始執(zhí)行的時候,它必須通過踢開配置 nsqlookupd 實例的一組請求,來引導(dǎo)輪詢。
一旦消費者有一個 nsqd 可以連接(通過發(fā)現(xiàn)或手工配置), 它就必須打開一個 TCP 連接到 broadcast_address:port。一個單獨的 TCP 連接必須能讓消費者可以訂閱到每個 nsqd 的 話題(topic)。
當(dāng)連接到一個 nsqd 實例時,客戶端庫必須發(fā)送以下數(shù)據(jù),順序是:
IDENTIFY 命令 (和負載) 和讀/驗證響應(yīng) SUB 命令 (指定需要的話題(topic)) 和讀/驗證響應(yīng)RDY 值 1 (低級別的細節(jié)參見 spec)
客戶端庫必須通過以下方法自動重新連接:
如果消費者通過特定的 nsqd 列表指定,重新連接必須通過延遲重試來處理。(列如,8s, 16s, 32s, 等等, 到最大值后重試)。
nsqlookupd 來發(fā)現(xiàn)實例,必須通過輪詢間隔來自動處理重新連接(例如,如果消費者斷開和 nsqd 的連接,客戶端庫僅在隨后的 nsqlookupd 輪詢發(fā)現(xiàn)的實例后重新連接)。這能保證消費者了解 nsqd。IDENTIFY 命令可以用來設(shè)置 nsqd 端的元數(shù)據(jù),修改客戶端設(shè)置,并特性協(xié)商,它滿足亮點:
nsqd 的交互方式(比如,修改客戶端的心跳間隔,并允許壓縮,TLS,輸出緩存,等等-完整列表參見 spec)nsqd 使用 JSON payload 來響應(yīng) IDENTIFY 命令,它包含了重要的服務(wù)端配置值,客戶端和之交互時必須遵守。連接后,根據(jù)用戶的配置, 客戶端庫必須發(fā)送一個 IDENTIFY命令, 它的內(nèi)容是 JSON payload:
{
"client_id": "metrics_increment",
"hostname": "app01.bitly.net",
"heartbeat_interval": 30000,
"feature_negotiation": true
}
feature_negotiation 位表示客戶端可以接受返回值是 JSON payload。
client_id 和 hostname 是隨意的文本字段,nsqd (和 nsqadmin)會用來區(qū)別客戶端. heartbeat_interval 配置每個客戶端的心跳間隔。
nsqd 必須響應(yīng) OK,如果它不支持特性協(xié)商 (nsqd``v0.2.20+引入), 否則:
{
"max_rdy_count": 2500,
"version": "0.2.20-alpha"
}
一旦消費者處于訂閱狀態(tài),NSQ 協(xié)議里的數(shù)據(jù)流時異步的。對于消費者來說,這就是說如果想建立一個健壯并高效的客戶端庫,就必須使用異步的網(wǎng)絡(luò) IO 循環(huán)和/或“線程”(線程表示 OS 級別的線程和用戶空間(userland)的進程,比如協(xié)同程序(coroutines))。
另外,期望客戶端能響應(yīng)它們連接到的 nsqd 實例的周期性心跳。通常這個周期是 30 秒。客戶端可以使用任何命令響應(yīng),不過通常方便起見,使用 NOP 響應(yīng)心跳。更多細節(jié)參見 protocol spec。
“進程”必須專注于讀取 TCP socket 的數(shù)據(jù),解包幀數(shù)據(jù),并執(zhí)行多路邏輯來傳輸。這也是處理心跳最佳點。從最低級別看,讀取協(xié)議包括以下步驟:
根據(jù)系統(tǒng)的異步特性,會采用更多的狀態(tài)來追蹤相關(guān)協(xié)議的由命令產(chǎn)生的錯誤。我們會采用“快速錯誤”("fail fast")方法,所以大量協(xié)議級別錯誤處理都是致命的。這意味著如果客戶端發(fā)送一個無效命令(或者自己是無效狀態(tài)),通過強制關(guān)閉連接(如果可能,發(fā)送一個錯誤給客戶端),它連接到的 nsqd 實例將會保護自己(和系統(tǒng))。和之前提到的連接處理相配合,使得系統(tǒng)更加健壯和穩(wěn)定。
僅有的幾個非致命錯誤是:
E_FIN_FAILED - FIN 命令, 無效的消息 IDE_REQ_FAILED - REQ 命令 無效的消息 IDE_TOUCH_FAILED - TOUCH 命令 無效的消息 ID因為這些錯誤通常和時間有關(guān),所以不當(dāng)做致命錯誤。這些錯誤通常發(fā)生在 nsqd 端消息超時,重新隊列時,和投遞到其他消費者時。原先的接受者不再允許響應(yīng)這個消息。
當(dāng) IO 循環(huán)解包包含消息的幀數(shù)據(jù)時,它必須路由這個消息給配置處理函數(shù)來處理。
發(fā)送 nsqd,在配置消息超時時希望收到回復(fù)(默認:60秒)??赡苡幸韵聢鼍埃?/p>
nsqd 自動重新隊列消息前 3 個情況,客戶端庫必須發(fā)送合適消費者方面的命令 (FIN,REQ,和 TOUCH)。
FIN 命令最簡單。它告訴 nsqd 它能安全的拋棄消息。FIN 也能拋棄那些你不想處理或重試的消息。
REQ 命令告訴 nsqd,消息必須重新隊列(可選參數(shù)指定了重試的次數(shù))。如果消費者沒有指定可選參數(shù),客戶端庫必須自動算出相關(guān)聯(lián)的消息處理的時長(通常設(shè)置為多倍,這樣效率更高)??蛻舳藥毂仨殥仐壋^最多重試次數(shù)的消息。當(dāng)它發(fā)生的時候,必須執(zhí)行用戶提供的回調(diào)來通知,并運行特定的回調(diào)。
如果消息處理函數(shù)需要的時間超過配置的超時時間,可以用 TOUCH 命令來重置 nsqd 端的計時器??梢灾貜?fù)這個動作,直到消息 FIN 或 REQ,或發(fā)送 nsqd 的配置屬性 max_msg_timeout??蛻舳藥觳荒茏詣?TOUCH 代表消費者。
如果發(fā)送 nsqd 實例沒有接收到響應(yīng),消息將會超時,并會自動重新隊列來投遞到可用的消費者。
最后,每個消息的屬性是嘗試次數(shù)??蛻舳藥毂仨毐容^這個值和配置的最大值,并且拋棄已經(jīng)超過這個值得消息。當(dāng)消息已經(jīng)拋棄的時候,需要觸發(fā)回調(diào)。通常這個回調(diào)的實現(xiàn)必須包括寫入磁盤,日志等等。用戶必須能重寫默認的處理函數(shù)。
因為消息是從 nsqd 推送到消費者那,我們必須擁有一個方法來管理數(shù)據(jù)流,而不僅依賴于低級別的 TCP 語法。消費者的 RDY 狀態(tài)是 NSQ 的流控制機制。
如配置中列出的內(nèi)容,通過 max_in_flight 配置消費者。這是并行的并且性能 knob。比如一些 downstream 系統(tǒng)可以更加容易進行消息批處理,并對更高級的 max-in-flight 有利。
當(dāng)消費者連接到nsqd (并且訂閱) ,RDY 初始化狀態(tài)為 0。不會投遞任何消息。
客戶端庫擁有很少的責(zé)任:
max_in_flight 到所有的連接。RDY 的和(total_rdy_count),為超過 max_in_flight 的配置。nsqd 配置的 max_rdy_count。為連接選擇 RDY 值,需要考慮的因素很少(最終分布為 max_in_flight):
nsqlookupd 發(fā)現(xiàn) nsqd)。max_in_flight 可能會小于你的連接數(shù)為了開始消息流,客戶端庫必須發(fā)送一個初始的 RDY 值。因為最終的連接數(shù)并不知道(通常從 '1' 開始),所以客戶端庫必能公平對待每個連接。
另外,每個消息處理后,客戶端庫必須評估什么時候更新 RDY 狀態(tài)。如果當(dāng)前值是 '0',或者低于最后發(fā)送的值的 25% 必須觸發(fā)更新。
客戶端庫必須一直嘗試最終分布 RDY 值到所有的連接。
通常來說,它可以通過 max_in_flight / num_conns 實現(xiàn)。
然而,當(dāng) max_in_flight < num_conns 這個簡單的公式無效的時候??蛻舳藥毂仨殘?zhí)行一個動態(tài)的運行評估,自從通過之前的連接接收到得消息后,連接的 nsqd '活躍度'的時間。當(dāng)配置到期后,他必須重新分布,不論 RDY 值是否對于新的 nsqd 有效。這么做,你能保證你可以通過消息找到 nsqd。這些會有延遲的影響。
max_in_flight客戶端庫必須維護指定消費者的消息 in flight 的最大值。尤其,匯集每個連接的 RDY 值永遠不能超過配置的 max_in_flight 值。
底下的 Python 代碼,它指出 RDY 值是否對于指定的連接有效。
def send_ready(reader, conn, count):
if (reader.total_ready_count + count) > reader.max_in_flight:
return
conn.send_ready(count)
conn.rdy_count = count
reader.total_ready_count += count
nsqd 最大 RDY 值每個 nsqd 通過 --max-rdy-count 配置,如果消費者發(fā)送的 RDY 值超過了可接受的范圍,它的連接將強制關(guān)閉。為了向后兼容,這個值必須假設(shè)為 2500 ,如果 nsqd 實例不能支持特性協(xié)商。
最終,客戶端庫必須提供一個 API 方法,來表示消息流 starvation。對于消費者(消費者處理函數(shù))來說,簡單比較 in-flight 的消息數(shù)和 max_in_flight 值,來決定是否”批處理“不太合適。有兩種情況有問題:
max_in_flight > 1, 根據(jù)變量 num_conns,max_in_flight 除 num_conns 除不盡。因為你永遠不能超過max_in_flight,你必須降低,并且在 RDY 值少于 max_in_flight 時結(jié)束。nsqd 的子集有消息,因為even distribution 的 RDY 預(yù)期值, 這些活躍 nsqd 僅有 max_in_flight 的片段。以上兩種情況,消費者實際上永遠不會接受消息的 max_in_flight。因此,客戶端庫必須暴露一個方法 is_starved,表示任何連接是否 starved,如下:
def is_starved(conns):
for c in conns:
# the constant 0.85 is designed to *anticipate* starvation rather than wait for it
if c.in_flight > 0 and c.in_flight >= (c.last_ready * 0.85):
return True
return False
is_starved 方法必須由消息處理函數(shù)使用,來發(fā)現(xiàn)什么時候處理批量消息。
消息處理失敗的時候如何處理是一個非常復(fù)雜的問題。消息處理章節(jié)介紹了客戶端庫動作,它會處理和時間相關(guān)的失敗的消息。其他的問題是是否減少吞吐量。這兩個功能對于整個系統(tǒng)的穩(wěn)定性至關(guān)重要。
通過減慢處理的速率,或者 "backing off",消費者允許 downstream 系統(tǒng)回收傳輸失敗。然而這個行為必須是可配置的,因為不是什么時候都能稱心如意,這種情況下延遲必須優(yōu)先處理。
Backoff 必須通過發(fā)送 RDY 0 到合適的 nsqd 來實現(xiàn),停止消息流。這個狀態(tài)的時長通過重試的失敗來計算。處理成功會減少這個時長,直到 reader 不再是 backoff 狀態(tài)。
當(dāng) reader 是 backoff 狀態(tài)時,超時后,客戶端庫必須僅發(fā)送過 RDY 1 ,而不是 max_in_flight。 在返回完整的 throttle 前,這是有效的 "tests the waters"。另外,backoff 超時時,客戶端庫必須忽略任何和計算 backoff 時間成功或者失敗結(jié)果。(比如,每次超時時它僅信任一個結(jié)果)
http://wiki.jikexueyuan.com/project/nsq-guide/images/tumblr_inline_mmjev3stkE1qz4rgp.png" alt="nsq_客戶端_flow" />
NSQ 支持加密和/或壓縮特性協(xié)商,通過IDENTIFY 命令。 TLS 用來加密。 Snappy 和 DEFLATE 都支持壓縮。Snappy 可作為第三方庫使用,但是基本所有的語言都支持 DEFLATE。
收到 IDENTIFY 響應(yīng)時,并且你通過 tls_v1 標(biāo)志位請求 TLS,你得到的東西和以下內(nèi)容類似:
{
"deflate": false,
"deflate_level": 0,
"max_deflate_level": 6,
"max_msg_timeout": 900000,
"max_rdy_count": 2500,
"msg_timeout": 60000,
"sample_rate": 0,
"snappy": true,
"tls_v1": true,
"version": "0.2.28"
}
確認 tls_v1 為 true 后(意味著服務(wù)器支持 TLS),在接受和發(fā)送任何消息前,你需要初始化 TLS 握手(例如,Python 使用 ssl.wrap_socket 表示完成)。TLS 握手成功后,你必須立即讀取一個 NSQ 加密的 OK 響應(yīng)。
如果你想壓縮,可以設(shè)置 snappy 或 deflate 為 true ,并且使用合適壓縮(解壓縮)調(diào)用讀寫。同樣的你必須立即讀取一個 NSQ 壓縮的 OK 響應(yīng)。
這些壓縮特性是互斥的。
你不能阻止緩存直到加密/壓縮協(xié)商完成,或者確保小心的讀取到內(nèi)存。
分布式系統(tǒng)非常有意思。
不同的 NSQ 集群部門間交互在一個平臺上,它健壯,高性能,并且穩(wěn)定。希望您能這篇文章里了解到客戶端是多么重要。
這些細節(jié)的實現(xiàn),我們將 pynsq 和 go-nsq 作為代碼基礎(chǔ)。pynsq 可以切割為 2 個部分:
Message - 高級別的消息對象,它暴露了狀態(tài)方法,來響應(yīng)nsqd(FIN,REQ,TOUCH等等),同時元數(shù)據(jù)包含目的和時間戳。
Connection - 高級別的封裝,包含 TCP 連接到一個指定的 nsqd,它包含 flight 消息,RDY 狀態(tài),協(xié)商特性,和不同時間。
消費者 - 和用戶打交道的 API,它處理發(fā)現(xiàn),創(chuàng)建連接(和訂閱),引導(dǎo)和管理 RDY 狀態(tài),解析收到的數(shù)據(jù),創(chuàng)建消息對象,和分發(fā)消息給處理函數(shù)。
Producer -和用戶打交道的 API,處理發(fā)布。我們很高興能幫助任何對編寫客戶端庫有興趣的人。我們希望大家能加入到社區(qū),擴展目前已經(jīng)存在的庫。社區(qū)已經(jīng)開源很多客戶端庫。