2016-12-09 22 views
2

これについていくつかの議論がありましたが、適切な解決策を理解できませんでした。 S3からRDDに数百のファイルをロードします。ここで私は今それをやっている方法です:S3から複数のファイルを並列に読む(Spark、Java)

ObjectListing objectListing = s3.listObjects(new ListObjectsRequest(). 
       withBucketName(...). 
       withPrefix(...)); 
List<String> keys = new LinkedList<>(); 
objectListing.getObjectSummaries().forEach(summery -> keys.add(summery.getKey())); // repeat while objectListing.isTruncated() 

JavaRDD<String> events = sc.parallelize(keys).flatMap(new ReadFromS3Function(clusterProps)); 

ReadFromS3FunctionAmazonS3クライアントを使用して実際の読み取りを行います。

public Iterator<String> call(String s) throws Exception { 
     AmazonS3 s3Client = getAmazonS3Client(properties); 
     S3Object object = s3Client.getObject(new GetObjectRequest(...)); 
     InputStream is = object.getObjectContent(); 
     List<String> lines = new LinkedList<>(); 
     String str; 
     try { 
      BufferedReader reader = new BufferedReader(new InputStreamReader(is)); 
      if (is != null) { 
       while ((str = reader.readLine()) != null) { 
        lines.add(str); 
       } 
      } else { 
       ... 
      } 
     } finally { 
      ... 
     } 
     return lines.iterator(); 

私は一種の私は、同じ質問に対して見た答えがこの投稿を「翻訳します」 Scalaでパスのリスト全体をsc.textFile(...)に渡すことも可能だと思いますが、どちらがベストプラクティスなのかわかりません。

答えて

2

根本的な問題は、S3のオブジェクトを一覧表示することは本当に遅く、何かがない時はいつでも、ディレクトリツリーに見えるように作られている方法は、パフォーマンスを殺すことですtreewalk(パスのワイルドカードパターンマシニングのように)。

ポストのコードは、より良いパフォーマンスを提供するすべての子供のリストを実行しています。本質的にHadoop 2.8およびs3a listFiles(パス、再帰的)に付属するものはHADOOP-13208です。

はそのリストを取得した後、あなたは、テキストファイルを入力として処理するために、火花のためS3A/S3Nパスにマッピングすることができ、オブジェクトのパスに文字列を持っている、とあなたがして

val files = keys.map(key -> s"s3a://$bucket/$key").mkString(",") 
sc.textFile(files).map(...) 
に仕事を適用することができました

リクエストされたとおり、ここに使用されるJavaコードがあります。 、あなたはCPにhadoop-awsamazon-sdk JARを持っているので、私は、S3AへS3Nを切り替え

String prefix = "s3a://" + properties.get("s3.source.bucket") + "/"; 
objectListing.getObjectSummaries().forEach(summary -> keys.add(prefix+summary.getKey())); 
// repeat while objectListing truncated 
JavaRDD<String> events = sc.textFile(String.join(",", keys)) 

注意、S3Aコネクタは、使用されるべきものです。それは優れているし、人々(私)によってスパークワークロードに対して維持されテストされるものです。 The history of Hadoop's S3 connectorsを参照してください。

+0

回答ありがとうSteve、AWS EMRでコードを実行しようとしています両方のオプション(私のカスタムマップ関数と 'textFile(...)'にすべてのパスを渡していますが、正しく動作させるためにいくつかの問題があります)を実行してパフォーマンスを比較すると、このスレッドが更新されます。 – Nira

+0

素敵!私は1GiBの7つのファイルで試してみましたが、 'textFile(...) '(私のカスタムコードよりも50%高速)を使ってうまく動作します。それで対応するJavaコードであなたの返信を更新してください。私はそれを受け入れますか? 'String prefix =" s3n:// "+ properties.get(" s3.source.bucket ")+"/"; objectListing.getObjectSummaries()。forEach(summery - > keys.add(接頭辞+サマリー.getKey())); //繰り返し 'JavaRDD イベントをtruncated'ながら= sc.textFile(String.join( ""、キー));' – Nira

+1

S3Aで試してみました、私のために正常に動作します。 spark-submitのparamとして、--packages com.amazonaws:aws-java-sdk-pom:1.10.34、org.apache.hadoop:hadoop-aws:2.7.2'が必要です。また、 'sc.hadoopConfiguration ().set( "fs.s3.impl"、 "org.apache.hadoop.fs.s3a.S3AFileSystem"); 'コード内にあります。また、EMRには、バケットの読み取り/書き込み権限があるIAMロールが必要です。そうすれば、あなたは 'AmazonS3 s3 = new AmazonS3Client();'だけを実行することができ、信用は自動的に拾い上げられます。 – Nira

2

sc.textFileを使用して複数のファイルを読み取ることができます。

multiple file urlを引数として渡すことができます。

全体をdirectoriesと指定し、wildcardsと、ディレクトリとワイルドカードのCSVを使用することもできます。

例:

sc.textFile("/my/dir1,/my/paths/part-00[0-5]*,/another/dir,/a/specific/file") 

Reference from this ans

+0

ワイルドカードを使用してについての事は、それが火花を起こしリスト()に複数の呼び出しになりということです長い間、仕事が無反応であるように見える。そのため、まずすべてのキーを取得してからパラレル化することをお勧めします。ここをクリックしてください:http://tech.kinja.com/how-not-to-pull-from-s3-using-apache-spark-1704509219 – Nira

1

あなたが読んでAWSがエグゼキュータを利用されながら、並列化と間違いなくパフォーマンスを

改善しようと、私は推測する
val bucketName=xxx 
val keyname=xxx 
val df=sc.parallelize(new AmazonS3Client(new BasicAWSCredentials("awsccessKeyId", "SecretKey")).listObjects(request).getObjectSummaries.map(_.getKey).toList) 
     .flatMap { key => Source.fromInputStream(s3.getObject(bucketName, keyname).getObjectContent: InputStream).getLines } 
+0

とよく見えます。 listObjectsは通常、結果をバッチで返します。したがって、私はそこにループがあると仮定しています。 – Programmer

関連する問題