2017-01-12 11 views
1

JSONファイルを別々のソースディレクトリから読み込み、各ディレクトリごとに別々のテーブルを作成する必要があります。これを並行して実行したいと思いますが、Sparkは入れ子になったRDDをサポートしていないので、現在は順番に実行しています。それらのディレクトリを並行して読み込み/処理するには良い解決策がありますか?別のディレクトリを読み込んでScalaで個別のRDDを作成するSpark

def readJsonCreateTable(tableInfo: (String, String)) { 
    val df = spark 
      .read 
      .json(tableInfo._1) 
    df.createOrReplaceTempView(tableInfo._2) 
} 

val dirList = List(("/mnt/jsondir1", "temptable1"), 
        ("/mnt/jsondir2", "temptable2"), 
        ("/mnt/jsondir3", "temptable3")) 
val dirRDD = sc.parallelize(dirList) 
dirRDD.foreach(readJsonCreateTable) // Nested RDD error 

が作品をdirRDD.collect.foreachする最後の行を変更したが、その後の仕事:ここ

は私がしようとしているが、それが原因ネストされたRDDSに動作しないもののサンプルスニペットです順番に実行されるため、非常に遅いです。

dirRDD.collect.par.foreachも試しましたが、ドライバ上では並列スレッドのみが実行され、他のすべてのノードは使用されません。

私はforeachAsyncを調べましたが、この状況では入れ子になっているため非同期が必ずしも並行しているとは確信していません。

これは、Spark 2.0 & Scala 2.11 via Databricksを使用しています。

===========
追加:

私はスパークでFutureActionを返すforeachAsyncを試してみましたが、それは同様にエラーが発生しました。

import scala.concurrent._ 
import scala.concurrent.duration._ 
. 
. 
. 
val dirFuture = dirRDD.foreachAsync(readJsonCreateTable) 
Await.result(dirFuture, 1 second) 

、明らかSimpleFutureActionあなたはスパークドライバ上で実行されているコードを並列化するためのScala parallel collectionsまたはfuturesを使用することができます

org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.spark.SimpleFutureAction 

答えて

1

直列化可能ではありません。 Sparkドライバはスレッドセーフなので、期待通りに動作します。

ここで明示的に指定したスレッドプールと並列コレクションを使用した例です:

val dirList = List(
    ("dbfs:/databricks-datasets/flights/departuredelays.csv", "departuredelays"), 
    ("dbfs:/databricks-datasets/amazon/users/", "users") 
).par 

val pool = new scala.concurrent.forkjoin.ForkJoinPool(2) 

try { 
    dirList.tasksupport = new scala.collection.parallel.ForkJoinTaskSupport(pool) 
    dirList.foreach { case (filename, tableName) => 
    println(s"Starting to create table for $tableName") 
    val df = spark.read.json(filename) 
    println(s"Done creating table for $tableName") 
    df.createOrReplaceTempView(tableName) 
    } 
} finally { 
    pool.shutdown() // to prevent thread leaks. 
    // You could also re-use thread pools across collections. 
} 

私はDatabricksでこれを実行したとき、それは二つのテーブルを並列にロードされていたことを示すストリーミングログ出力を生成:

Starting to create table for departuredelays 
Starting to create table for users 
Done creating table for departuredelays 
Done creating table for users 

この並列性は、Spark UIのジョブのタイムラインビューに反映されました。

もちろん、このためにJavaスレッドを使用することもできます。簡単に言えば、複数のスレッドからSparkドライバAPIを呼び出すことは安全です。そのため、選択したJVM並行性フレームワークを選択し、Sparkドライバへの並列呼び出しを発行してテーブルを作成します。

+0

http://stackoverflow.com/questions/41426576/is-using-parallel-collections-encouraged-in-sparkによると、これはお勧めできません: "タスク内の並列実行は完全に不透明です。その結果、自動的に必要なリソースを割り当てることができません。 " – TBhimdi

+1

この問題は、ドライバのパラレルコードと並行して複数のSparkジョブを実行することとは異なり、並列性について議論しています。 –

関連する問題