2017-10-27 16 views
0

私は私の専門用語がオフであってもよいし、スパークする(非常に)新しいんだけど、ここで私がやろうとしているものです:Sparkで複数のDataFramesを同時に処理する方法は?

  • 私はテーブルのスナップショットを表すCSVファイルのセットを持って毎日のために(例えば、s3://bucket/20171027/a.csv.gzに格納されているテーブルA、B、Cを呼び出す)
  • キー(id)にSpark SQLを使用してこれらのテーブルを結合してから、結合したテーブルを保存したいJSONとしてS3へ。

これを順番に(毎日)実行できますが、スパーク並列化を利用したいと考えています。

私の現在のプロセスは、おおよそ次のとおりです。

  • リストS3
  • 内のすべてのファイル
  • グループにそれらのタイムスタンプ
  • によって
  • 結果を結合するためにファイル名の配列を作成する> =タイムスタンプのマップですファイル(例:20171027 => [ "S3://foo/20171027/a.csv"、 "S3://foo/20171027/b.csv"])

次に、それぞれの日のために、私はロード各ファイルDataFrameに重複する列を取り除くためにいくつかのロジックを行い、df1.join(df2)を呼び出します。結合が完了したら、私はdf.write.json

を毎日呼び出すことができますが、Sparkにこれらの結合操作を同時に実行させる方法はわかりません。私はsc.parallelizeをシーケンスとしてタイムスタンプとともに使用しようとしましたが、SparkコンテキストにエグゼキュータのDataFramesをロードすることはできません。parallelizeを呼び出す前にDataFramesをロードすると、エグゼキュータはそれらを読み込むことができず、NullObjectExceptionをスローします。私は先物を使って調べる必要があると思っていますが、私がしようとしていることを達成するための他の選択肢があるのか​​、それとも必要以上に複雑にしているのか疑問に思っています。

+0

(https://stackoverflow.com/questions/31912858/processing-multiple-files-as-independent-rdds-in-parallel) – user8371915

+0

う[独立したRDDの並列のように複数のファイルを処理する]の可能重複なぜあなたの質問が@ user8371915に記載されているものと重複しているのか説明してください。 – eliasah

答えて

0

私が思いついた解決策は、エグゼキュータの数に等しいスレッドプールを使ってFuturesを使用することでした。毎日繰り返すことで、私はテーブルに参加し、それから自分の将来のディスクに書き込む作業を行います。スレッドプールは、並行性をエグゼキュータの数に制限します。その後、すべての先物が完了する前に完了するのを待ちます。

implicit val ec = ExecutionContext.fromExecutorService(Executors.newWorkStealingPool(numExecutors)) 
val futures = ArrayBuffer[Future[Unit]]() 

for (date <- files.keys) { 
    val f = Future { 
    // load tables from S3 into data frames 
    // join data frames on ID 
    // write joined dataframe to S3 
    } 
    futures += f 
} 

futures.foreach(f => Await.ready(f, Duration.Inf)) 
関連する問題