我正在撰寫一個自定義結構化流源,但無法弄清楚如何限制批量大小。新MicroBatchStream
介面提供了planInputPartitions
被呼叫的方法,其回傳值為latestOffset
end Offset。然后它回傳資料的磁區,直到提供的最新偏移量,以便在單個批次中處理。
當我開始一個新的流式查詢時,這會導致第一批非常大,因為所有歷史資料都被塞進一個批次中。
我已經嘗試通過根據已經提交的內容逐漸增加 latestOffset 來手動限制批量大小。但是,當從檢查點重新啟動查詢時,這將失敗,因為尚未提交任何內容。
是否有(明顯的)限制流式批處理大小的方法?
uj5u.com熱心網友回復:
為此,您可以使用SupportsAdmissionControl介面。這為您提供了Offset latestOffset(Offset startOffset, ReadLimit limit);
一種方法,使您可以獲取startOffset
實際上是endOffset
上一批的。這樣,您可以在latestOffset
計算回應時應用大小限制,然后再回傳回應。根據您的需要,您不一定需要使用該ReadLimit
引數 - 在我們的例子中,我們只是有一個我們使用的預定義閾值。對您來說重要的部分是startOffset
引數。
然后,planInputPartitions
將使用正確的開始和結束偏移量呼叫,這是使用您的大小限制計算的。
您可以在 Kafka DataSource 實施中看到一個示例 - 請參見此處。
轉載請註明出處,本文鏈接:https://www.uj5u.com/yidong/479935.html