2017-07-27 6 views
0

Apache Flinkで2つのファイルを順に処理したいと思います。Apache Flinkで2つのデータソースを連続的に処理します。

具体例として、2番目のファイルの行が最初の行に続くように、各行にインデックスを割り当てたいとします。

val env = ExecutionEnvironment.getExecutionEnvironment 

val text1 = env.readTextFile("/path/to/file1") 
val text2 = env.readTextFile("/path/to/file2") 

val union = text1.union(text2).flatMap { ... } 

は私がtext1のすべてが最初flatMapオペレータを介して送信されていることを確認したい、その後、text2のすべて:代わりに、そうすることで、次のコードは、2つのファイルに行をインターリーブします。そうするための推奨される方法は何ですか?

ご協力いただきありがとうございます。

答えて

1

DataSet.union()は入力間の順序保証はありません。同じ入力パーティションからのレコードは、順序どおりに残りますが、他の入力からのレコードとマージされます。

しかし、より根本的な問題があります。 Flinkはパラレルデータプロセッサです。データを並行して処理する場合、大域順序は保持できません。たとえば、Flinkがファイルを並行して読み込む場合、これらのファイルを分割して各分割を独立して処理しようとします。分割は特別な順序なしで引き渡されます。したがって、1つのファイルのレコードはすでにシャッフルされています。ジョブ全体の並列性を1に設定し、この作業を行うにはカスタムInputFormatを実装する必要があります。

あなたはその作業を行うことができますが、それは並行ではなく、多くのことを微調整する必要があります。私はFlinkがそのような仕事のための最良のツールだとは思わない。 あなたは単純なunixコマンドラインツールを使用してファイルを連結することを検討しましたか?

+0

彼は 'DataSet' APIを使いたいと思います。したがって、これはカスタム 'FileInputFormat'を実装することを意味します。 –

+0

ありがとうございます。私は私の答えに適応した。 –

+0

答えをありがとう。私が実際にやろうとしているのは、ファイルの例よりも少し複雑です。私は、歴史ストリームとそれに続くリアルタイムストリームを処理して、状態と順序を保持したいと思います。 簡単な方法はないようです。 –

関連する問題