2017-07-10 1 views
0

次のコードは、S3からデータを読み込み、SparkSQLを使用して重複を消去し、JDBCを使用してデータをRedshiftに保存します。私はまた、spark-redshiftの依存関係を使用して同じ結果を得ようとしました。私はSpark 2.0を使用しています。スパークがすべてのデータを赤色シフトに保存しない

私が理解できないことは、メモリにロードされた結果を表示するときに、合計が予想される数値であることです。ただし、スパークがレッドシフトに保存されている場合は常に少なくなります。どういうわけか、すべてのレコードが保存されているわけではなく、STL_LOAD_ERRORSでもエラーは表示されません。誰かがこれに遭遇したか、これがどうして起こるかについてのアイデアがありますか?大規模な赤方偏移が遅いある - jdbcを使用して

 // Load files that were loaded into firehose on this day 
    var s3Files = spark.sqlContext.read.schema(schema).json("s3://" + job.getAWSAccessKey + ":" + job.getAWSSecretKey + "@" + job.getBucketName + "/"+ job.getAWSS3RawFileExpression + "/" + year+ "/" + monthCheck+ "/" + dayCheck + "/*/").rdd 

    // Apply the schema to the RDD, here we will have duplicates 
    val usersDataFrame = spark.createDataFrame(s3Files, schema) 

    usersDataFrame.createOrReplaceTempView("results") 

    // Clean and use partition by the keys to eliminate duplicates and get latest record 
    var results = spark.sql(buildCleaningQuery(job,"results")) 
    results.createOrReplaceTempView("filteredResults") 

    // This returns the correct result! 
    var check = spark.sql("select sum(Reward) from filteredResults where period=1706") 
    check.show() 

    var path = UUID.randomUUID().toString() 

    println("s3://" + job.getAWSAccessKey + ":" + job.getAWSSecretKey + "@" + job.getAWSS3TemporaryDirectory + "/" + path) 

    val prop = new Properties() 

    results.write.jdbc(job.getRedshiftJDBC,"work.\"" + path + "\"",prop) 

答えて

0

はスパークが繰り返さINSERT INTO文をやろうとしていることを意味します。そのため、stl_load_errorsにエントリが表示されていません。

代わりにspark-redshiftライブラリを使用することをおすすめします。それはよくテストされ、より良いパフォーマンスを発揮します。 (多くのオプションを示す)https://github.com/databricks/spark-redshift

例:私は説明したように

my_dataframe.write 
    .format("com.databricks.spark.redshift") 
    .option("url", "jdbc:redshift://my_cluster.qwertyuiop.eu-west-1.redshift.amazonaws.com:5439/my_database?user=my_user&password=my_password") 
    .option("dbtable", "my_table") 
    .option("tempdir", "s3://my-bucket") 
    .option("diststyle", "KEY") 
    .option("distkey", "dist_key") 
    .option("sortkeyspec", "COMPOUND SORTKEY(key_1, key_2)") 
    .option("extracopyoptions", "TRUNCATECOLUMNS COMPUPDATE OFF STATUPDATE OFF") 
    .mode("overwrite") // "append"/"error" 
    .save() 
+0

はい、私は、両方を使用します。私は、スパーク・レッド・シフトから出てくる問題の可能性を排除するためにJDBCを使用しました。 – Mez

+0

一部のファイルがスキップされる場合があります。 'spark-redshift'ライブラリがステージングデータを削除し、S3ファイルのリストに対して' stl_load_commits'をチェックしないようにします。 –

関連する問題