2016-10-24 13 views
1

私たちが持っているキーに私自身のパーティショナーベースを書く必要があります。独自のカスタムパーティショナーを作成できるようです。彼らはkafka用の独自のカスタムパーティショナーを作成

生産者が自分の好きな話題にデータを公開すると言うカフカメインのサイトから

。プロデューサは、 で、どのレコードをどのパーティションに割り当てるかを選択します。 これは、ラウンドロビン方式で、単に のバランスロードにすることも、いくつかのセマンティックパーティション 関数(レコードのいくつかのキーに基づいて言います)に従って行うこともできます。 2回目の分割で を使用する方法の詳細私の場合は

、我々はデータが入るべきパーティションを決定するために、この数式を使用したいので、我々はそれのため10 partitionsを持っていますトピックを与えられました。

ここ
partition = client_id % MOD 10 

client_idが鍵となると、それは常に、それは常にlongデータ型になり、数値になります。私はclient_idのためにどのパーティションを使うべきか教えてくれるカフカプロデューサー用の独自のカスタムパーティショナーを書くことができます。

私はPartitionerクラスを実装していくつかの変更を加えなければなりませんでしたが、keyBytes変数を使用して上記の式に基づいてパーティションを把握する方法がわかりません。

@Override 
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, 
    Cluster cluster) { 
    // TODO Auto-generated method stub 
    return 0; 
} 

私はKafka 0.10.0.0バージョンを実行しています。

答えて

1

keyBytesは、パーティションを作成するためのシリアル化されたキーです。パーティションを行うには、 'key'(Object型)を直接使うことができます。

0

キーを指定した場合(つまり、nullではない)、パーティショナーは指定しない場合、カフカは意図したとおりの操作を行います。オライリーメディアによる:

キーが存在し、デフォルトのパーティショナを使用する場合は、カフカがそのを使用して(キーをハッシュします:ここでは

は著書「Definitive Guideのカフカ」から取られた抜粋です独自のハッシュアルゴリズムを使用するため、Javaをアップグレードするとハッシュ値は変更されません)、結果を使用してメッセージを特定のパーティションにマップします。

関連する問題