私はKafka 0.8ベータ版を使用しています。異なるオブジェクトを送信し、自分のエンコーダを使用してシリアル化し、それらを既存のブローカ構成に送信することで、混乱させようとしています。今のところ私はDefaultEncoderを動作させようとしています。Apache Kafkaデフォルトのエンコーダが動作しない
StringEncoderのブローカとすべての設定と作業がありますが、純粋なbyte []だけを含む他のデータ型をブローカで送受信することはできません。
プロデューサーのための私のコードは次のとおりです。
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import java.util.Date;
import java.util.Properties;
import java.util.Random;
public class ProducerTest {
public static void main(String[] args) {
long events = 5;
Random rnd = new Random();
rnd.setSeed(new Date().getTime());
Properties props = new Properties();
props.setProperty("metadata.broker.list", "localhost:9093,localhost:9094");
props.setProperty("serializer.class", "kafka.serializer.DefaultEncoder");
props.setProperty("partitioner.class", "example.producer.SimplePartitioner");
props.setProperty("request.required.acks", "1");
props.setProperty("producer.type", "async");
props.setProperty("batch.num.messages", "4");
ProducerConfig config = new ProducerConfig(props);
Producer<byte[], byte[]> producer = new Producer<byte[], byte[]>(config);
for (long nEvents = 0; nEvents < events; nEvents++) {
byte[] a = "Hello".getBytes();
byte[] b = "There".getBytes();
KeyedMessage<byte[], byte[]> data = new KeyedMessage<byte[], byte[]>("page_visits", a, b);
producer.send(data);
}
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
producer.close();
}
}
私はhere与えられた例のように同じSimplePartitionerを使用し、文字列ですべてのバイト配列を交換しkafka.serializer.StringEncoderにシリアライザを変更することは完璧に動作します。参考のため
は、SimplePartitioner:
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
public class SimplePartitioner implements Partitioner<String> {
public SimplePartitioner (VerifiableProperties props) {
}
public int partition(String key, int a_numPartitions) {
int partition = 0;
int offset = key.lastIndexOf('.');
if (offset > 0) {
partition = Integer.parseInt(key.substring(offset+1)) % a_numPartitions;
}
return partition;
}
}
私が間違って何をしているのですか?
パーティショナーの特定の戻り値をハードコーディングするのではなく、あなたのpartitioner.classプロパティのデフォルトのkafka.producer.DefaultPartitionerに固執してください。 – gazarsgo
これは試乗のためのものです。しかし、ここでは、デフォルトのパーティショナーが動作しないシナリオを示します。メッセージの特定のサブシーケンスを、生成された順序で厳密に消費させたいと仮定します。これは、デフォルトではキーのハッシュを使用するだけなので、デフォルトのパーティショナーを使用すると悲惨に失敗します。これは予測できません。代わりに、独自のカスタムパーティショナーを作成し、サブシーケンスを検出する方法がある場合は、それらを同じパーティションに割り当てることができます。この正確なユースケースは私のアプリケーションで発生しました。 –