我需要能夠@KafkaListener
根據其中一個屬性的值以及在 中定義的值,為每個帶有注釋的方法自定義 id application.yaml
,例如:
有一個帶有這樣注釋的方法的類:
@KafkaListener(info="myInfo")
public void listen(String msg){
}
以及我的 application.yaml 中的自定義屬性
myapp:
myProperty: myProp
在運行時,我希望注冊的端點消費者的 idmyInfo_myProp
不是當我沒有在注釋的屬性中明確提供一個時生成的自動生成的。
實作這一目標的最佳方法是什么?我正在考慮擴展KafkaListenerEndpointRegistrar
or BeanPostProcessor
?
謝謝
uj5u.com熱心網友回復:
在呼叫超類之前,覆寫端點注冊表 bean 并在那里操作端點屬性。
這是一個例子:
@SpringBootApplication
public class So72719215Application {
public static void main(String[] args) {
SpringApplication.run(So72719215Application.class, args);
}
@Bean(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
KafkaListenerEndpointRegistry registry(@Value("${myApp.myProperty}") String myProperty) {
return new KafkaListenerEndpointRegistry() {
@Override
public void registerListenerContainer(KafkaListenerEndpoint endpoint,
KafkaListenerContainerFactory<?> factory) {
AbstractKafkaListenerEndpoint<?, ?> akle = (AbstractKafkaListenerEndpoint<?, ?>) endpoint;
akle.setId(new String(akle.getListenerInfo()) "_" myProperty);
akle.setGroupId("group_" myProperty);
super.registerListenerContainer(endpoint, factory);
}
};
}
@KafkaListener(topics = "so72719215", info = "foo")
void listen(String in) {
System.out.println(in);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so72719215").partitions(1).replicas(1).build();
}
@Bean
ApplicationRunner runner(KafkaListenerEndpointRegistry reg) {
return args -> {
System.out.println(reg.getListenerContainerIds());
};
}
}
uj5u.com熱心網友回復:
id()
請參閱該屬性的JavaDocs @KafkaListener
:
/**
* The unique identifier of the container for this listener.
* <p>If none is specified an auto-generated id is used.
* <p>Note: When provided, this value will override the group id property
* in the consumer factory configuration, unless {@link #idIsGroup()}
* is set to false or {@link #groupId()} is provided.
* <p>SpEL {@code #{...}} and property place holders {@code ${...}} are supported.
* @return the {@code id} for the container managing for this endpoint.
* @see org.springframework.kafka.config.KafkaListenerEndpointRegistry#getListenerContainer(String)
*/
String id() default "";
因此,您可以使用 SpEL 和屬性占位符為目標偵聽器容器定義動態 id。就像是:
@KafkaKListener(id = "#{myBean.generateId('${property.from.env}')}")
轉載請註明出處,本文鏈接:https://www.uj5u.com/net/495512.html