Igniteのストリーミング部分について質問があります。apache ignite data streamer
私が理解しているのは、キャッシュにデータをインポートする方法ですが、ストリーム受信者が他のカスタムロジックを適用するように設定できることもわかります。
私は受信者クラスとクラスを作成してストリームにデータを注入しようとしましたが(サーバーモードでは2つのメインインスタンスと2つのIgniteインスタンス)、 "ちょうど"ストリーマのキャッシュにデータがあります受信ロジックに処理されるカスタムロジックはありません)。だから、もし私が何かを見逃してしまったのか、それともStreamsがIgniteになったのかよく分からないのかと尋ねています。
送信者の部分を受信者に入れると、私は印刷物を持っています。
誰かが私がやっていること(または理解)が間違っていることを知っていますか?
レシーバクラス:
public class Receiver {
public static void main(String[] args){
IgniteConfiguration igniteConfig = new IgniteConfiguration();
CacheConfiguration<String, String> cacheConfig = new CacheConfiguration<>("CacheStream");
igniteConfig.setCacheConfiguration(cacheConfig);
Ignite ignite = Ignition.getOrStart(igniteConfig);
IgniteDataStreamer<String, String> streamer = ignite.dataStreamer("CacheStream");
streamer.receiver(StreamVisitor.from((cacheLambda, e) -> {
System.out.println("Value : " + e.getValue());
}));
}
}
送信者クラス:
public class Sender {
public static void main(String[] args){
IgniteConfiguration igniteConfig = new IgniteConfiguration();
CacheConfiguration<String, String> cacheConfig = new CacheConfiguration<>("CacheStream");
igniteConfig.setCacheConfiguration(cacheConfig);
Ignite ignite = Ignition.getOrStart(igniteConfig);
IgniteDataStreamer<String, String> streamer = ignite.dataStreamer("CacheStream");
for(int i = 0 ; i < 10 ; i++){
streamer.addData("key-"+i, "value-"+i);
}
streamer.flush();
}
}
敬具
ありがとうございました。それは私が考えていたことです...気にしないで、私はメッセージングの部分を使用します。他の機能の代わりにStreamsを使用するアーキテクチャ上の理由はありますか? Streamsは、抽象レイヤーを作るために、キャッシュと計算、またはメッセージングの組み合わせであると私は考えました。これは、単純な分散オーケストレーターの一種を提供することができます、Igniteはそれを行うために必要なすべてを持っています... –
DataStreamerはIgniteのキャッシュに大量の連続したデータストリームを注入するように構築されました。内部的には、内部的にキーを一緒にバッチし、それらのバッチをデータがキャッシュされるノードと一緒に配置します。しかし、Receiverを使用したい場合でも、それを使用することができます。これは、データがキャッシュされるノードで直接呼び出されます。 –
ありがとうございました:)私は受信機の役割とその仕組みをよく理解していないと思っています。ありがとうございました。 –