2017-11-12 15 views
0

私は2つのrawストリームを持っています。これらのストリームに参加しています。そして、参加したイベントの総数と、ない。Apache Flink:DataStream内のイベントの総数を数える方法

joinedEventDataStream.map(new RichMapFunction<JoinedEvent, Object>() { 

      @Override 
      public Object map(JoinedEvent joinedEvent) throws Exception { 

       number_of_joined_events += 1; 

       return null; 
      } 
     }); 

質問#1以下のように私はjoinedEventDataStreamにマップを使用してこれをやっている:これは、ストリーム内のイベントの数をカウントするための適切な方法は何ですか?

質問#2:質問:私はあなたの誰かが信じていないかもしれない有線の動作に気付きました。私のFlinkプログラムをIntelliJ IDEで実行すると、number_of_joined_eventsの正しい値が表示されますが、このプログラムをjarとして送信した場合は、0という正しい値が表示されます。だから私は実際のカウントの代わりにjarファイルとしてプログラムを実行するとnumber_of_joined_eventsの初期値を取得しています。これはなぜファイルが提出され、IDEではなくjarの場合にのみ起こるのですか?

答えて

1

あなたのアプローチは機能していません。 JARファイル経由でプログラムを実行するときに気づいた動作が期待されます。

私はnumber_of_joined_eventsがどのように定義されているか分かりませんが、私はあなたのプログラム内でその静的変数を想定しています。 IDEでプログラムを実行すると、単一のJVMで実行されます。したがって、すべての演算子は静的変数にアクセスできます。 JARファイルをリモートプロセスに送信すると、プログラムは別のJVM(複数のJVM)で実行され、クライアントプロセスの静的変数は決して更新されません。

1を合計したFlinkのメトリックまたはReduceFunctionを使用して、処理されたレコードの数をカウントできます。

+0

うわー、ウル天才。それは静的変数です、私はjoinedDataStremでアキュムレータを使用し、それはjarファイルで働いていました。私の週末にもかかわらず:)。ありがとう –