2017-09-18 11 views
0

I持って自分のスパークアプリケーションで次の方法:スパークタスクに新しいS3クライアントのインスタンスを作成します

private def downloadChunk(sitemChunk : DataChunk) : String = { 

    val s3Client = new AmazonS3Client() 
    val fileName = s3Client.getObject(....) 

    fileName 
} 

それはスパークタスクで実行されますときS3からファイルをダウンロードし、基本的には、

val mydata = sc.parallelize(listOfChunks) 
mydata.map(x => downloadChunk(x)).collect 

私はs3Clientの新しいインスタンスを各タスクに作成しているのではないかと思いますが、より良い方法はありますか?

+0

新しいクライアントインスタンスを最もリソース集約的な操作にしていますか? downloadChunkを別のエグゼキュータ(クラスタノードなど)で実行できる場合、他にどのようなオプションがありますか? – Bunyk

+0

s3Clientは既にHTTP接続を再利用するために接続プール自体をいくつか持っていますが、同じインスタンスを再使用するだけで十分ですが、これがエグゼキュータで動作するかどうかはわかりません。 – egovconcepts

答えて

0

すべてのマッパーで新しいクライアントが必要です。これらのクライアントは異なるjvms(場合によっては異なるノード)で実行されるため、同じクライアントを再利用することはオプションではありません。

myData.mapPartitions { part => 
    val client = new AmazonS3Client 
    part.flatMap { chunk => client.getObject(...) } 
    } 

この方法では、あなたではなくRDDのすべての単一の要素の場合よりも、マッパーごとに一度クライアントを作成している:あなたは何ができるか

.mapPartitionsを使用しています。

関連する問題