2016-06-14 5 views
0

私はKafkaを非常に新しくしましたが、今日はさまざまなパーティションのKafkaトピックでメッセージを生成するJava Producerの作成を試みました。KafkaプロデューサーJava APIがすべてのトピックパーティションにメッセージを配信していません

最初に、TestProducerSimplePartitionerという2つのクラスを作成したパッケージraggieKafkaを作成しました。

TestProducerクラスには、コードを次しています

package raggieKafka; 

import java.io.BufferedReader; 
import java.io.InputStreamReader; 
import java.util.*; 
import kafka.javaapi.producer.Producer; 
import kafka.producer.KeyedMessage; 
import kafka.producer.ProducerConfig; 

public class TestProducer{ 

    public static void main(String args[]) throws Exception 
    { 
     long events = 0; 

     BufferedReader reader = new BufferedReader(new InputStreamReader(System.in)); 
     events = Integer.parseInt(reader.readLine()); 
     Random rnd = new Random(); 

     Properties props = new Properties(); 
     props.put("metadata.broker.list", "localhost:9092"); 
     props.put("topic.metadata.refresh.interval.ms", "1"); 
     props.put("serializer.class", "kafka.serializer.StringEncoder"); 
     props.put("partitioner.class", "raggieKafka.SimplePartitioner"); 
     props.put("request.required.acks", "1"); 

     ProducerConfig config = new ProducerConfig(props); 
     Producer<String, String> prod = new Producer<String, String>(config); 

     for(long i = 0; i < events; i++) 
     { 
      long runtime = new Date().getTime(); 
      String ip = "192.168.2." + rnd.nextInt(255); 
      String msg = runtime + ",www.example.com, " + ip; 
      KeyedMessage<String,String> data = new KeyedMessage<String, String>("page_visits", ip, msg); 
      prod.send(data); 
     } 
     prod.close(); 
    } 
} 

SimplePartitionerクラスには、次のコードしている:これらのJavaプログラムをコンパイルする前に

package raggieKafka; 

import kafka.producer.Partitioner; 
import kafka.utils.VerifiableProperties; 

public class SimplePartitioner implements Partitioner{ 

    public SimplePartitioner(VerifiableProperties props) 
    { 

    } 

    public int partition(Object Key, int a_numPartitions) 
    { 
     int partition = 0; 
     String stringKey = (String) Key; 
     int offset = stringKey.indexOf(stringKey); 

     if(offset > 0) 
     { 
      partition = Integer.parseInt(stringKey.substring(offset+1)) % a_numPartitions; 
     } 
     return partition; 
    } 
} 

を私はカフカブローカーに関するトピックを作成しました:

C:\kafka_2.11-0.9.0.1>.\bin\windows\kafka-topics.bat --create --topic page_visit 
s --zookeeper localhost:2181 --partitions 5 --replication-factor 1 
WARNING: Due to limitations in metric names, topics with a period ('.') or under 
score ('_') could collide. To avoid issues it is best to use either, but not bot 
h. 
Created topic "page_visits". 

今、私がコンパイルすると、すべてのメッセージが1つのパーティション、つまりすべてのメッセージがポストされているにもかかわらずpage_visits-0に置かれますが、残りのすべてのパーティションは空のまま残ります。

私のJavaプロデューサが私のメッセージを他のパーティションに配布していない理由を教えてもらえますか?

Infactは、私はGoogleで見て、その後、もう一つのプロパティを追加しました:

props.put("topic.metadata.refresh.interval.ms", "1"); 

それでもプロデューサーは、すべてのトピックにメッセージを生産されていません。

お願いします。

答えて

2

あなたSimplePartitionerコードは次の行にバグがあり

int offset = stringKey.indexOf(stringKey); 

それはいつもあなたのオフセットは常に0に等しく、それは決して0より大きいあなたの場合、ブロックが実行され得ることはありませんよう0を返します。そして最後に、常にパーティション0を返します。

解決方法:キーがIPアドレスであるため、次の変更が期待通りに機能する可能性があります。

int offset = stringKey.lastIndexOf('.'); 

これが役に立ちます。

+0

ありがとうございます。私がやったばかげたミスは、私を夢中にさせた。もう一度、訂正に感謝します。乾杯。 –

関連する問題