NSQ 協(xié)議足夠簡單,用任何語言編譯客戶端都很容易。我們提供官方的 Go 和 Python 客戶端庫。
nsqd 進程通過監(jiān)聽配置的 TCP 端口來接受客戶端連接。
連接后,客戶端必須發(fā)送一個 4 字節(jié)的 "magic" 標識碼,表示通訊協(xié)議的版本。
V2 (4 個字節(jié)的 ASCII [space][space][V][2])
消費用到的推送流協(xié)議(和發(fā)布用到的請求/響應協(xié)議)認證后,客戶端可以發(fā)送 IDENTIFY 命令來停供常用的元數(shù)據(jù)(比如,更多的描述標識碼)和協(xié)商特性。為了消費消息,客戶端必須 SUB 到一個通道(channel)。
訂閱的時候,客戶端的 RDY 狀態(tài)為 0。意味著沒有消息會被發(fā)送到客戶端。當客戶端已經準備好接受消息時,需要把 RDY 設置為 #。比如設置為 100,不需要任何附加命令,將會有 100 條消息推送到客戶端(每次服務端都會相應的減少 RDY 的值)。
V2 版本的協(xié)議讓客戶端擁有心跳功能。每隔 30 秒(默認設置),nsqd 將會發(fā)送一個 _heartbeat_ 響應,并期待返回。如果客戶端空閑,發(fā)送 NOP命令。如果 2 個 _heartbeat_ 響應沒有被應答, nsqd 將會超時,并且強制關閉客戶端連接。IDENTIFY 命令可以用來改變/禁用這個行為。
除非 stated,所有的傳輸?shù)亩壷拼笮?整數(shù)都是網絡字節(jié)順序。(列如. big endian)
[.a-zA-Z0-9_-] 和數(shù)字 1 < length <= 64 (在 nsqd 0.2.28 版本前最長 32 位)更新服務器上的客戶端元數(shù)據(jù)和協(xié)商功能。
IDENTIFY\n
[ 4-byte size in bytes ][ N-byte JSON data ]
注意: 這個命令包含 JSON 的相關內容,包括:
short_id (nsqd v0.2.28+ 版本之后已經拋棄,使用 client_id 替換)這個標示符是描述的簡易格式(比如,主機名)
long_id (v0.2.28+ 版之后已經拋棄,使用 hostname 替換)這個標示符是描述的長格式。(比如. 主機名全名)
client_id 這個標示符用來消除客戶端的歧義 (比如. 一些指定給消費者)
hostname 部署了客戶端的主機名
feature_negotiation (nsqd v0.2.19+) bool, 用來標示客戶端支持的協(xié)商特性。如果服務器接受,將會以 JSON 的形式發(fā)送支持的特性和元數(shù)據(jù)。
heartbeat_interval (nsqd v0.2.19+) 心跳的毫秒數(shù).
有效范圍: 1000 <= heartbeat_interval <= configured_max (-1 禁用心跳)
--max-heartbeat-interval (nsqd 標志位) 控制最大值
默認值 --client-timeout / 2
output_buffer_size (nsqd v0.2.21+) 當 nsqd 寫到這個客戶端時將會用到的緩存的大小(字節(jié)數(shù))。
有效范圍: 64 <= output_buffer_size <= configured_max (-1 禁用輸出緩存)
--max-output-buffer-size (nsqd 標志位) 控制最大值
默認值 16kb
output_buffer_timeout (nsqd v0.2.21+)超時后,nsqd 緩沖的數(shù)據(jù)都會刷新到此客戶端。
有效范圍: 1ms <= output_buffer_timeout <= configured_max (-1 禁用 timeouts)
--max-output-buffer-timeout (nsqd 標志位) 控制最大值
默認值 250ms
警告: 使用極小值 output_buffer_timeout (< 25ms) 配置客戶端,將會顯著提高 nsqd CPU 的使用率(通??蛻舳诉B接時 > 50 )。
這依賴于 Go 的 timers 的實現(xiàn),它通過 Go 的優(yōu)先隊列運行時間維護。
tls_v1 (nsqd v0.2.22+) 允許 TLS 來連接
--tls-cert and --tls-key (nsqd 標志位s) 允許 TLS 并配置服務器證書
如果服務器支持 TLS,將會回復 "tls_v1": true。
客戶端讀取 IDENTIFY 響應后,必須立即開始 TLS 握手。
完成 TLS 握手后服務器將會響應 OK.
snappy (nsqd v0.2.23+) 允許 snappy 壓縮這次連接
--snappy (nsqd 標志位) 允許服務端支持
客戶端不允許同時 snappy 和 deflate。
deflate (nsqd v0.2.23+) 允許 deflate 壓縮這次連接
--deflate (nsqd 標志位) 允許服務端支持
客戶端不允許同時 snappy 和 deflate。
deflate_level (nsqd v0.2.23+) 配置 deflate 壓縮這次連接的級別
--max-deflate-level (nsqd 標志位) 配置允許的最大值
有效范圍: 1 <= deflate_level <= configured_max
值越高壓縮率越好,但是 CPU 負載也高。
sample_rate (nsqd v0.2.25+) 投遞此次連接的消息接收率。
有效范圍: 0 <= sample_rate <= 99 (0 禁用)
默認值 0
user_agent (nsqd v0.2.25+) 這個客戶端的代理字符串
默認值: <client_library_name>/<version>
msg_timeout (nsqd v0.2.28+) 配置服務端發(fā)送消息給客戶端的超時時間成功后響應:
OK
注意: 如果客戶端發(fā)送了 feature_negotiation (并且服務端支持),響應體將會是 JSON。
錯誤后的響應內容:
E_INVALID
E_BAD_BODY
訂閱話題(topic) /通道(channel)
SUB <topic_name> <channel_name>\n
<topic_name> - 字符串 (建議包含 #ephemeral 后綴)
<channel_name> - 字符串 (建議包含 #ephemeral 后綴)
成功后響應:
OK
錯誤后響應:
E_INVALID
E_BAD_TOPIC
E_BAD_CHANNEL
發(fā)布一個消息到 話題(topic):
PUB <topic_name>\n
[ 4-byte size in bytes ][ N-byte binary data ]
<topic_name> - 字符串 (建議 having #ephemeral suffix)
成功后響應:
OK
錯誤后響應:
E_INVALID
E_BAD_TOPIC
E_BAD_MESSAGE
E_PUB_FAILED
發(fā)布多個消息到 話題(topic) (自動):
注意: nsqd v0.2.16+ 有效
MPUB <topic_name>\n
[ 4-byte body size ]
[ 4-byte num messages ]
[ 4-byte message #1 size ][ N-byte binary data ]
... (repeated <num_messages> times)
<topic_name> - 字符串 (建議 having #ephemeral suffix)
成功后響應:
OK
錯誤后響應:
E_INVALID
E_BAD_TOPIC
E_BAD_BODY
E_BAD_MESSAGE
E_MPUB_FAILED
更新 RDY 狀態(tài) (表示你已經準備好接收N 消息)
注意: nsqd v0.2.20+ 使用 --max-rdy-count 表示這個值
RDY <count>\n
<count> - a string representation of integer N where 0 < N <= configured_max
注意: 這個沒有成功后響應
錯誤后響應:
E_INVALID
完成一個消息 (表示成功處理)
FIN <message_id>\n
<message_id> - message id as 16-byte hex string
注意: 這里沒有成功后響應
錯誤后響應:
E_INVALID
E_FIN_FAILED
重新將消息隊列(表示處理失?。?/p>
這個消息放在隊尾,表示已經發(fā)布過,但是因為很多實現(xiàn)細節(jié)問題,不要嚴格信賴這個,將來會改進。
簡單來說,消息在傳播途中,并且超時就表示 REQ。
REQ <message_id> <timeout>\n
<message_id> - message id as 16-byte hex string
<timeout> - a string representation of integer N where N <= configured max timeout
0 is a special case that will not defer re-queueing
注意: 這里沒有成功后響應
錯誤后響應:
E_INVALID
E_REQ_FAILED
重置傳播途中的消息超時時間
注意: 在 nsqd v0.2.17+ 可用
TOUCH <message_id>\n
<message_id> - the hex id of the message
注意: 這里沒有成功后響應
錯誤后響應:
E_INVALID
E_TOUCH_FAILED
清除連接(不再發(fā)送消息)
CLS\n
成功后響應s:
CLOSE_WAIT
錯誤后響應:
E_INVALID
No-op
NOP\n
注意: 這里沒有 response
注意: 在 nsqd v0.2.29+ 可用
如果 IDENTIFY 響應中有 auth_required=true,客戶端必須在 SUB, PUB 或 MPUB 命令前前發(fā)送 AUTH 。否則,客戶端不需要認證。
當 nsqd 接收到 AUTH 命令,它通過執(zhí)行 HTTP 配置 --auth-http-address ,這個請求包括以下查詢參數(shù):連接的遠程地址,TLS 狀態(tài),支持的認證密碼。更多細節(jié)參見:AUTH
AUTH\n
[ 4-byte size in bytes ][ N-byte Auth Secret ]
成功后響應:
JSON 包含授權給客戶端的身份,可選的 URL,和授權過的權限列表。
{"identity":"...", "identity_url":"...", "permission_count":1}
錯誤后響應:
E_AUTH_FAILED - An error occurred contacting an auth server
E_UNAUTHORIZED - No permissions found
數(shù)據(jù)異步傳輸給客戶端,并且支持各種回復體,比如
[x][x][x][x][x][x][x][x][x][x][x][x]...
| (int32) || (int32) || (binary)
| 4-byte || 4-byte || N-byte
------------------------------------...
size frame type data
客戶端必須是以下類型之一:
FrameTypeResponse int32 = 0
FrameTypeError int32 = 1
FrameTypeMessage int32 = 2
以及消息格式:
[x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x]...
| (int64) || || (hex string encoded in ASCII) || (binary)
| 8-byte || || 16-byte || N-byte
------------------------------------------------------------------------------------------...
nanosecond timestamp ^^ message ID message body
(uint16)
2-byte
attempts