2016-12-09 9 views
3

私は、ファイルから情報を読み取り、各行を保存し、この状態を使って別のストリームをフィルタリングする必要があるFlinkのユースケースを持っています。Flink:状態を保存して別のストリームで使用する方法は?

connectオペレーターとRichCoFlatMapFunctionではこれがすべて機能していますが、複雑すぎると感じています。また、私は状態のすべてがファイルからロードされる前にflatMap2が実行を開始できることを心配:負荷状態のこのパターンを達成するために

fileStream 
    .connect(partRecordStream.keyBy((KeySelector<PartRecord, String>) partRecord -> partRecord.getPartId())) 
    .keyBy((KeySelector<String, String>) partId -> partId, (KeySelector<PartRecord, String>) partRecord -> partRecord.getPartId()) 
    .flatMap(new RichCoFlatMapFunction<String, PartRecord, PartRecord>() { 
     private transient ValueState<String> storedPartId; 
     @Override 
     public void flatMap1(String partId, Collector<PartRecord> out) throws Exception { 
      // store state 
      storedPartId.update(partId); 
     } 

     @Override 
     public void flatMap2(PartRecord record, Collector<PartRecord> out) throws Exception { 
      if (record.getPartId().equals(storedPartId.value())) { 
       out.collect(record); 
      } else { 
       // do nothing 
      } 
     } 

     @Override 
     public void open(Configuration parameters) throws Exception { 
      ValueStateDescriptor<String> descriptor = 
        new ValueStateDescriptor<>(
          "partId", // the state name 
          TypeInformation.of(new TypeHint<String>() {}), 
          null); 
      storedPartId = getRuntimeContext().getState(descriptor); 
     } 
    }); 

は(FLINK 1.1.3のように)より良い方法があり、その後のストリームでそれを使用しますか?

答えて

2

CoFlatMapFunctionについてのご意見は正しいですか? flatMap1flatMap2が呼び出される順序は制御できず、データの到着順序に依存します。したがって、すべてのデータがflatMap1によって読み取られる前にflatMap2が呼び出される可能性があります。

ストリームの処理を開始する前にすべてのデータを読み取る唯一の方法は、メソッドのデータをRichFlatMapFunctionで消費することです。つまり、手動でファイルを読み込んで解析する必要があります。

これは基本的にブロードキャスト結合戦略です。つまり、オペレータの各並列インスタンスがこれを行います。欠点は、ファイルのデータが複製されることです。メリットは、「メイン」ストリームをシャッフルする必要がないことです(keyBy()を使用する必要はありません)。

+0

わかりました。だから私は 'open()'メソッド内でファイルを解析することができますが、 'keyBy()'上流を使わない限り、演算子のインスタンスは1つしかありません。しかしこれは事実上シリアル操作になりますか? – epb

+1

'FlatMap'演算子は、' keyBy() 'を呼び出さずに並行して実行することもできます。演算子の並列性を指定するだけです。ただし、データはパラレルスレッド全体にランダムに分散されます。 'keyBy'はデータをハッシュ分割します。 'FlatMap'演算子が複数ある場合、各演算子はファイルを読み込んで状態を保持します。したがって、IOとメモリの使用量は冗長になりますが、オペレータは並行して実行されます。 –

関連する問題