nsqd 是一個(gè)守護(hù)進(jìn)程,負(fù)責(zé)接收,排隊(duì),投遞消息給客戶端。
它可以獨(dú)立運(yùn)行,不過通常它是由 nsqlookupd 實(shí)例所在集群配置的(它在這能聲明 topics 和 channels,以便大家能找到)。
它在 2 個(gè) TCP 端口監(jiān)聽,一個(gè)給客戶端,另一個(gè)是 HTTP API。同時(shí),它也能在第三個(gè)端口監(jiān)聽 HTTPS。
-auth-http-address=: <addr>:<port> 查詢授權(quán)服務(wù)器 (可能會(huì)給多次)
-broadcast-address="": 通過 lookupd 注冊(cè)的地址(默認(rèn)名是 OS)
-config="": 配置文件路徑
-data-path="": 緩存消息的磁盤路徑
-deflate=true: 運(yùn)行協(xié)商壓縮特性(客戶端壓縮)
-e2e-processing-latency-percentile=: 消息處理時(shí)間的百分比(通過逗號(hào)可以多次指定,默認(rèn)為 none)
-e2e-processing-latency-window-time=10m0s: 計(jì)算這段時(shí)間里,點(diǎn)對(duì)點(diǎn)時(shí)間延遲(例如,60s 僅計(jì)算過去 60 秒)
-http-address="0.0.0.0:4151": 為 HTTP 客戶端監(jiān)聽 <addr>:<port>
-https-address="": 為 HTTPS 客戶端 監(jiān)聽 <addr>:<port>
-lookupd-tcp-address=: 解析 TCP 地址名字 (可能會(huì)給多次)
-max-body-size=5123840: 單個(gè)命令體的最大尺寸
-max-bytes-per-file=104857600: 每個(gè)磁盤隊(duì)列文件的字節(jié)數(shù)
-max-deflate-level=6: 最大的壓縮比率等級(jí)(> values == > nsqd CPU usage)
-max-heartbeat-interval=1m0s: 在客戶端心跳間,最大的客戶端配置時(shí)間間隔
-max-message-size=1024768: (棄用 --max-msg-size) 單個(gè)消息體的最大字節(jié)數(shù)
-max-msg-size=1024768: 單個(gè)消息體的最大字節(jié)數(shù)
-max-msg-timeout=15m0s: 消息超時(shí)的最大時(shí)間間隔
-max-output-buffer-size=65536: 最大客戶端輸出緩存可配置大小(字節(jié))
-max-output-buffer-timeout=1s: 在 flushing 到客戶端前,最長(zhǎng)的配置時(shí)間間隔。
-max-rdy-count=2500: 客戶端最大的 RDY 數(shù)量
-max-req-timeout=1h0m0s: 消息重新排隊(duì)的超時(shí)時(shí)間
-mem-queue-size=10000: 內(nèi)存里的消息數(shù)(per topic/channel)
-msg-timeout="60s": 自動(dòng)重新隊(duì)列消息前需要等待的時(shí)間
-snappy=true: 打開快速選項(xiàng) (客戶端壓縮)
-statsd-address="": 統(tǒng)計(jì)進(jìn)程的 UDP <addr>:<port>
-statsd-interval="60s": 從推送到統(tǒng)計(jì)的時(shí)間間隔
-statsd-mem-stats=true: 切換發(fā)送內(nèi)存和 GC 統(tǒng)計(jì)數(shù)據(jù)
-statsd-prefix="nsq.%s": 發(fā)送給統(tǒng)計(jì)keys 的前綴(%s for host replacement)
-sync-every=2500: 磁盤隊(duì)列 fsync 的消息數(shù)
-sync-timeout=2s: 每個(gè)磁盤隊(duì)列 fsync 平均耗時(shí)
-tcp-address="0.0.0.0:4150": TCP 客戶端 監(jiān)聽的 <addr>:<port>
-tls-cert="": 證書文件路徑
-tls-client-auth-policy="": 客戶端證書授權(quán)策略 ('require' or 'require-verify')
-tls-key="": 私鑰路徑文件
-tls-required=false: 客戶端連接需求 TLS
-tls-root-ca-file="": 私鑰證書授權(quán) PEM 路徑
-verbose=false: 打開日志
-version=false: 打印版本
-worker-id=0: 進(jìn)程的唯一碼(默認(rèn)是主機(jī)名的哈希值)
/ping - 活躍度/info - 版本/stats - 檢查綜合運(yùn)行/pub - 發(fā)布消息到話題(topic)/mpub - 發(fā)布多個(gè)消息到話題(topic)/debug/pprof - pprof 調(diào)試入口/debug/pprof/profile - 生成 pprof CPU 配置文件/debug/pprof/goroutine - 生成 pprof 計(jì)算配置文件/debug/pprof/heap - 生成 pprof 堆配置文件/debug/pprof/block - 生成 pprof 塊配置文件/debug/pprof/threadcreate - 生成 pprof OS 線程配置文件v1 命名空間 (as of nsqd v0.2.29+):
/topic/create - 創(chuàng)建一個(gè)新的話題(topic)/topic/delete - 刪除一個(gè)話題(topic)/topic/empty - 清空話題(topic)/topic/pause - 暫停話題(topic)的消息流/topic/unpause - 恢復(fù)話題(topic)的消息流/channel/create - 創(chuàng)建一個(gè)新的通道(channel)/channel/delete - 刪除一個(gè)通道(channel)/channel/empty - 清空一個(gè)通道(channel)/channel/pause - 暫停通道(channel)的消息流/channel/unpause - 恢復(fù)通道(channel)的消息流以拋棄的命名空間:
/create_topic - 創(chuàng)建一個(gè)新的話題(topic)/delete_topic - 刪除一個(gè)話題(topic)/empty_topic - 清空話題(topic)/pause_topic - 暫停話題(topic)的消息流/unpause_topic - 恢復(fù)話題(topic)的消息流/create_channel - 創(chuàng)建一個(gè)新的通道(channel)/delete_channel - 刪除一個(gè)通道(channel)/empty_channel - 清空一個(gè)通道(channel)/pause_channel - 暫停通道(channel)的消息流/unpause_channel - 恢復(fù)通道(channel)的消息流NOTE: 這些結(jié)束點(diǎn)返回 "wrapped" JSON:
{"status_code":200, "status_text":"OK", "data":{...}}
發(fā)送 Accept: application/vnd.nsq; version=1.0 頭將會(huì)協(xié)商使用未封裝的 JSON 響應(yīng)格式 (as of nsqd v0.2.29+)。
發(fā)布一個(gè)消息
參數(shù):
topic - the topic to publish to
POST body - the raw message bytes
$ curl -d "<message>" http://127.0.0.1:4151/pub?topic=message_topic`
一個(gè)往返發(fā)布多個(gè)消息
參數(shù):
topic - 發(fā)布到的話題(topic)
binary - bool ('true' or 'false') 允許二進(jìn)制模式
POST body - `\n` 分離原始消息字節(jié)
注意:默認(rèn)的 /mpub 希望消息使用 \n 切割,使用 ?binary=true 查詢參數(shù)來允許二進(jìn)制模式,希望發(fā)送的消息體能成為以下的格式(HTTP 'Content-Length' 頭必須是將要發(fā)送的消息體的總大小):
[ 4-byte num messages ]
[ 4-byte message #1 size ][ N-byte binary data ]
... (repeated <num_messages> times)
$ curl -d "<message>\n<message>\n<message>" http://127.0.0.1:4151/mpub?topic=message_topic`
已經(jīng)拋棄的別名 /create_topic
創(chuàng)建一個(gè)話題(topic)
參數(shù):
話題(topic) - 將要?jiǎng)?chuàng)建的話題(topic)
已經(jīng)拋棄的別名 : /delete_topic
刪除一個(gè)已經(jīng)存在的話題(topic) (和所有的通道(channel))
參數(shù):
topic - 現(xiàn)有的話題(topic) to delete
已拋棄的別名: /create_channel
為現(xiàn)有的話題(topic) 創(chuàng)建一個(gè)通道(channel)
參數(shù):
topic - 現(xiàn)有的話題(topic)
channel - the channel to create
已拋棄的別名: /delete_channel
刪除現(xiàn)有的話題(topic) 一個(gè)的通道(channel)
參數(shù):
topic - 現(xiàn)有的話題(topic)
channel - 待刪除的通道(channel)
已拋棄的別名: /empty_topic
清空現(xiàn)有話題(topic) 隊(duì)列中所有的消息(內(nèi)存和磁盤中)
參數(shù):
topic - 待清空的話題(topic)
已拋棄的別名: /empty_channel
清空現(xiàn)有通道(channel) 隊(duì)列中所有的消息(內(nèi)存和磁盤中)
參數(shù):
topic - 現(xiàn)有的話題(topic)
channel - 待清空的通道(channel)
已拋棄的別名: /pause_topic
暫停已有話題(topic) 的所有通道(channel)的消息(消息將會(huì)在話題(topic) 里排隊(duì))
參數(shù):
topic - 現(xiàn)有的話題(topic)
已拋棄的別名: /unpause_topic
為現(xiàn)有的話題(topic) 的通道(channel) 重啟消息流
參數(shù):
topic - 現(xiàn)有的話題(topic)
已拋棄的別名: /channel_pause
暫停發(fā)送已有的通道(channel) 給消費(fèi)者(消息將會(huì)隊(duì)列)
參數(shù):
topic - 現(xiàn)有的話題(topic)
channel - 已有的通道(channel)將會(huì)被暫停
已拋棄的別名: /unpause_channel
重新發(fā)送通道(channel) 里的消息給消費(fèi)者
參數(shù):
topic - 現(xiàn)有的話題(topic)
channel - 將要暫停的通道(channel)
返回內(nèi)部統(tǒng)計(jì)數(shù)據(jù)
參數(shù)
format - (可選) `text` or `json` (默認(rèn) = `text`)
監(jiān)控結(jié)束點(diǎn),必須返回 OK。如果有問題返回 500。同時(shí),如果寫消息到磁盤失敗將會(huì)返回錯(cuò)誤狀態(tài)。
返回版本信息
可用的調(diào)試節(jié)點(diǎn)的頁碼
開始 30秒的 pprof CPU 配置,并通過請(qǐng)求返回。
注意,因?yàn)樗谶\(yùn)行時(shí)的性能和時(shí)間,這個(gè)結(jié)束點(diǎn)并沒在 /debug/pprof 頁面列表中。
為所有運(yùn)行的 goroutines 返回棧記錄。
返回堆和內(nèi)存配置信息(前面的內(nèi)容可作為 pprof 配置信息)
返回 goroutine 塊配置信息
返回 goroutine 棧記錄
nsqd 提供一套節(jié)點(diǎn)的配置信息,直接通過 Go 的 pprof 工具。如果你有 go 工具套裝,只要運(yùn)行:
# memory profiling
$ go 工具 pprof http://localhost:4151/debug/pprof/heap
# cpu profiling
$ go 工具 pprof http://localhost:4151/debug/pprof/profile
為了加強(qiáng)安全性,可以通過 --tls-cert 和 --tls-key 客戶端配置 nsqd,升級(jí)他們的鏈接為 TLS。
另外,你可以要求客戶端使用 --tls-required (nsqd v0.2.28+)協(xié)商 TLS。
你可以通過--tls-client-auth-policy (require 或 require-verify)配置一個(gè) nsqd 客戶端證書:
require - 客戶端必須提供一個(gè)證書,否則將會(huì)被拒絕require-verify - 客戶端必須提供一個(gè)有效的證書,根據(jù) --tls-root-ca-file 指定的鏈接或者默認(rèn)的 CA,否則將會(huì)被拒絕。可以當(dāng)做客戶端授權(quán)的表單(nsqd v0.2.28+)。
如果你想生成一個(gè) password-less,自簽名證書,用:
$ openssl req -x509 -newkey rsa:2048 -keyout key.pem -out cert.pem -days 365 -nodes
注意: 在 nsqd v0.2.29+ 可用
通過使用一個(gè)遵從 Auth HTTP 協(xié)議的授權(quán)服務(wù)器,指定 -auth-http-address=host:port 標(biāo)志,你可以配置 nsqd。
注意: 希望當(dāng)僅有 nsqd TCP 協(xié)議暴露給外部客戶端時(shí)使用授權(quán),而不是 HTTP(S) 節(jié)點(diǎn)。參見底下說明:
Auth 服務(wù)器必須接受 HTTP 請(qǐng)求:
/auth?remote_ip=...&tls=...&auth_secret=...
返回結(jié)果格式如下:
{
"ttl": 3600,
"identity": "username",
"identity_url": "http://....",
"authorizations": [
{
"permissions": [
"subscribe",
"publish"
],
"topic": ".*",
"channels": [
".*"
]
}
]
}
注意話題(topic) 和通道(channel) 字符串必須用 nsqd 的正則表達(dá)式來申請(qǐng)授權(quán)。nsqd 將會(huì)為 TTL 間隔,并會(huì)在這個(gè)間隔時(shí)間里重新請(qǐng)求授權(quán)。
通常情況,將會(huì)使用 TLS 來加強(qiáng)安全性。nsqd 和 授權(quán)服務(wù)器間通過信任的網(wǎng)絡(luò)通信(并沒被加密)。如果一個(gè)授權(quán)服務(wù)器通過遠(yuǎn)程 IP 信息來授權(quán),客戶端可以使用占位符(比如 .),作為 AUTH 命令(Auth 服務(wù)器忽略)。
授權(quán)服務(wù)器例子 pynsqauthd。
幫助服務(wù)器暴露 nsqlookupd 和 nsqd /stats 數(shù)據(jù)給客戶端,從授權(quán)服務(wù)器通過權(quán)限過濾,在以下可以找到
nsqauthfilter。
當(dāng)使用命令行工具,可以通過使用 --reader-opt 標(biāo)志來授權(quán)。
$ nsq_tail ... -reader-opt="tls_v1,true" -reader-opt="auth_secret,$SECRET"
你可以選擇設(shè)置 nsqd 來收集和發(fā)射點(diǎn)對(duì)點(diǎn)信息處理延遲,通過 --e2e-processing-latency-percentile 標(biāo)志位來配置百分比。
使用概率百分比技術(shù)(參見 Effective Computation of Biased Quantiles over Data Streams)來計(jì)算值。我們通過 bmizerany 來使用 perks 包,它能實(shí)現(xiàn)這個(gè)算法。
我們內(nèi)部維持 2 個(gè)通道(channel),每個(gè)通道(channel)存儲(chǔ) N/2 分鐘的延遲數(shù)據(jù)。每個(gè) N/2 分鐘我們重置了每個(gè)通道(channel)(并開始插入新的數(shù)據(jù))。
因?yàn)槲覀儍H在通道級(jí)別收集數(shù)據(jù),對(duì)于話題我們聚合并合并所有的通道數(shù)量的 quantiles。如果數(shù)據(jù)在同一個(gè) nsqd 實(shí)例上時(shí),可以使用這個(gè)技術(shù)。然而當(dāng)數(shù)據(jù)已經(jīng)精確的通過 nsqd (通過 nsqlookupd),我們?yōu)槊總€(gè) nsqd 取平均值。為了維持統(tǒng)計(jì)的精確性,除了平均值,我們也提供最大最小值。
注意: 如果沒有消費(fèi)者連接,不能更新值,盡管消息隊(duì)列的點(diǎn)對(duì)點(diǎn)時(shí)間會(huì)緩慢增長(zhǎng)。這是因?yàn)閮H在 nsqd 收到從客戶端發(fā)來 FIN 消息時(shí)才會(huì)重新計(jì)算。當(dāng)消費(fèi)者重新連接,這些值將會(huì)重新調(diào)整。
當(dāng)使用 --statsd-address 來為statsd (或類似 statsdaemon)指定 UDP <addr>:<port> 時(shí),nsqd 將會(huì)在 --statsd-interval 定期推送數(shù)據(jù)給 statsd(注意:這個(gè)間隔必須始終小于等于 graphite 的刷入間隔)。設(shè)置 nsqadmin 可以顯示圖標(biāo)。
推薦以下配置(但是這些選擇必須建立在你的可用資源和要求上)。同時(shí),statsd 的刷入間隔必須小于或者等于 storage-schemas.conf 的最小值,并且 nsqd 必須通過 --statsd-interval 來確認(rèn)刷入時(shí)間小于等于時(shí)間間隔。
# storage-schemas.conf
[nsq]
pattern = ^nsq\..*
retentions = 1m:1d,5m:30d,15m:1y
# storage-aggregation.conf
[default_nsq]
pattern = ^nsq\..*
xFilesFactor = 0.2
aggregationMethod = average
nsqd 實(shí)例將會(huì)推送給以下 statsd 路徑:
nsq.<nsqd_host>_<nsqd_port>.topic.<topic_name>.backend_depth [gauge]
nsq.<nsqd_host>_<nsqd_port>.topic.<topic_name>.depth [gauge]
nsq.<nsqd_host>_<nsqd_port>.topic.<topic_name>.message_count
nsq.<nsqd_host>_<nsqd_port>.topic.<topic_name>.channel.<channel_name>.backend_depth [gauge]
nsq.<nsqd_host>_<nsqd_port>.topic.<topic_name>.channel.<channel_name>.clients [gauge]
nsq.<nsqd_host>_<nsqd_port>.topic.<topic_name>.channel.<channel_name>.deferred_count [gauge]
nsq.<nsqd_host>_<nsqd_port>.topic.<topic_name>.channel.<channel_name>.depth [gauge]
nsq.<nsqd_host>_<nsqd_port>.topic.<topic_name>.channel.<channel_name>.in_flight_count [gauge]
nsq.<nsqd_host>_<nsqd_port>.topic.<topic_name>.channel.<channel_name>.message_count
nsq.<nsqd_host>_<nsqd_port>.topic.<topic_name>.channel.<channel_name>.requeue_count
nsq.<nsqd_host>_<nsqd_port>.topic.<topic_name>.channel.<channel_name>.timeout_count
# if --statsd-mem-stats is enabled
nsq.<nsqd_host>_<nsqd_port>.mem.heap_objects [gauge]
nsq.<nsqd_host>_<nsqd_port>.mem.heap_idle_bytes [gauge]
nsq.<nsqd_host>_<nsqd_port>.mem.heap_in_use_bytes [gauge]
nsq.<nsqd_host>_<nsqd_port>.mem.heap_released_bytes [gauge]
nsq.<nsqd_host>_<nsqd_port>.mem.gc_pause_usec_100 [gauge]
nsq.<nsqd_host>_<nsqd_port>.mem.gc_pause_usec_99 [gauge]
nsq.<nsqd_host>_<nsqd_port>.mem.gc_pause_usec_95 [gauge]
nsq.<nsqd_host>_<nsqd_port>.mem.mem.next_gc_bytes [gauge]
nsq.<nsqd_host>_<nsqd_port>.mem.gc_runs
# if --e2e-processing-latency-percentile is specified, for each percentile
nsq.<nsqd_host>_<nsqd_port>.topic.<topic_name>.e2e_processing_latency_<percent> [gauge]
nsq.<nsqd_host>_<nsqd_port>.topic.<topic_name>.channel.<channel_name>.e2e_processing_latency_<percent> [gauge]