これについていくつかの議論がありましたが、適切な解決策を理解できませんでした。 S3からRDDに数百のファイルをロードします。ここで私は今それをやっている方法です:S3から複数のファイルを並列に読む(Spark、Java)
ObjectListing objectListing = s3.listObjects(new ListObjectsRequest().
withBucketName(...).
withPrefix(...));
List<String> keys = new LinkedList<>();
objectListing.getObjectSummaries().forEach(summery -> keys.add(summery.getKey())); // repeat while objectListing.isTruncated()
JavaRDD<String> events = sc.parallelize(keys).flatMap(new ReadFromS3Function(clusterProps));
ReadFromS3Function
はAmazonS3
クライアントを使用して実際の読み取りを行います。
public Iterator<String> call(String s) throws Exception {
AmazonS3 s3Client = getAmazonS3Client(properties);
S3Object object = s3Client.getObject(new GetObjectRequest(...));
InputStream is = object.getObjectContent();
List<String> lines = new LinkedList<>();
String str;
try {
BufferedReader reader = new BufferedReader(new InputStreamReader(is));
if (is != null) {
while ((str = reader.readLine()) != null) {
lines.add(str);
}
} else {
...
}
} finally {
...
}
return lines.iterator();
私は一種の私は、同じ質問に対して見た答えがこの投稿を「翻訳します」 Scalaでパスのリスト全体をsc.textFile(...)
に渡すことも可能だと思いますが、どちらがベストプラクティスなのかわかりません。
回答ありがとうSteve、AWS EMRでコードを実行しようとしています両方のオプション(私のカスタムマップ関数と 'textFile(...)'にすべてのパスを渡していますが、正しく動作させるためにいくつかの問題があります)を実行してパフォーマンスを比較すると、このスレッドが更新されます。 – Nira
素敵!私は1GiBの7つのファイルで試してみましたが、 'textFile(...) '(私のカスタムコードよりも50%高速)を使ってうまく動作します。それで対応するJavaコードであなたの返信を更新してください。私はそれを受け入れますか? 'String prefix =" s3n:// "+ properties.get(" s3.source.bucket ")+"/"; objectListing.getObjectSummaries()。forEach(summery - > keys.add(接頭辞+サマリー.getKey())); //繰り返し 'JavaRDDイベントをtruncated'ながら= sc.textFile(String.join( ""、キー));' –
Nira
S3Aで試してみました、私のために正常に動作します。 spark-submitのparamとして、--packages com.amazonaws:aws-java-sdk-pom:1.10.34、org.apache.hadoop:hadoop-aws:2.7.2'が必要です。また、 'sc.hadoopConfiguration ().set( "fs.s3.impl"、 "org.apache.hadoop.fs.s3a.S3AFileSystem"); 'コード内にあります。また、EMRには、バケットの読み取り/書き込み権限があるIAMロールが必要です。そうすれば、あなたは 'AmazonS3 s3 = new AmazonS3Client();'だけを実行することができ、信用は自動的に拾い上げられます。 – Nira