2017-07-28 6 views
0

S3からSparkにファイルをロードするには、次のコードを使用してください。動作していますが、1つのファイルと別のファイルの間に遅延があり、順次ロードされていることに気づいています。私はこれを並列に読み込むことで改善したいと思います。S3ファイルを並列に読み込みます。

 // Load files that were loaded into firehose on this day 
    var s3Files = spark.sqlContext.read.schema(schema).json("s3n://" + job.awsaccessKey + ":" + job.awssecretKey + "@" + job.bucketName + "/" + job.awss3RawFileExpression + "/" + year + "/" + monthCheck + "/" + dayCheck + "/*/").rdd 

    // Apply the schema to the RDD, here we will have duplicates 
    val usersDataFrame = spark.createDataFrame(s3Files, schema) 

    usersDataFrame.createOrReplaceTempView("results") 

    // Clean and use partition by the keys to eliminate duplicates and get latest record 
    var results = spark.sql(buildCleaningQuery(job, "results")) 
    results.createOrReplaceTempView("filteredResults") 
    val records = spark.sql("select count(*) from filteredResults") 

Iはまた、その後、私はスパークSQLを使用するに移動する必要があるので、しかし、私はRDDにRDD [文字列]を変換する問題[行]を有するAM、テキストファイル()メソッドを介してロードしようとしています。私は次のように使用しています。

 var s3Files = sparkContext.textFile("s3n://" + job.awsaccessKey + ":" + job.awssecretKey + "@" + job.bucketName + "/" + job.awss3RawFileExpression + "/" + year + "/" + monthCheck + "/" + dayCheck + "/*/").toJavaRDD() 

JSONファイル(それぞれ50MB程度の複数ファイル)をSparkに読み込む理想的な方法はありますか?私はスキーマに対してプロパティを検証したいので、後でSQLクエリをSparkしてデータを消去することができます。

+2

s3Filesをrddに変更する必要がありましたか?私はあなたがそれをrddに変更しなければ、ファイル内容を並行して引き出すと信じています。 – wllmtrng

+0

最終結果はデー​​タフレームとなり、Spark SQLクエリを実行してredshiftに保存します。 RDDに変換することなく、私は何かが欠けていない限り、私はその論理に従うことができません。 – Mez

+1

var s3Files = spark.sqlContext.read.schema(スキーマ).json(...)。createOrReplaceTempView( "results") で十分です。それを試してみて、それでもまだ物事を読むかどうかを確認してください。 – wllmtrng

答えて

2

何が起こっているのかは、DataFrameがRDDに変換されてからDataFrameに再度変換されてから、パーティション情報が失われることです。

var s3Files = spark 
    .sqlContext 
    .read.schema(schema) 
    .json(...) 
    .createOrRepla‌​ceTempView("results"‌​) 

で十分であり、パーティション情報も存在していなければならず、jsonファイルを同時にロードできるようにする必要があります。

関連する問題