2015-11-10 13 views
7

avroファイルを含む生成されたS3パスの一部でスパークジョブ(spark v1.5.1)を実行したいとします。私はそれらをロードしています:sparkが欠落している入力ファイルを無視する方法を教えてください。

val avros = paths.map(p => sqlContext.read.avro(p)) 

いくつかのパスは存在しません。それらの空のパスを無視するようにスパークさせるにはどうすればよいですか?以前はthis answerを使用しましたが、新しいデータフレームAPIでどのように使用するのかよくわかりません。

注:私は理想的には入力パスをオプションにするリンクされた答えに似たアプローチを探しています。私は特にS3のパスの存在を明示的にチェックする必要はありません(これは厄介で開発が厄介なものになる可能性があります)。しかし、今これを実装するためのクリーンな方法がないと私の後悔です。

答えて

9

avroファイルのディレクトリを読み込むときに失敗する可能性を処理するために、scala Tryタイプを使用します。私たちは、コード内の明示的な故障の可能性を作ることができる「お試しください」、および機能的な方法でそれを処理:スパークドキュメントから

object Main extends App { 

    import scala.util.{Success, Try} 
    import org.apache.spark.{SparkConf, SparkContext} 
    import com.databricks.spark.avro._ 

    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("example")) 
    val sqlContext = new org.apache.spark.sql.SQLContext(sc) 

    //the first path exists, the second one doesn't 
    val paths = List("/data/1", "/data/2") 

    //Wrap the attempt to read the paths in a Try, then use collect to filter 
    //and map with a single partial function. 
    val avros = 
    paths 
     .map(p => Try(sqlContext.read.avro(p))) 
     .collect{ 
     case Success(df) => df 
     } 
    //Do whatever you want with your list of dataframes 
    avros.foreach{ df => 
    println(df.collect()) 
    } 
    sc.stop() 
} 
+0

:ドライバがメモリ不足する可能性があります() ''を収集し、しかし、 'collect()'はRDD全体を単一のマシンにフェッチするためです。 'collect()'を使わないで解決策がありますか?これは非常に大きなデータセット用です。 – jbrown

+2

これは、 'collect()'がRDD上で呼び出されたときに当てはまります。部分的な関数を含む 'collect(...)'関数を最初に呼び出すと、RDDのリストにあります。これは、RDDではなくListのcollect関数です。これは 'map'と' filter'を実行するのと同じです。最後に 'foreach'の内部で' collect() 'をもう一度使っていますが、これはRDDのリストを操作するための単なる例ですが、あなた自身のアプリケーションではそうではないと思いますが、アプローチが正しく機能していることを確認するためには簡単な結末が必要でした。 – mattinbits

+0

ありがとうございます。私はそれを試して、それが動作するかどうかを確認します。最初の 'collect'がRDDを評価し、すべてのデータをドライバノードに送ると思った。 – jbrown