目前我有這個代碼來創建一個ConcurrentKafkaListenerContainerFactory
.
@Bean
public ConcurrentKafkaListenerContainerFactory<Object, KafkaExampleRecord> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Object, KafkaExampleRecord> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProps()));
factory.setReplyTemplate(kafkaTemplate());
return factory;
}
如何創建DefaultKafkaConsumerFactory
具有來自的屬性的物件application.yaml
?我不想將它們設定在consumerProps()
方法中。
編輯:我的 KafkaConfig 類現在看起來像這樣。
@Configuration
@EnableKafka
public class KafkaConfig {
@Bean
public ReplyingKafkaTemplate<Object, KafkaExampleRecord, KafkaExampleRecord> replyingKafkaTemplate(ProducerFactory<Object, KafkaExampleRecord> producerFactory,
ConcurrentMessageListenerContainer<Object, KafkaExampleRecord> repliesContainer) {
return new ReplyingKafkaTemplate<>(producerFactory, repliesContainer);
}
@Bean
public ConcurrentMessageListenerContainer<Object, KafkaExampleRecord> repliesContainer(
ConcurrentKafkaListenerContainerFactory<Object, KafkaExampleRecord> kafkaListenerContainerFactory) {
ConcurrentMessageListenerContainer<Object, KafkaExampleRecord> rc =
kafkaListenerContainerFactory.createContainer("mytopic");
rc.setAutoStartup(false);
return rc;
}
@Bean
public KafkaTemplate<Object, KafkaExampleRecord> kafkaTemplate(ProducerFactory<Object, KafkaExampleRecord> pf) {
return new KafkaTemplate<>(pf);
}
}
但是現在我明白了java.lang.IllegalStateException: a KafkaTemplate is required to support replies
。
uj5u.com熱心網友回復:
Spring Boot 自動配置一個;只需將其作為引數添加到工廠方法:
@Bean
public ConcurrentKafkaListenerContainerFactory<Object, KafkaExampleRecord> kafkaListenerContainerFactory(
ConsumerFactory<Object, KafkaExampleRecord> cf) {
...
但是,您也不需要宣告容器工廠 bean;如果將 KT 定義為 bean,Boot 將為您自動配置并自動設定 KT。
請參閱引導的KafkaAnnotationDrivenConfiguration
.
轉載請註明出處,本文鏈接:https://www.uj5u.com/gongcheng/496066.html