下面的代碼效率低下,它kafkaConsumer
每次都在 for 回圈中請求(它說<!-- move code below -->
)。如何將其移至<!-- move it here -->
每個主題只需請求一次?我相信我必須從中獲取所有TopicPartition
內容jsonOffsets
并將其放入,kafkaConsumer.endOffsets
但我不知道該怎么做。
endOffsets
需要型別TopicPartitions
override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
event.progress.sources
// Ignoring sqs / jms sources their offsets
.filter(source => source.description.toLowerCase().contains("kafka"))
// ex offset :
// "endOffset" : {
// "rcs-enriched-event" : {
// "8" : 31376,
// "11" : 39114,
// } ...
.foreach(source => {
/// Map[Topic,Map[Partition, CurrentOffset]]
val jsonOffsets = objectMapper.readValue(source.endOffset, classOf[Map[String, Map[String, Int]]])
jsonOffsets.keys.filter(key => topics.contains(key))
.foreach(topic => {
val topicPartitionMap = new java.util.HashMap[TopicPartition, OffsetAndMetadata]()
// Map[Partition, CurrentOffset]
val topicOffsetList = new ListBuffer[Int]()
val offsets: Option[Map[String, Int]] = jsonOffsets.get(topic)
<!-- move it here -->
offsets match {
case Some(topicOffsetData) =>
topicOffsetData.keys.foreach(partition => {
/// "4" : 34937
val tp = new TopicPartition(topic, partition.toInt)
val oam = new OffsetAndMetadata(topicOffsetData(partition).toLong)
val bbCurrentOffset = topicOffsetData(partition).toLong
<!-- move code below -->
val kafkaPartitionOffset = kafkaConsumer.endOffsets(java.util.Arrays.asList(tp))
// latest offset
val partitionLatestOffset = kafkaPartitionOffset.get(tp)
// Log for a particular partition
val delta = partitionLatestOffset - bbCurrentOffset
topicOffsetList = delta.abs
topicPartitionMap.put(tp, oam)
})
}
try {
kafkaConsumer.commitSync(topicPartitionMap)
} catch {
case e: Exception => log.error(s"${groupId} Could not commit offset", e)
}
//log.info have the group id (unique id), the topic, cumulative consumer lag delta
//push out to logger
log.info("groupId: " groupId " topic: " topic " lagDeltaSum: " topicOffsetList.sum)
})
})
}
uj5u.com熱心網友回復:
在我看來,您應該回圈offsets
兩次。
.foreach(topic => {
val offsets: Option[Map[String, Int]] = jsonOffsets.get(topic)
// Fetch latest offsets
val consumedPartitions = new ListBuffer[TopicPartition]()
offsets match {
case Some(topicOffsetData) =>
topicOffsetData.keys.foreach(partition => {
val tp = new TopicPartition(topic, partition.toInt)
consumedPartitions = tp
}
}
val latestOffsets = kafkaConsumer.endOffsets(consumedPartitions.asJava)
offsets match {
case Some(topicOffsetData) =>
// Use latest offsets, as needed ...
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/477836.html