JSONファイルを別々のソースディレクトリから読み込み、各ディレクトリごとに別々のテーブルを作成する必要があります。これを並行して実行したいと思いますが、Sparkは入れ子になったRDDをサポートしていないので、現在は順番に実行しています。それらのディレクトリを並行して読み込み/処理するには良い解決策がありますか?別のディレクトリを読み込んでScalaで個別のRDDを作成するSpark
def readJsonCreateTable(tableInfo: (String, String)) {
val df = spark
.read
.json(tableInfo._1)
df.createOrReplaceTempView(tableInfo._2)
}
val dirList = List(("/mnt/jsondir1", "temptable1"),
("/mnt/jsondir2", "temptable2"),
("/mnt/jsondir3", "temptable3"))
val dirRDD = sc.parallelize(dirList)
dirRDD.foreach(readJsonCreateTable) // Nested RDD error
が作品をdirRDD.collect.foreachする最後の行を変更したが、その後の仕事:ここ
は私がしようとしているが、それが原因ネストされたRDDSに動作しないもののサンプルスニペットです順番に実行されるため、非常に遅いです。
dirRDD.collect.par.foreachも試しましたが、ドライバ上では並列スレッドのみが実行され、他のすべてのノードは使用されません。
私はforeachAsyncを調べましたが、この状況では入れ子になっているため非同期が必ずしも並行しているとは確信していません。
これは、Spark 2.0 & Scala 2.11 via Databricksを使用しています。
===========
追加:
私はスパークでFutureActionを返すforeachAsyncを試してみましたが、それは同様にエラーが発生しました。
import scala.concurrent._
import scala.concurrent.duration._
.
.
.
val dirFuture = dirRDD.foreachAsync(readJsonCreateTable)
Await.result(dirFuture, 1 second)
、明らかSimpleFutureActionあなたはスパークドライバ上で実行されているコードを並列化するためのScala parallel collectionsまたはfuturesを使用することができます
org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.spark.SimpleFutureAction
http://stackoverflow.com/questions/41426576/is-using-parallel-collections-encouraged-in-sparkによると、これはお勧めできません: "タスク内の並列実行は完全に不透明です。その結果、自動的に必要なリソースを割り当てることができません。 " – TBhimdi
この問題は、ドライバのパラレルコードと並行して複数のSparkジョブを実行することとは異なり、並列性について議論しています。 –