2016-08-03 18 views
0

私は現在Sparkを調べています。私は次のタスクに直面しています - RDDを取得し、特定の基準に基づいてパーティションを分割し、S3バケット内の異なるフォルダに複数のファイルを書き込む。スパークRDD foreachPartition to S3

S3にアップロードするまでは問題ありません。私はSOのこの問題に関連するすべての質問を読んで、AmazonS3ClientまたはsaveToTextFileのRDDのどちらかの方法を使用できることがわかりました。私が直面する2つの問題があります。

  1. 私はAmazonS3Clientで行く場合、私はjava.io.NotSerializableExceptionを取得するコードは、スパークドライバからそれをシリアル化する必要が労働者に送られたと明らかにAmazonS3Clientがサポートしていないされているのでそれ。

  2. 私がsaveToTextFileと一緒に行くと、同様の問題が発生します。私がforeachPartitionループに入るとき、私はIterable[T](この場合はp)を取得する必要があります。saveToTextFileを使用したい場合は、IterableのRDDを作成する必要がありますので、parallelizeを作成する必要があります。問題は、SparkContext sc(正当な理由で)がシリアル化されないことです。

rdd.foreachPartition { p => sc.parallelize(p.toSeq).saveAsTextFile(s"s3n://") }

すべてのヘルプは大歓迎されます。

答えて

2

これを行う必要はありません。あなただけのRDDとsaveAsTextFileを使用することができます。

rdd.saveAsTextFile(s"s3n://dir/to/aux/file") 

saveAsTextFileは、ファイルの多くの部分でフォルダ内のS3に書き込みます(パーティションなど多くの部品として)。必要に応じて、1つのファイルにマージすることができます。

def mergeToS3(srcPath: String, dstPath: String, sc: SparkContext): Unit = { 
    val hadoopConfig = sc.hadoopConfiguration 
    val fs = FileSystem.get(new URI(srcPath), hadoopConfig) 
    FileUtil.copyMerge(fs, new Path(srcPath), fs, new Path(dstPath), true, hadoopConfig, null) 
    } 

    mergeToS3("s3n://dir/to/aux/file", "s3n://dir/to/singleFile",sc) 
関連する問題