我正在嘗試暫停/恢復 Kafka 容器。使用以下代碼片段來執行此操作:
kafkaListenerEndpointRegistry.getListenerContainer("MAIN").pause();
當我呼叫 pause 時,我還需要做一個 thread.sleep 以便不處理批處理中的訊息。對于批處理中的每條訊息,我正在呼叫另一個具有速率限制的 API。為了保持這個速率限制,我需要停止對訊息的處理。
如果主執行緒休眠,它會阻止監聽器發送心跳嗎?它是否也會在后臺停止心跳執行緒?檔案說,“當容器暫停時,它會繼續 poll() 消費者,如果正在使用組管理,則避免重新平衡,但它不會檢索任何記錄。”但是我正在暫停容器并使執行緒休眠。這將如何影響流量?
uj5u.com熱心網友回復:
您絕不能休眠消費者執行緒,以避免重新平衡。
相反,減少,max.poll.records
這樣暫停會更快生效(消費者實際上不會暫停,直到處理之前的輪詢接收到的記錄)。
您可以在暫停消費者后拋出例外,但您需要以某種方式恢復容器。
我開了一個新問題來改善這種行為https://github.com/spring-projects/spring-kafka/issues/2280
如果您受到速率限制,請考慮使用KafkaTemplate.receive()
方法、按計劃或輪詢Spring Integration 配接器,而不是使用訊息驅動的方法。
轉載請註明出處,本文鏈接:https://www.uj5u.com/yidong/477523.html