2013-09-26 13 views
5

私はKafka 0.8ベータ版を使用しています。異なるオブジェクトを送信し、自分のエンコーダを使用してシリアル化し、それらを既存のブローカ構成に送信することで、混乱させようとしています。今のところ私はDefaultEncoderを動作させようとしています。Apache Kafkaデフォルトのエンコーダが動作しない

StringEncoderのブローカとすべての設定と作業がありますが、純粋なbyte []だけを含む他のデータ型をブローカで送受信することはできません。

プロデューサーのための私のコードは次のとおりです。

import kafka.javaapi.producer.Producer; 
import kafka.producer.KeyedMessage; 
import kafka.producer.ProducerConfig; 

import java.util.Date; 
import java.util.Properties; 
import java.util.Random; 


public class ProducerTest { 
    public static void main(String[] args) { 
     long events = 5; 
     Random rnd = new Random(); 
     rnd.setSeed(new Date().getTime()); 
     Properties props = new Properties(); 
     props.setProperty("metadata.broker.list", "localhost:9093,localhost:9094"); 
     props.setProperty("serializer.class", "kafka.serializer.DefaultEncoder"); 
     props.setProperty("partitioner.class", "example.producer.SimplePartitioner"); 
     props.setProperty("request.required.acks", "1"); 
     props.setProperty("producer.type", "async"); 
     props.setProperty("batch.num.messages", "4"); 

     ProducerConfig config = new ProducerConfig(props); 
     Producer<byte[], byte[]> producer = new Producer<byte[], byte[]>(config); 
     for (long nEvents = 0; nEvents < events; nEvents++) { 
      byte[] a = "Hello".getBytes(); 
      byte[] b = "There".getBytes(); 

      KeyedMessage<byte[], byte[]> data = new KeyedMessage<byte[], byte[]>("page_visits", a, b); 
      producer.send(data); 
     } 
     try { 
      Thread.sleep(5000); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 
     producer.close(); 
    } 
} 

私はhere与えられた例のように同じSimplePartitionerを使用し、文字列ですべてのバイト配列を交換しkafka.serializer.StringEncoderにシリアライザを変更することは完璧に動作します。参考のため

は、SimplePartitioner:

import kafka.producer.Partitioner; 
import kafka.utils.VerifiableProperties; 

public class SimplePartitioner implements Partitioner<String> { 
    public SimplePartitioner (VerifiableProperties props) { 

    } 

    public int partition(String key, int a_numPartitions) { 
     int partition = 0; 
     int offset = key.lastIndexOf('.'); 
     if (offset > 0) { 
      partition = Integer.parseInt(key.substring(offset+1)) % a_numPartitions; 
     } 
     return partition; 
    } 

} 

私が間違って何をしているのですか?

答えて

6

パーティションクラスSimplePartitionerは、文字列にのみ適用できます。プロデューサを非同期で実行しようとすると、ブローカに送信する前にエンコーディングとパーティショニングを処理する別のスレッドが作成されます。このスレッドは、SimplePartitionerが文字列に対してのみ機能することを認識したときにロードブロッキングを起こしますが、別のスレッドであるため例外がスローされないため、スレッドは間違っていることを知らさずに終了します。

我々は、例えば、[]バイトを受け入れるようにSimplePartitionerを変更する場合:

import kafka.producer.Partitioner; 
import kafka.utils.VerifiableProperties; 

public class SimplePartitioner implements Partitioner<byte[]> { 
    public SimplePartitioner (VerifiableProperties props) { 

    } 

    public int partition(byte[] key, int a_numPartitions) { 
     int partition = 0; 
     return partition; 
    } 

} 

これは今完璧に動作します。

+0

パーティショナーの特定の戻り値をハードコーディングするのではなく、あなたのpartitioner.classプロパティのデフォルトのkafka.producer.DefaultPartitionerに固執してください。 – gazarsgo

+0

これは試乗のためのものです。しかし、ここでは、デフォルトのパーティショナーが動作しないシナリオを示します。メッセージの特定のサブシーケンスを、生成された順序で厳密に消費させたいと仮定します。これは、デフォルトではキーのハッシュを使用するだけなので、デフォルトのパーティショナーを使用すると悲惨に失敗します。これは予測できません。代わりに、独自のカスタムパーティショナーを作成し、サブシーケンスを検出する方法がある場合は、それらを同じパーティションに割り当てることができます。この正確なユースケースは私のアプリケーションで発生しました。 –

関連する問題