2011-11-28 13 views
21

私はKafkaから読み書き中のメッセージにAvroを使用しようとしています。誰もAvroバイナリエンコーダを使用してメッセージキューに置かれるデータをエンコード/デコードする例がありますか?Avroバイナリエンコーダを使用してKafkaメッセージをエンコード/デコードする方法は?

私はカフカの部分よりもAvroの部分が必要です。あるいは、私は別の解決策を検討すべきでしょうか?基本的には、私はスペースに関してJSONのより効率的なソリューションを探しています。 AvroはJSONよりコンパクトなので、今述べたばかりです。

答えて

11

私はついにカフカのメーリングリストに尋ねて覚えていて、答えとしては完璧に働いていました。

はい、バイト配列としてメッセージを送信できます。あなたはMessageクラスのコンストラクタ を見れば、次のように表示されます -

をデフこの(バイト:配列[バイト])今

、プロデューサーセンドを見て()API - デフ

VをMessageタイプに、Kを鍵にするように設定できます(プロデューサデータ:ProducerData [K、V] *)。 キーを使用してパーティショニングを気にしない場合は、それをメッセージ タイプにも設定します。

おかげで、 ネハ、あなたは、単にデータを圧縮する検討することもでき代わりにアブロの

2

。 gzip(良好な圧縮、より高いCPU)またはLZFまたはSnappy(はるかに速く、少し遅い圧縮)のいずれかを使用します。

又は代替Smile binary JSONもある、(this extensionで)ジャクソンJavaでサポートされている:それはコンパクトバイナリ形式であり、アブロよりも使用することがはるかに簡単:JSONと同様

ObjectMapper mapper = new ObjectMapper(new SmileFactory()); 
byte[] serialized = mapper.writeValueAsBytes(pojo); 
// or back 
SomeType pojo = mapper.readValue(serialized, SomeType.class); 

基本的には同じ符号を除き異なるフォーマットのファクトリを渡すためのものです。 データサイズの観点から、SmileかAvroのどちらがよりコンパクトであるかは、ユースケースの詳細によって異なります。どちらもJSONよりもコンパクトです。

これは、JSONとSmileの両方で、同じコードで、POJOだけを使用して高速に動作するという利点があります。 GenericRecordをパックして解凍するためのコード生成が必要なAvro、または多くの手動コードと比較します。

7

あなたは(カフカの一部が既に回答された)アブロメッセージからバイト配列を取得したい場合は、バイナリエンコーダを使用します。

GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema); 
    ByteArrayOutputStream os = new ByteArrayOutputStream(); 
    try { 
     Encoder e = EncoderFactory.get().binaryEncoder(os, null); 
     writer.write(record, e); 
     e.flush(); 
     byte[] byteData = os.toByteArray(); 
    } finally { 
     os.close(); 
    } 
+0

このbyteDataをKafkaBrokerに送信し、それをコンソールコンシューマから読み取ることはできますか?プロデューサキーシリアライザは何が必要ですか? – user2441441

+0

レスポンスで述べたように、カフカの部分は他の回答に記載されています - http://stackoverflow.com/a/8348264/5266とhttp://stackoverflow.com/a/32341917/5266 –

12

これは基本的な例です。私は複数のパーティション/トピックでそれを試していません。

//サンプルプロデューサーコード

import org.apache.avro.Schema; 
import org.apache.avro.generic.GenericData; 
import org.apache.avro.generic.GenericRecord; 
import org.apache.avro.io.*; 
import org.apache.avro.specific.SpecificDatumReader; 
import org.apache.avro.specific.SpecificDatumWriter; 
import org.apache.commons.codec.DecoderException; 
import org.apache.commons.codec.binary.Hex; 
import kafka.javaapi.producer.Producer; 
import kafka.producer.KeyedMessage; 
import kafka.producer.ProducerConfig; 
import java.io.ByteArrayOutputStream; 
import java.io.File; 
import java.io.IOException; 
import java.nio.charset.Charset; 
import java.util.Properties; 


public class ProducerTest { 

    void producer(Schema schema) throws IOException { 

     Properties props = new Properties(); 
     props.put("metadata.broker.list", "0:9092"); 
     props.put("serializer.class", "kafka.serializer.DefaultEncoder"); 
     props.put("request.required.acks", "1"); 
     ProducerConfig config = new ProducerConfig(props); 
     Producer<String, byte[]> producer = new Producer<String, byte[]>(config); 
     GenericRecord payload1 = new GenericData.Record(schema); 
     //Step2 : Put data in that genericrecord object 
     payload1.put("desc", "'testdata'"); 
     //payload1.put("name", "अasa"); 
     payload1.put("name", "dbevent1"); 
     payload1.put("id", 111); 
     System.out.println("Original Message : "+ payload1); 
     //Step3 : Serialize the object to a bytearray 
     DatumWriter<GenericRecord>writer = new SpecificDatumWriter<GenericRecord>(schema); 
     ByteArrayOutputStream out = new ByteArrayOutputStream(); 
     BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null); 
     writer.write(payload1, encoder); 
     encoder.flush(); 
     out.close(); 

     byte[] serializedBytes = out.toByteArray(); 
     System.out.println("Sending message in bytes : " + serializedBytes); 
     //String serializedHex = Hex.encodeHexString(serializedBytes); 
     //System.out.println("Serialized Hex String : " + serializedHex); 
     KeyedMessage<String, byte[]> message = new KeyedMessage<String, byte[]>("page_views", serializedBytes); 
     producer.send(message); 
     producer.close(); 

    } 


    public static void main(String[] args) throws IOException, DecoderException { 
     ProducerTest test = new ProducerTest(); 
     Schema schema = new Schema.Parser().parse(new File("src/test_schema.avsc")); 
     test.producer(schema); 
    } 
} 

//サンプルコンシューマコード

パート1:消費者グループコード:複数のパーティション/トピックに対して複数の消費者よりも多くを持つことができるよう。

import kafka.consumer.ConsumerConfig; 
import kafka.consumer.KafkaStream; 
import kafka.javaapi.consumer.ConsumerConnector; 

import java.util.HashMap; 
import java.util.List; 
import java.util.Map; 
import java.util.Properties; 
import java.util.concurrent.Executor; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.TimeUnit; 

/** 
* Created by on 9/1/15. 
*/ 
public class ConsumerGroupExample { 
    private final ConsumerConnector consumer; 
    private final String topic; 
    private ExecutorService executor; 

    public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic){ 
     consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
       createConsumerConfig(a_zookeeper, a_groupId)); 
     this.topic = a_topic; 
    } 

    private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId){ 
     Properties props = new Properties(); 
     props.put("zookeeper.connect", a_zookeeper); 
     props.put("group.id", a_groupId); 
     props.put("zookeeper.session.timeout.ms", "400"); 
     props.put("zookeeper.sync.time.ms", "200"); 
     props.put("auto.commit.interval.ms", "1000"); 

     return new ConsumerConfig(props); 
    } 

    public void shutdown(){ 
     if (consumer!=null) consumer.shutdown(); 
     if (executor!=null) executor.shutdown(); 
     System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly"); 
     try{ 
      if(!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)){ 

      } 
     }catch(InterruptedException e){ 
      System.out.println("Interrupted"); 
     } 

    } 


    public void run(int a_numThreads){ 
     //Make a map of topic as key and no. of threads for that topic 
     Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); 
     topicCountMap.put(topic, new Integer(a_numThreads)); 
     //Create message streams for each topic 
     Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); 
     List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); 

     //initialize thread pool 
     executor = Executors.newFixedThreadPool(a_numThreads); 
     //start consuming from thread 
     int threadNumber = 0; 
     for (final KafkaStream stream : streams) { 
      executor.submit(new ConsumerTest(stream, threadNumber)); 
      threadNumber++; 
     } 
    } 
    public static void main(String[] args) { 
     String zooKeeper = args[0]; 
     String groupId = args[1]; 
     String topic = args[2]; 
     int threads = Integer.parseInt(args[3]); 

     ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic); 
     example.run(threads); 

     try { 
      Thread.sleep(10000); 
     } catch (InterruptedException ie) { 

     } 
     example.shutdown(); 
    } 


} 

パート2:実際にメッセージを消費する個人消費者。

import kafka.consumer.ConsumerIterator; 
import kafka.consumer.KafkaStream; 
import kafka.message.MessageAndMetadata; 
import org.apache.avro.Schema; 
import org.apache.avro.generic.GenericRecord; 
import org.apache.avro.generic.IndexedRecord; 
import org.apache.avro.io.DatumReader; 
import org.apache.avro.io.Decoder; 
import org.apache.avro.io.DecoderFactory; 
import org.apache.avro.specific.SpecificDatumReader; 
import org.apache.commons.codec.binary.Hex; 

import java.io.File; 
import java.io.IOException; 

public class ConsumerTest implements Runnable{ 

    private KafkaStream m_stream; 
    private int m_threadNumber; 

    public ConsumerTest(KafkaStream a_stream, int a_threadNumber) { 
     m_threadNumber = a_threadNumber; 
     m_stream = a_stream; 
    } 

    public void run(){ 
     ConsumerIterator<byte[], byte[]>it = m_stream.iterator(); 
     while(it.hasNext()) 
     { 
      try { 
       //System.out.println("Encoded Message received : " + message_received); 
       //byte[] input = Hex.decodeHex(it.next().message().toString().toCharArray()); 
       //System.out.println("Deserializied Byte array : " + input); 
       byte[] received_message = it.next().message(); 
       System.out.println(received_message); 
       Schema schema = null; 
       schema = new Schema.Parser().parse(new File("src/test_schema.avsc")); 
       DatumReader<GenericRecord> reader = new SpecificDatumReader<GenericRecord>(schema); 
       Decoder decoder = DecoderFactory.get().binaryDecoder(received_message, null); 
       GenericRecord payload2 = null; 
       payload2 = reader.read(null, decoder); 
       System.out.println("Message received : " + payload2); 
      }catch (Exception e) { 
       e.printStackTrace(); 
       System.out.println(e); 
      } 
     } 

    } 


} 

テストAVROスキーマ:注意すべき

{ 
    "namespace": "xyz.test", 
    "type": "record", 
    "name": "payload", 
    "fields":[ 
     { 
      "name": "name", "type": "string" 
     }, 
     { 
      "name": "id", "type": ["int", "null"] 
     }, 
     { 
      "name": "desc", "type": ["string", "null"] 
     } 
    ] 
} 

重要なものは以下のとおりです。

  1. youllは箱の外にこのコードを実行するために、標準カフカとアブロ瓶を必要としています。

  2. 非常に重要ですprops.put( "serializer.class"、 "kafka.serializer.DefaultEncoder"); Don t use stringEncoder as that wonバイト配列をメッセージとして送信している場合は動作しません。

  3. バイト[]を16進文字列に変換し、それをコンシューマコンバートで16進文字列をバイト[]に変換してから元のメッセージに変換することができます。

  4. ここで述べたように、飼育係とブローカーを実行して:http://kafka.apache.org/documentation.html#quickstartというトピックを作成し、「page_views」などのトピックを作成します。

  5. ProducerTest.javaを実行してからConsumerGroupExample.javaを実行し、生成され消費されるavroデータを確認します。

+0

助けてくれてありがとう! !私はこれを試してみましたが、消費者コードではit.hasNext()関数がfalseを返すので、コントロールはwhileループに入ることはありません。私は何が間違っていることができるか考えていますか? –

3

更新済み回答。

カフカは、(フォーマットSBT)のMavenとアブロシリアライザ/デシリアライザを有する座標:

"io.confluent" % "kafka-avro-serializer" % "3.0.0" 

あなたはKafkaProducerコンストラクタにKafkaAvroSerializerのインスタンスを渡します。

次に、Avro GenericRecordインスタンスを作成し、それらをKafkaProducerで送信できるKafka ProducerRecordインスタンス内の値として使用できます。

カフカの消費者側では、KafkaAvroDeserializerとKafkaConsumerを使用します。

+0

簡潔で完全な例を提供できますか? –

+1

これは、Confluent自身のMavenリポジトリを追加した場合にのみ機能します。これは、アーティファクトをmaven centralに公開しないためです:http://packages.confluent.io/maven –

関連する問題