2016-12-14 17 views
1

私はIgniteとKafkaの統合を試みて、kafkaメッセージをIgniteキャッシュに入れようとしています。IgniteとKafkaの統合

私のメッセージキーはランダムな文字列(のIgniteを操作するには、カフカのメッセージキーがnullにすることはできません)で、かつのIgniteが受信すると値が人のためのJSON文字列表現(Javaクラス)

ですそのようなメッセージは、Igniteはキャッシュキーとしてメッセージのキー(私の場合はランダムな文字列)を使用するように見えます。

メッセージキーを人のIDに変更して、キャッシュに入れることは可能ですか?

streamer.receiver(new StreamReceiver)

 streamer.receiver(new StreamReceiver<String, String>() { 
      public void receive(IgniteCache<String, String> cache, Collection<Map.Entry<String, String>> entries) throws IgniteException { 
       for (Map.Entry<String, String> entry : entries) { 
        Person p = fromJson(entry.getValue()); 
        //ignore the message key,and use person id as the cache key 
        cache.put(p.getId(), p); 
       } 
      } 
     }); 

が、これは推奨される方法です実行可能であることを見えますか? StreamReceiverでのcache.putの呼び出しが正しい方法であるかどうかはわかりません。これは、キャッシュに書き込む前の前処理ステップにすぎないからです。

答えて

2

Data Streamerはすべてのキーをキャッシュアフィニティノードにマップし、エントリのバッチを作成し、アフィニティノードにバッチを送信します。 StreamReceiverがあなたのエントリーを受け取ったら、PersonのIDを取得し、cache.put(K, V)を呼び出します。あなたのキーを対応するキャッシュ・アフィニティ・ノードにマッピングして、このノードに更新要求を送信することになる。

すべてがうまく見えます。しかし、あなたのランダムキーをKafkaからマップした結果と、PersonのIDをマッピングした結果は異なっています(おそらく異なるノード)。結果として、冗長ネットワークホップのためにパフォーマンスが低下します。残念ながら、現在のKafkaStreamer実装は、ストリームタプル抽出器をサポートしていない(例えば、StreamSingleTupleExtractorクラスを参照)。しかし、既存のカフカストリーマ実装を例として簡単に作成することができます。

また、あなたはカフカのメッセージからのID「Personを抽出するために、S keyDecodervalDecoder」をKafkaStreamer使用を試すことができます。私は確信していませんが、それは助けることができます。

+0

ありがとう@a_guraの有益な回答! – Tom