2017-10-27 8 views
2

私は興味のある最終的なデータセットをもたらすDataSet APIを使用してscalaで書かれたflinkバッチプログラムを持っています。私はそのデータセットを変数や値(例えば、文字列のリストやシーケンス)を自分のプログラム内で、どのファイルにも書く必要はありません。出来ますか?Flink:ファイルの代わりに変数にDataSetを書き込む方法

私は、フリンクがデバッグするために収集データシンクを許可することを知りました(彼らのドキュメントの唯一の例はJavaです)。しかし、これはローカル実行でのみ許されています。とにかく、私はScalaで同等のことを知らないのです。私が望むのは、全体のフリンク並列実行がプログラムの値または変数に対して行われた後に、最終的に得られるデータセットを書き出すことです。

答えて

2

まず、収集データシンクのScalaのバージョンのためにこれを試してみてください。 インポートorg.apache.flink.api.scala._ 輸入org.apache.flink.api.java.io.LocalCollectionOutputFormat。

. 
. 
val env = ExecutionEnvironment.getExecutionEnvironment 

// Create a DataSet from a list of elements 
val words = env.fromElements("w1","w2", "w3") 

var outData:java.util.List[String]= new java.util.ArrayList[String]() 
words.output(new LocalCollectionOutputFormat(outData)) 

// execute program 
env.execute("Flink Batch Scala") 
println(outData) 

第2に、データセットが1つのマシンのメモリに収まる場合、なぜ分散処理フレームワークを使用する必要がありますか?私はあなたのユースケースについてもっと考えなければならないと思います!あなたのデータセットで右transformationsを使用しようとします。

+0

ありがとうございました!誤解をおかけして申し訳ありませんが、最初のデータセットはメモリに収まらないのですが、すべての変換を適用した後の答えとしてのデータセットは非常に大きなリストからいくつかの条件を指定してグループの最大値を見つけるようなものです。 – Aitor

関連する問題