S3からSparkにファイルをロードするには、次のコードを使用してください。動作していますが、1つのファイルと別のファイルの間に遅延があり、順次ロードされていることに気づいています。私はこれを並列に読み込むことで改善したいと思います。S3ファイルを並列に読み込みます。
// Load files that were loaded into firehose on this day
var s3Files = spark.sqlContext.read.schema(schema).json("s3n://" + job.awsaccessKey + ":" + job.awssecretKey + "@" + job.bucketName + "/" + job.awss3RawFileExpression + "/" + year + "/" + monthCheck + "/" + dayCheck + "/*/").rdd
// Apply the schema to the RDD, here we will have duplicates
val usersDataFrame = spark.createDataFrame(s3Files, schema)
usersDataFrame.createOrReplaceTempView("results")
// Clean and use partition by the keys to eliminate duplicates and get latest record
var results = spark.sql(buildCleaningQuery(job, "results"))
results.createOrReplaceTempView("filteredResults")
val records = spark.sql("select count(*) from filteredResults")
Iはまた、その後、私はスパークSQLを使用するに移動する必要があるので、しかし、私はRDDにRDD [文字列]を変換する問題[行]を有するAM、テキストファイル()メソッドを介してロードしようとしています。私は次のように使用しています。
var s3Files = sparkContext.textFile("s3n://" + job.awsaccessKey + ":" + job.awssecretKey + "@" + job.bucketName + "/" + job.awss3RawFileExpression + "/" + year + "/" + monthCheck + "/" + dayCheck + "/*/").toJavaRDD()
JSONファイル(それぞれ50MB程度の複数ファイル)をSparkに読み込む理想的な方法はありますか?私はスキーマに対してプロパティを検証したいので、後でSQLクエリをSparkしてデータを消去することができます。
s3Filesをrddに変更する必要がありましたか?私はあなたがそれをrddに変更しなければ、ファイル内容を並行して引き出すと信じています。 – wllmtrng
最終結果はデータフレームとなり、Spark SQLクエリを実行してredshiftに保存します。 RDDに変換することなく、私は何かが欠けていない限り、私はその論理に従うことができません。 – Mez
var s3Files = spark.sqlContext.read.schema(スキーマ).json(...)。createOrReplaceTempView( "results") で十分です。それを試してみて、それでもまだ物事を読むかどうかを確認してください。 – wllmtrng