在线观看不卡亚洲电影_亚洲妓女99综合网_91青青青亚洲娱乐在线观看_日韩无码高清综合久久

鍍金池/ 問答/人工智能  Python/ kafka要等一段時間才能消費(fèi)到數(shù)據(jù)

kafka要等一段時間才能消費(fèi)到數(shù)據(jù)

為什么用python寫的kafka客戶端腳本,程序一運(yùn)行就能生產(chǎn)數(shù)據(jù),而要等一段時間才能消費(fèi)到數(shù)據(jù)(topic里面有數(shù)據(jù))。(pykafka和confluentKafka都一樣)

只有極少的概率立刻可以消費(fèi)到數(shù)據(jù),大多數(shù)都要等個幾分鐘,很影響測試效率。

  • 自己封裝的一個給予confluentKafka的consumer對象

圖片描述

  • 調(diào)用,topic里面是有內(nèi)容的等一段時間也是可以消費(fèi)到數(shù)據(jù)的

圖片描述

回答
編輯回答
熟稔

https://stackoverflow.com/que... 最下面的回答,注意conf中session.timeout.ms參數(shù)的設(shè)置。

2018年8月12日 23:13
編輯回答
賤人曾

你在實(shí)例化consumer對象的時候加這樣兩個參數(shù)session_timeout_ms=6000,heartbeat_interval_ms=2000

consumer = KafkaConsumer(self.kafkatopic, group_id = self.groupid,
                                      bootstrap_servers = '{kafka_host}:{kafka_port}'.format(
            kafka_host=self.kafkaHost,
            kafka_port=self.kafkaPort
            ),
                                      session_timeout_ms=6000,
                                      heartbeat_interval_ms=2000)

官網(wǎng)里有參數(shù)的解釋https://kafka-python.readthed...
主要是這句話:“ If no heartbeats are received by the broker before the expiration of this session timeout, then the broker will remove this consumer from the group and initiate a rebalance.”
另外本機(jī)的session_timeout_ms默認(rèn)值我的是30s,不是官網(wǎng)里說的10s!

2018年7月30日 19:01
編輯回答
傻叼

幾分鐘不應(yīng)該啊,能把代碼貼下么

2017年10月2日 09:38