2017-06-09 10 views
0

私はKafkaにJsonオブジェクトを生成して手動で消費しようとしていますが、org.apache.kafka.streams.examples.pageviewでJSONPOJO Serdesを使用しています。KafkaからJSONオブジェクトを消費する際にシリアル化エラーが発生しました

私のプロデューサーのコードは次のとおりです。

package JsonProducer; 

imports ... 

public class jsnPdc { 

    public static void main(String[] args) throws IOException { 

     byte[] arr= "XXXX  THIS IS TEST DATA \n XYZ".getBytes();  
     JSONObject jsn = new JSONObject(); 
     jsn.put("Header_Title", (Arrays.copyOfRange(arr, 0, 4))); 
     jsn.put("Data_Part", (Arrays.copyOfRange(arr, 4, arr.length))); 


     Properties props = new Properties(); 
     props.put("bootstrap.servers", "xxxxxxxxxxxxxxxxxxxxx:xxxx"); 
     props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
     props.put("value.serializer", "org.apache.kafka.streams.examples.pageview.JsonPOJOSerializer"); 

     KafkaProducer<String, JSONObject> pdc = new KafkaProducer<>(props); 
     pdc.send(new ProducerRecord<String,JSONObject>("testoutput", jsn)); 

     System.in.read(); 


    } 

} 

と消費者のためのコードは次のとおりです。

package testConsumer; 

imports ... 

public class consumer_0 { 
    static public void main(String[] argv) throws ParseException { 

     //Configuration 
     Properties props = new Properties(); 
     props.put("bootstrap.servers", "xxxxxxxxxxxxxxxxxxx:xxxx"); 
     props.put("group.id", "test"); 
     props.put("enable.auto.commit", "false"); 
     props.put("auto.commit.interval.ms", "1000"); 
     props.put("session.timeout.ms", "30000"); 
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
     props.put("value.deserializer", "org.apache.kafka.streams.examples.pageview.JsonPOJODeserializer"); 


     //Create Consumer Object 
     KafkaConsumer<String, JSONObject> consumer = new KafkaConsumer<String, JSONObject>(props); 
     consumer.subscribe(Arrays.asList("testoutput")); 


     //Keep Polling Records 
     System.out.println("Polling new record...\n"); 
     while (true) { 
      ConsumerRecords<String, JSONObject> records = consumer.poll(100); 

      //Print Each Record 
      for (ConsumerRecord<String, JSONObject> record : records){ 
       JSONObject json = record.value(); 

       //Some print code, print(json) ... 

      } 
     } 
    } 
} 

そして、私はこの問題を得る:

Exception in thread "main" org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition testoutput-0 at offset 20491 
Caused by: org.apache.kafka.common.errors.SerializationException: java.lang.IllegalArgumentException: Unrecognized Type: [null] 
Caused by: java.lang.IllegalArgumentException: Unrecognized Type: [null] 
    at com.fasterxml.jackson.databind.type.TypeFactory._fromAny(TypeFactory.java:1170) 
    at com.fasterxml.jackson.databind.type.TypeFactory.constructType(TypeFactory.java:618) 
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2929) 
    at org.apache.kafka.streams.examples.pageview.JsonPOJODeserializer.deserialize(JsonPOJODeserializer.java:49) 
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:882) 
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:788) 
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:480) 
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1061) 
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) 
    at testConsumer.consumer_0.main(consumer_0.java:43) 

私はの値のフィールドタイプを必要としますjsonはバイト配列になります。なぜこれが起こっているのか?

答えて

0

あなたは値をシリアル化する責任を誤解しています。あなたは、

class Data { 
    private String headerTitle; 
    private String dataPart; 
    //... constructors, getters, setters 
} 

のようなものをプレーンなJavaオブジェクトを期待しているorg.apache.kafka.streams.examples.pageview.JsonPOJOSerializerを使用して、あなたはそれを与える値をシリアル化するためにカフカを言っているしかし、あなたは実際にあなたがた他の言葉でProducerRecord(にJSONObjectに合格していますカフカに渡す前にデータを自分でシリアル化しています。それで、シリアル化しようとします。)。

あなたのjsn自身をヴァーレが、あなたのvalue.serializerとしてorg.apache.kafka.common.serialization.StringDeserializerを使用してシリアライズするか、またはあなたがorg.apache.kafka.streams.examples.pageview.JsonPOJOSerializerをunsingとストリックと上記Dataようなクラスを定義し、ProducerRecordにそのクラスのinteranceを渡すことができます。

+0

ありがとうございます!今、私は最終的に何が起こっているのか理解していますが、jsnオブジェクト全体をbyte []としてシリアル化してから、消費者側でbyte []としてデシリアライズしてこの問題を修正しました。どちらも "org.apache.kafka.common.serialization.ByteArraySerde"を使用しています。 POJOとJsonPOJOSerdeを使用する場合と比較して、これが私のストリーミング/消費のパフォーマンスに影響すると思いますか? – zzlyn

+0

データの複雑なツリーを扱っている場合、POJOはより理解しやすいコードを作成します。私はjsnルートを使うほうがより効率的だと思います。(Jackson氏のObjectMapperはリフレクションで動作するため)パフォーマンスが重要な要件であれば、より速いプロファイルを作成する必要があります。 –

関連する問題