我正在撰寫一個帶有遠程分塊的示例 Spring 批處理 Kafka 集成。起初,大師在一些塊中讀取了一些示例記錄(Item.java)。然后將此塊發送到(Spring Integration)通道,Kafka 生產者將此塊發送到 Kafka 主題。問題是 KafkaTemplate 無法序列化 ChenkRequest。
如果我使用:
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
然后這個錯誤引發:
org.apache.kafka.common.errors.SerializationException: Can't convert value of class org.springframework.batch.integration.chunk.ChunkRequest to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
如果我撰寫一個自定義序列化程式,例如:
public class CustomSerializer implements Serializer<ChunkRequest<Item>> {
private final ObjectMapper objectMapper = new ObjectMapper();
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}
@Override
public byte[] serialize(String topic, ChunkRequest<Item> data) {
try {
System.out.println("Serializing...");
return objectMapper.writeValueAsBytes(data);
} catch (Exception e) {
throw new SerializationException("Error when serializing MessageDto to byte[]");
}
}
@Override
public void close() {
}
}
這個配置:
spring.kafka.producer.value-serializer=com.example.myremotechunking.batch.CustomSerializer
這個錯誤會引發:
com.fasterxml.jackson.databind.JsonMappingException: Infinite recursion (StackOverflowError) (through reference chain: org.springframework.batch.core.StepExecution["jobExecution"]->org.springframework.batch.core.JobExecution["stepExecutions"]
那么,有什么問題!?
uj5u.com熱心網友回復:
與org:spring-projects org.springframework.batch.integration.chunk.ChunkRequest
JSON 不兼容。考慮撰寫一個Serializer<ChunkRequest<Item>>
將使用標準 Java 序列化功能進行序列化的程式。您可以使用org.springframework.core.serializer.DefaultSerializer.serializeToByteArray()
來自 Spring 的 a 作為委托。
可能你在另一邊需要一個類似的解串器......
uj5u.com熱心網友回復:
謝謝阿爾特姆。那行得通!我寫了一個客戶序列化程式:
public class CustomSerializer implements Serializer<Object> {
@Override
public byte[] serialize(String topic, Object data) {
try {
return new DefaultSerializer().serializeToByteArray(data);
} catch (Exception e) {
throw new SerializationException("Error when serializing MessageDto to byte[]");
}
}
}
配置:
spring.kafka.producer.value-serializer=com.example.myremotechunking.batch.CustomSerializer
和一個自定義反序列化器:
public class CustomDeserializer implements Deserializer<Object> {
@Override
public Object deserialize(String s, byte[] bytes) {
try {
return new DefaultDeserializer().deserialize(new ByteArrayInputStream(bytes));
} catch (IOException e) {
e.printStackTrace();
return null;
}
}
}
配置:
spring.kafka.consumer.value-deserializer=com.example.myremotechunking.batch.CustomDeserializer
轉載請註明出處,本文鏈接:https://www.uj5u.com/yidong/426033.html
上一篇:repository.save()如何作業,以及如何測驗SpringdataJPA中的唯一約束
下一篇:春季啟動自動配置。意外行為