2017-11-02 13 views
0

ファイルにs3キーのリストがあり、その値をRDDに入れたいと考えています。これを行う正しい方法は何ですか?正しい方法でs3キーをSparkで値にマップする

以下のコードは私の現在の試みです。ファイルをs3からプルダウンする呼び出しは、map関数の外部で機能しますが、その内部にヌルポインタ例外が発生します。これは、マップ呼び出しの内部でRDDを作成したり操作したりすることができないためだと思いますが、これを回避する方法がわかりません。 S3からファイルを読み込み、RDDに入れないでファイルを読み込む方法はありますか?理想的な文字列またはinputStreamに直接読み込むことができますが、Sources3a形式をサポートしているとは思われません。これは、hadoopファイルシステムから読み込んでいるものでなければなりません。

JavaRDD<String> keys = spark.sparkContext().textFile("/list/of/keys", 0).toJavaRDD(); 
SparkContext sc = spark.sparkContext(); 
JavaRDD<Tuple2<String, String>> file = spark.sparkContext().wholeTextFiles("s3a://bucket/key",0).toJavaRDD(); 

JavaRDD<String> files = 
    keys.map(
     o -> { 
      JavaRDD<Tuple2<String, String>> rawfile = spark.sparkContext().wholeTextFiles("s3a://bucket/key",0)toJavaRDD(); 
      return rawFile.take(1).get(0)._2(); 
     } 
    ); 

私はそれらが原因真されていないS3には非常に高価なことができることを読んだとして、私は複数のファイルを読み込むためにwholeTextFilesを使用していない理由は、私がs3に往復リクエストを作成しないようにするということですファイルシステム。

+0

データはCSVタイプですか? –

+0

s3のデータは?またはキーストア? –

+0

s3のデータがCSVとして保存されているかどうかを確認したいと思います。 –

答えて

0

あなたのソリューションは確かに機能します。すべてのキーに対してファイルシステムを調べない方法の1つは、代わりにkeys.mapPartitionを使用することです。また、RDD要素ごとに1行のファイルを使用する場合は、IOUtils#readLinesを使用できます。

JavaRDD<String> files = 
keys.mapPartition(
    subkeys -> { 
      FileSystem fs = FileSystem.get(new Configuration()); 
      List<String> lines = new ArrayList<>(); 

      for (k : subkeys) { 
      lines.addAll(IOUtils.readLines(fs.open(new Path("/" + o))), Charset.defaultCharset()); 
      } 

      return lines; 
    } 
); 
+0

素敵な 'mapPartition'は私が必要とするもののように見えます。私は小さな構文の問題のあなたの答えを編集しました。ありがとう! –

+0

すばらしい、ありがとう! –

0

私は、S3から読み込むためにライブラリを起動する代わりに、Hadoopライブラリを使用して何かを稼働させました。

この設定では、s3ahdfs-site.xmlで動作するように設定する必要があります。

JavaRDD<String> keys = spark.sparkContext().textFile("file:///list/of/keys", 0).toJavaRDD(); 

SparkContext sc = spark.sparkContext(); 

Configuration hConf = new Configuration(); 

JavaRDD<String> files = 
    keys.map(
     o -> { 
       return org.apache.commons.io.IOUtils.toString(FileSystem.get(new Configuration()).open(new Path("/" + o))); 
     } 
    ); 

私は、これはそれを行うための最速の方法であるかどうかわからないんだけど、それが働いている:ここに私のコードは、今のようになります。

fs.s3a.access.keyfs.s3a.secret.keyのプロパティと一緒にこれをhdfs-site.xmlに入れなければなりませんでした。

<property> 
    <name>fs.default.name</name> 
    <value>s3a://bucket-name</value> 
</property> 
関連する問題