I'm running spring-kafka 2.6.7 and I'm looking for a way to set a custom task executor for my listeners. Below is my Kafka configuration.
@Bean
ProducerFactory<Integer, BaseEventTemplate> eventProducerFactory() {
    Map<String, Object> producerProps = new HashMap<>();
    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, BaseEventTemplateSerializer.class);
    producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
    producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 256);
    return new DefaultKafkaProducerFactory<>(producerProps);
}

@Bean
KafkaTemplate<Integer, BaseEventTemplate> baseEventKafkaTemplate() {
    return new KafkaTemplate<>(eventProducerFactory());
}

@Bean
ConsumerFactory<Integer, BaseEventTemplate> baseEventConsumerFactory() {
    Map<String, Object> consumerProps = new HashMap<>();
    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "kafkaeventconsumer");
    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BaseEventTemplateDeserializer.class);
    // Offsets are committed by the listener container, not by the Kafka client
    consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, Collections.singletonList(RoundRobinAssignor.class));
    return new DefaultKafkaConsumerFactory<>(consumerProps);
}

@Bean
ConcurrentKafkaListenerContainerFactory<Integer, BaseEventTemplate> baseEventKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, BaseEventTemplate> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(baseEventConsumerFactory());
    // Three consumers, each driven by its own thread
    factory.setConcurrency(3);
    factory.getContainerProperties().setPollTimeout(3000);
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    factory.getContainerProperties().setSyncCommits(true);
    return factory;
}
I know I can set the consumer task executor via factory.getContainerProperties().setConsumerTaskExecutor(), but I'm not sure how to set a task executor for the listener.
Reply from a uj5u.com user:
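2.6.x is out of OSS support; see https://spring.io/projects/spring-kafka#support
The same thread that polls the consumer is also used to invoke the listener, so there is no separate listener task executor to configure.
In very early versions (before 1.3) there were two threads because of a limitation in kafka-clients, but there has been only one for the past 5 years.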
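To make the threading model above concrete, here is a minimal plain-Java sketch that does not use spring-kafka at all. It only simulates the idea: a poll loop runs on a thread supplied by a custom executor, and the listener callback is invoked on that same thread. The class name `SingleThreadListenerDemo`, the `poll()` stand-in, and the `custom-consumer-` thread name prefix are all hypothetical names chosen for illustration.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public class SingleThreadListenerDemo {

    /** Hypothetical stand-in for a Kafka consumer poll: returns one batch of events. */
    static List<String> poll() {
        return List.of("event-1", "event-2");
    }

    /**
     * Runs one poll-and-dispatch cycle on a custom executor and returns
     * { pollingThreadName, listenerThreadName }.
     */
    static String[] runOnce() throws Exception {
        AtomicInteger counter = new AtomicInteger();
        // Custom executor whose threads carry a recognizable name (assumed prefix)
        ExecutorService consumerExecutor = Executors.newFixedThreadPool(1, r ->
                new Thread(r, "custom-consumer-" + counter.incrementAndGet()));

        String[] names = new String[2];
        // The "listener" records the name of the thread it is called on
        Consumer<String> listener = event -> names[1] = Thread.currentThread().getName();

        try {
            consumerExecutor.submit(() -> {
                // Polling happens on the executor's thread ...
                names[0] = Thread.currentThread().getName();
                for (String event : poll()) {
                    // ... and the listener is invoked directly, on the same thread
                    listener.accept(event);
                }
            }).get(5, TimeUnit.SECONDS);
        } finally {
            consumerExecutor.shutdown();
        }
        return names;
    }

    public static void main(String[] args) throws Exception {
        String[] names = runOnce();
        System.out.println("poll thread:     " + names[0]);
        System.out.println("listener thread: " + names[1]);
    }
}
```

Both lines print the same thread name. By analogy, the executor passed to factory.getContainerProperties().setConsumerTaskExecutor() in the question is the one whose threads end up running the listener, so customizing it is how the listener's threads are customized. With setConcurrency(3), such an executor would presumably need at least three threads available, since each consumer keeps its thread busy for as long as it runs.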