2016-05-26 19 views
0

私はkafkaを初めて使用しています。私は、私の春のプロジェクトで、activeMQ(jms)に代わるカフカプロデューサ/コンシューマを使用したいと思います。私が必要とするのは、カフカのプロデューサーが自分のメッセージオブジェクトをトピックに公開し、消費者がそれをトピックから購読することです。カフカプロデューサをカスタムエンコーダでインスタンス化すると、インスタンス化できません

最初は私のカスタムエンコーダ、(私のメッセージクラスConfigurationActionMsg用)のデコーダのために同じものである:

以下
@Component 
public class ConfigActionMessageEncoder implements Encoder<ConfigurationActionMsg> { 

    public ConfigActionMessageEncoder() { 
    /* This constructor must be present for successful compile. */ 
    } 
    public ConfigActionMessageEncoder(VerifiableProperties verifiableProperties) { 
    /* This constructor must be present for successful compile. */ 
    } 

    @Override 
    public byte[] toBytes(ConfigurationActionMsg actionMsg){ 
     return SerializationUtils.serialize(actionMsg); 
    }} 

が、この場合、私はわからないプロセッサと消費者

@Configuration 
@ComponentScan(basePackages = {"XXX"}) 
public class KafkaConfig { 
@Bean 
public KafkaProducer<String,ConfigurationActionMsg> kafkaProducer(){ 
    Properties props = new Properties(); 
    props.put("zk.connect", "127.0.0.1:2181"); 
    props.put("bootstrap.servers", "localhost:9092"); 
    props.put("acks", "all"); 
    props.put("retries", 0); 
    props.put("batch.size", 16384); 
    props.put("linger.ms", 1); 
    props.put("buffer.memory", 33554432); 
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
    props.put("value.serializer", "com.atlas.configengine2.XXX.ConfigActionMessageEncoder"); 

    return new KafkaProducer<>(props); 
} 

@Bean 
public KafkaConsumer<String, ConfigurationActionMsg> kafkaConsumer(){ 

    Properties props = new Properties(); 
    props.put("zk.connect", "127.0.0.1:2181"); 
    props.put("bootstrap.servers", "localhost:9092"); 
    //We should only have one process running for consumer 
    props.put("group.id", "resolverActionTrigger"); 
    props.put("enable.auto.commit", "true"); 
    props.put("auto.commit.interval.ms", "1000"); 
    props.put("session.timeout.ms", "30000"); 
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
    props.put("value.deserializer", "com.atlas.configengine2.XXX.ConfigActionMessageDecoder"); 
    KafkaConsumer<String, ConfigurationActionMsg> consumer = new KafkaConsumer<>(props); 
    consumer.subscribe(Arrays.asList("configAction")); 
    return consumer; 
}} 

のための私の設定ですプロデューサ/コンシューマをインスタンス化する適切な方法です。しかし、この方法では機能しません。私のkafkaProducerはインスタンス化できなかったからです。

いくつかのデバッグ情報:org.apache.kafka.common.KafkaException:によって引き起こさ

com.atlas.configengine2.jms.ConfigActionMessageEncoderがorg.apache.kafka.commonの インスタンスではありません。シリアル化。シリアル化

しかし、それが唯一の問題であるかどうかはわかりません。カスタムエンコーダの作成方法は?

答えて

0

org.apache.kafka.common.serialization.SerializerEncoderではありません。

CustomSerializerを参照してください。

+0

ちょっと私はリンクhttp://stackoverflow.com/questions/23755976/kafka-writing-custom-serializerに従っています。私が理解する限り、彼らは単にシリアライザとしてのEncoderクラスを使用していますか? – Acton

+0

あなたはscala(<0.8.1)で実装されている古いプロデューサを参照しています。新バージョン(0.8.2以上)では、KafkaProducerは純粋にjavaで書かれており、 'Serializer' –

関連する問題