Apache Flinkで2つのファイルを順に処理したいと思います。Apache Flinkで2つのデータソースを連続的に処理します。
具体例として、2番目のファイルの行が最初の行に続くように、各行にインデックスを割り当てたいとします。
val env = ExecutionEnvironment.getExecutionEnvironment
val text1 = env.readTextFile("/path/to/file1")
val text2 = env.readTextFile("/path/to/file2")
val union = text1.union(text2).flatMap { ... }
は私がtext1
のすべてが最初flatMap
オペレータを介して送信されていることを確認したい、その後、text2
のすべて:代わりに、そうすることで、次のコードは、2つのファイルに行をインターリーブします。そうするための推奨される方法は何ですか?
ご協力いただきありがとうございます。
彼は 'DataSet' APIを使いたいと思います。したがって、これはカスタム 'FileInputFormat'を実装することを意味します。 –
ありがとうございます。私は私の答えに適応した。 –
答えをありがとう。私が実際にやろうとしているのは、ファイルの例よりも少し複雑です。私は、歴史ストリームとそれに続くリアルタイムストリームを処理して、状態と順序を保持したいと思います。 簡単な方法はないようです。 –