私はkafkaを初めて使用しています。私は、私の春のプロジェクトで、activeMQ(jms)に代わるカフカプロデューサ/コンシューマを使用したいと思います。私が必要とするのは、カフカのプロデューサーが自分のメッセージオブジェクトをトピックに公開し、消費者がそれをトピックから購読することです。カフカプロデューサをカスタムエンコーダでインスタンス化すると、インスタンス化できません
最初は私のカスタムエンコーダ、(私のメッセージクラスConfigurationActionMsg用)のデコーダのために同じものである:
以下@Component
public class ConfigActionMessageEncoder implements Encoder<ConfigurationActionMsg> {
public ConfigActionMessageEncoder() {
/* This constructor must be present for successful compile. */
}
public ConfigActionMessageEncoder(VerifiableProperties verifiableProperties) {
/* This constructor must be present for successful compile. */
}
@Override
public byte[] toBytes(ConfigurationActionMsg actionMsg){
return SerializationUtils.serialize(actionMsg);
}}
が、この場合、私はわからないプロセッサと消費者
@Configuration
@ComponentScan(basePackages = {"XXX"})
public class KafkaConfig {
@Bean
public KafkaProducer<String,ConfigurationActionMsg> kafkaProducer(){
Properties props = new Properties();
props.put("zk.connect", "127.0.0.1:2181");
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "com.atlas.configengine2.XXX.ConfigActionMessageEncoder");
return new KafkaProducer<>(props);
}
@Bean
public KafkaConsumer<String, ConfigurationActionMsg> kafkaConsumer(){
Properties props = new Properties();
props.put("zk.connect", "127.0.0.1:2181");
props.put("bootstrap.servers", "localhost:9092");
//We should only have one process running for consumer
props.put("group.id", "resolverActionTrigger");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "com.atlas.configengine2.XXX.ConfigActionMessageDecoder");
KafkaConsumer<String, ConfigurationActionMsg> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("configAction"));
return consumer;
}}
のための私の設定ですプロデューサ/コンシューマをインスタンス化する適切な方法です。しかし、この方法では機能しません。私のkafkaProducerはインスタンス化できなかったからです。
いくつかのデバッグ情報:org.apache.kafka.common.KafkaException:によって引き起こさ
com.atlas.configengine2.jms.ConfigActionMessageEncoderがorg.apache.kafka.commonの インスタンスではありません。シリアル化。シリアル化
しかし、それが唯一の問題であるかどうかはわかりません。カスタムエンコーダの作成方法は?
ちょっと私はリンクhttp://stackoverflow.com/questions/23755976/kafka-writing-custom-serializerに従っています。私が理解する限り、彼らは単にシリアライザとしてのEncoderクラスを使用していますか? – Acton
あなたはscala(<0.8.1)で実装されている古いプロデューサを参照しています。新バージョン(0.8.2以上)では、KafkaProducerは純粋にjavaで書かれており、 'Serializer' –