2016-08-07 12 views
0

私はlogstashを初めて使用しています。logstashをkafkaに出力 - パーティションキーを設定

ファイルからjsonデータを読み込み、kafkaに送信しようとしています。私が読んでいるjsonには、トピック、パーティション、実際のメッセージのキーが含まれています。

パーティションキーの設定方法がわかりません。

input { 
    file { 
     path => "/data/files/*.*" 
     start_position => "beginning" 
     codec => "json" 
    } 
} 
filter { 
    json { 
     source => message 
    } 
} 
output { 
    kafka { 
     bootstrap_servers => "localhost:9092" 
     topic_id => "%{topic}" 
     message_key => "%{dataAsString}" 
    } 
} 

ヘルプしてください...

よろしく、IDO

答えて

0

私の知る限り、あなたはLogstashからパーティション番号を設定することはできません。あなたが持っているのは、logstash kafkaプロデューサーがパーティションを選択するために使用するmessage_keyだけです。下記のKafka DefaultPartitioner.scalaを確認してください。

package kafka.producer 

private[kafka] class DefaultPartitioner[T] extends Partitioner[T] { 
    private val random = new java.util.Random 

    def partition(key: T, numPartitions: Int): Int = { 
    if(key == null) 
     random.nextInt(numPartitions) 
    else 
     key.hashCode % numPartitions 
    } 
} 

キーを指定しないと分かりますが、ランダムなパーティションが選択されています。キーを指定すると、キーはハッシュ化され、使用可能なパーティション間で選択されるように変更されます。

あなたが求めていることを達成するには、このようなクラスを書く必要があります。このクラスを指定できるようにlogstashプラグインを変更し、そのプラグイン内でパーティション番号を選択します。

Apache Flumeでは、デフォルトのパーティショナークラスを設定できますが、logstash kafka出力プラグインで同様のattirbuteが表示されません。

関連する問題