2017-11-28

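AWS Glue - Don't know how to save NullType to REDSHIFT

I have the following simple script for AWS Glue. I have a text file with empty cells and a table that accepts NULL values. When I run the Glue job, it fails with the exception "Don't know how to save NullType to REDSHIFT".

Are NULL inserts simply not supported in Redshift via Glue?

Job script: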

import sys 
from awsglue.transforms import * 
from awsglue.utils import getResolvedOptions 
from pyspark.context import SparkContext 
from awsglue.context import GlueContext 
from awsglue.job import Job 

## @params: [TempDir, JOB_NAME] 
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME']) 

sc = SparkContext() 
glueContext = GlueContext(sc) 
spark = glueContext.spark_session 
job = Job(glueContext) 
job.init(args['JOB_NAME'], args) 
## @type: DataSource 
## @args: [database = "poc_edw", table_name = "delta_orderheader", transformation_ctx = "datasource0"] 
## @return: datasource0 
## @inputs: [] 
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "poc_edw", table_name = "delta_orderheader", transformation_ctx = "datasource0") 
## @type: ApplyMapping 
## @args: [mapping = [("partitionnumber", "int", "partitionnumber", "int"), ("messagekey", "long", "messagekey", "long"), ("applicationversion", "string", "applicationversion", "string"), ("businessdate", "date", "businessdate", "date"), ("change", "decimal(10,2)", "change", "decimal(10,2)"), ("employeeid", "int", "employeeid", "int"), ("employeename", "string", "employeename", "string"), ("employeeuserid", "string", "employeeuserid", "string"), ("meallocation", "int", "meallocation", "int"), ("messageid", "string", "messageid", "string"), ("ordernumber", "int", "ordernumber", "int"), ("ordersourcetypekey", "short", "ordersourcetypekey", "short"), ("posid", "int", "posid", "int"), ("satellitenumber", "int", "satellitenumber", "int"), ("spmhostordercode", "string", "spmhostordercode", "string"), ("storenumber", "int", "storenumber", "int"), ("taxamount", "decimal(10,2)", "taxamount", "decimal(10,2)"), ("taxexempt", "int", "taxexempt", "int"), ("taxinclusiveamount", "decimal(10,2)", "taxinclusiveamount", "decimal(10,2)"), ("terminalnumber", "string", "terminalnumber", "string"), ("transactiondate", "timestamp", "transactiondate", "timestamp"), ("transactionid", "int", "transactionid", "int"), ("version", "decimal(10,2)", "version", "decimal(10,2)"), ("woddescription", "string", "woddescription", "string"), ("wodpromotionid", "int", "wodpromotionid", "int"), ("wodtype", "short", "wodtype", "short"), ("wodvalue", "decimal(10,2)", "wodvalue", "decimal(10,2)"), ("sqlinsertedprocessid", "int", "sqlinsertedprocessid", "int"), ("insertedprocessid", "int", "insertedprocessid", "int"), ("lastupdatedprocessid", "int", "lastupdatedprocessid", "int"), ("createddatetime", "timestamp", "createddatetime", "timestamp"), ("lastupdateddatetime", "timestamp", "lastupdateddatetime", "timestamp"), ("applyprocessid", "int", "applyprocessid", "int"), ("applydatetime", "timestamp", "applydatetime", "timestamp"), ("ordernetamount", "decimal(10,2)", "ordernetamount", "decimal(10,2)"), 
## ("loyaltysubcardid", "string", "loyaltysubcardid", "string"), ("loyaltymemberid", "string", "loyaltymemberid", "string"), ("basepointegersearned", "int", "basepointegersearned", "int"), ("bonuspointegersearned", "int", "bonuspointegersearned", "int"), ("loyaltynetsales", "decimal(10,2)", "loyaltynetsales", "decimal(10,2)"), ("rewardsredeemedamount", "decimal(10,2)", "rewardsredeemedamount", "decimal(10,2)"), ("rewardsabandonedamount", "decimal(10,2)", "rewardsabandonedamount", "decimal(10,2)"), ("loyaltymemberlookuptypekey", "short", "loyaltymemberlookuptypekey", "short"), ("remoteorderid", "string", "remoteorderid", "string")], transformation_ctx = "applymapping1"] 
## @return: applymapping1 
## @inputs: [frame = datasource0] 
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("partitionnumber", "int", "partitionnumber", "int"), ("messagekey", "long", "messagekey", "long"), ("applicationversion", "string", "applicationversion", "string"), ("businessdate", "date", "businessdate", "date"), ("change", "decimal(10,2)", "change", "decimal(10,2)"), ("employeeid", "int", "employeeid", "int"), ("employeename", "string", "employeename", "string"), ("employeeuserid", "string", "employeeuserid", "string"), ("meallocation", "int", "meallocation", "int"), ("messageid", "string", "messageid", "string"), ("ordernumber", "int", "ordernumber", "int"), ("ordersourcetypekey", "short", "ordersourcetypekey", "short"), ("posid", "int", "posid", "int"), ("satellitenumber", "int", "satellitenumber", "int"), ("spmhostordercode", "string", "spmhostordercode", "string"), ("storenumber", "int", "storenumber", "int"), ("taxamount", "decimal(10,2)", "taxamount", "decimal(10,2)"), ("taxexempt", "int", "taxexempt", "int"), ("taxinclusiveamount", "decimal(10,2)", "taxinclusiveamount", "decimal(10,2)"), ("terminalnumber", "string", "terminalnumber", "string"), ("transactiondate", "timestamp", "transactiondate", "timestamp"), ("transactionid", "int", "transactionid", "int"), ("version", "decimal(10,2)", "version", "decimal(10,2)"), ("woddescription", "string", "woddescription", "string"), ("wodpromotionid", "int", "wodpromotionid", "int"), ("wodtype", "short", "wodtype", "short"), ("wodvalue", "decimal(10,2)", "wodvalue", "decimal(10,2)"), ("sqlinsertedprocessid", "int", "sqlinsertedprocessid", "int"), ("insertedprocessid", "int", "insertedprocessid", "int"), ("lastupdatedprocessid", "int", "lastupdatedprocessid", "int"), ("createddatetime", "timestamp", "createddatetime", "timestamp"), ("lastupdateddatetime", "timestamp", "lastupdateddatetime", "timestamp"), ("applyprocessid", "int", "applyprocessid", "int"), ("applydatetime", "timestamp", "applydatetime", "timestamp"), ("ordernetamount", 
"decimal(10,2)", "ordernetamount", "decimal(10,2)"), ("loyaltysubcardid", "string", "loyaltysubcardid", "string"), ("loyaltymemberid", "string", "loyaltymemberid", "string"), ("basepointegersearned", "int", "basepointegersearned", "int"), ("bonuspointegersearned", "int", "bonuspointegersearned", "int"), ("loyaltynetsales", "decimal(10,2)", "loyaltynetsales", "decimal(10,2)"), ("rewardsredeemedamount", "decimal(10,2)", "rewardsredeemedamount", "decimal(10,2)"), ("rewardsabandonedamount", "decimal(10,2)", "rewardsabandonedamount", "decimal(10,2)"), ("loyaltymemberlookuptypekey", "short", "loyaltymemberlookuptypekey", "short"), ("remoteorderid", "string", "remoteorderid", "string")], transformation_ctx = "applymapping1") 
## @type: SelectFields 
## @args: [paths = ["applydatetime", "messagekey", "businessdate", "transactiondate", "sqlinsertedprocessid", "ordernetamount", "applicationversion", "messageid", "storenumber", "satellitenumber", "loyaltynetsales", "spmhostordercode", "bonuspointegersearned", "employeeid", "transactionid", "loyaltysubcardid", "employeeuserid", "taxinclusiveamount", "meallocation", "ordernumber", "loyaltymemberlookuptypekey", "applyprocessid", "ordersourcetypekey", "basepointegersearned", "partitionnumber", "insertedprocessid", "wodtype", "loyaltymemberid", "rewardsredeemedamount", "change", "rewardsabandonedamount", "version", "taxexempt", "remoteorderid", "wodpromotionid", "posid", "woddescription", "wodvalue", "lastupdatedprocessid", "taxamount", "terminalnumber", "lastupdateddatetime", "createddatetime", "employeename"], transformation_ctx = "selectfields2"] 
## @return: selectfields2 
## @inputs: [frame = applymapping1] 
selectfields2 = SelectFields.apply(frame = applymapping1, paths = ["applydatetime", "messagekey", "businessdate", "transactiondate", "sqlinsertedprocessid", "ordernetamount", "applicationversion", "messageid", "storenumber", "satellitenumber", "loyaltynetsales", "spmhostordercode", "bonuspointegersearned", "employeeid", "transactionid", "loyaltysubcardid", "employeeuserid", "taxinclusiveamount", "meallocation", "ordernumber", "loyaltymemberlookuptypekey", "applyprocessid", "ordersourcetypekey", "basepointegersearned", "partitionnumber", "insertedprocessid", "wodtype", "loyaltymemberid", "rewardsredeemedamount", "change", "rewardsabandonedamount", "version", "taxexempt", "remoteorderid", "wodpromotionid", "posid", "woddescription", "wodvalue", "lastupdatedprocessid", "taxamount", "terminalnumber", "lastupdateddatetime", "createddatetime", "employeename"], transformation_ctx = "selectfields2") 
## @type: ResolveChoice 
## @args: [choice = "MATCH_CATALOG", database = "poc_edw", table_name = "derik_edw_derik_stageorderheader", transformation_ctx = "resolvechoice3"] 
## @return: resolvechoice3 
## @inputs: [frame = selectfields2] 
resolvechoice3 = ResolveChoice.apply(frame = selectfields2, choice = "MATCH_CATALOG", database = "poc_edw", table_name = "derik_edw_derik_stageorderheader", transformation_ctx = "resolvechoice3") 
## @type: ResolveChoice 
## @args: [choice = "make_cols", transformation_ctx = "resolvechoice4"] 
## @return: resolvechoice4 
## @inputs: [frame = resolvechoice3] 
resolvechoice4 = ResolveChoice.apply(frame = resolvechoice3, choice = "make_cols", transformation_ctx = "resolvechoice4") 
## @type: DataSink 
## @args: [database = "poc_edw", table_name = "derik_edw_derik_stageorderheader", redshift_tmp_dir = TempDir, transformation_ctx = "datasink5"] 
## @return: datasink5 
## @inputs: [frame = resolvechoice4] 
datasink5 = glueContext.write_dynamic_frame.from_catalog(frame = resolvechoice4, database = "poc_edw", table_name = "derik_edw_derik_stageorderheader", redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink5") 
job.commit() 

Reference: AWS Forum Link

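Update:

I have made some progress. I thought the problem was NULL characters (0x00), but that turned out not to be the case. I recreated my file without NULL characters and got the same problem.

I then added this line of code: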

df = DropNullFields.apply(frame = resolvechoice4, transformation_ctx = "df") 

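I don't fully understand why, but the best I can gather is that the DynamicFrame inferred some NullType fields that did not actually exist. After adding this line of code, rows were inserted, but the string fields do not appear to be included. Only about half of my fields have values.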
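One way to see exactly which fields DropNullFields removed is to compare the column names before and after the transform. The following is a minimal sketch, not part of Glue: `dropped_columns` is a hypothetical helper, and the commented call assumes `resolvechoice4` and `df` are the DynamicFrames from the script above.

```python
from typing import Iterable, List


def dropped_columns(before: Iterable[str], after: Iterable[str]) -> List[str]:
    """Return the columns present in `before` but missing from `after`,
    keeping the original column order."""
    remaining = set(after)
    return [name for name in before if name not in remaining]


# Inside the Glue job this would be called roughly like this (assumption:
# both frames can be converted with toDF()):
#
#   removed = dropped_columns(resolvechoice4.toDF().columns,
#                             df.toDF().columns)
#   print("Dropped by DropNullFields:", removed)

if __name__ == "__main__":
    # Stand-in column lists so the helper can be tried outside Glue.
    print(dropped_columns(["messagekey", "employeename", "posid"],
                          ["messagekey", "posid"]))
```

If the list contains the string columns, that would be consistent with the observation that only about half of the fields end up with values.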

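I can't seem to figure out how to do an inline replacement of values. For example, the link below refers to the na.fill() and fillna() functions of the DataFrame class. Glue uses DynamicFrame, an abstraction over DataFrame, which apparently does not implement .fillna() or its aliases. The DropNullFields() function of the DynamicFrame class removes the entire field that has NULL values instead of omitting only the NULL values within the field. http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html

For the time being, I have stopped using AWS Glue.

I use AWS Glue to manage ETL loads. From my experience, what I do is this: every time I add a column or do a calculation, I convert the dynamic frame to a Spark data frame: dataframe.toDF()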
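The two points above can be combined: convert the DynamicFrame with toDF(), use the DataFrame's na.fill(), and convert back with DynamicFrame.fromDF(). The sketch below assumes that replacing NULL strings with a placeholder is acceptable for the target table; `string_fill_values` and `fill_string_nulls` are hypothetical helper names, and the frame name "filled" is arbitrary. Note that this changes the data (NULL becomes a placeholder) and does not by itself give a type to columns that were inferred as NullType.

```python
from typing import Dict, Iterable, Tuple


def string_fill_values(dtypes: Iterable[Tuple[str, str]],
                       placeholder: str = "") -> Dict[str, str]:
    """Map every string-typed column to the placeholder value.

    `dtypes` has the shape of DataFrame.dtypes: (column name, type string).
    """
    return {name: placeholder for name, dtype in dtypes if dtype == "string"}


def fill_string_nulls(dynamic_frame, glue_context, placeholder: str = ""):
    """Round-trip a DynamicFrame through a DataFrame to replace NULL strings."""
    # Imported lazily: awsglue is only available inside the Glue runtime.
    from awsglue.dynamicframe import DynamicFrame

    data_frame = dynamic_frame.toDF()
    fill_map = string_fill_values(data_frame.dtypes, placeholder)
    if fill_map:
        # na.fill accepts a dict of column name -> replacement value.
        data_frame = data_frame.na.fill(fill_map)
    return DynamicFrame.fromDF(data_frame, glue_context, "filled")
```

In the job above this might be called as `fill_string_nulls(resolvechoice4, glueContext)` before the write step.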

Answers

0

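Glue has (at least as of 2018-02-13) a very strange algorithm for columns and types. It reads the column names and types from the Data Catalog and then tries to figure out the types again (gods, tell me why???). When it deals with empty values, it "figures out" a null type.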

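This is especially painful when you try to cast values from a CSV to some numeric type. Sometimes Glue also simply drops a column that has no values (for example, when you add a column mapping from type A to type B, but the column type is recognized as C). If you try to save the data in ORC format, every column with a null type leads to a nice IllegalArgumentException:

    java.lang.IllegalArgumentException: Error: type expected at the position x of 'int:string:nullstring:int' but 'null' is found. 

How to solve it:

  1. As you mentioned, you need to call DropNullFields.

  2. But if you need to use this column in a SQL statement, you will get an error that the column cannot be found. So you need to "add" the removed column again with the right type (Scala code):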

    import com.amazonaws.services.glue.DynamicFrame
    import org.apache.spark.sql.functions.lit
    import org.apache.spark.sql.types.IntegerType
    // Your Glue DynamicFrame containing the null-typed column
    val glueDynamicFrame: DynamicFrame = ???
    // Drop the null fields and get a Spark DataFrame
    val sparkDataFrame = glueDynamicFrame.dropNulls().toDF()
    // Final Spark DataFrame with all columns and the right types
    val sparkDataFrameWithColumnAndType =
      if (!sparkDataFrame.columns.contains("myColumnWithNullType")) {
        // Still a null value, but now with a type
        // (any other type from the org.apache.spark.sql.types package works too)
        sparkDataFrame.withColumn("myColumnWithNullType", lit(null).cast(IntegerType))
      } else {
        sparkDataFrame
      }
    // Convert the Spark DataFrame back to a Glue DynamicFrame
    // (glueContext is the job's GlueContext)
    val newDynamicFrame = DynamicFrame(sparkDataFrameWithColumnAndType, glueContext)
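
Since the job in the question is written in Python, the same idea can be sketched in PySpark. This is a rough port under stated assumptions, not a tested Glue job: `columns_to_restore` and `restore_typed_nulls` are hypothetical helper names, `expected` is a mapping you supply from column name to Spark type string, and the frame name "restored" is arbitrary.

```python
from typing import Dict, Iterable, List, Tuple


def columns_to_restore(present: Iterable[str],
                       expected: Dict[str, str]) -> List[Tuple[str, str]]:
    """List (name, type) pairs for expected columns that are missing."""
    have = set(present)
    return [(name, dtype) for name, dtype in expected.items() if name not in have]


def restore_typed_nulls(dynamic_frame, glue_context, expected: Dict[str, str]):
    """Drop null fields, then re-add the missing columns as typed NULLs."""
    # Imported lazily: these modules exist only in the Glue/Spark runtime.
    from awsglue.dynamicframe import DynamicFrame
    from awsglue.transforms import DropNullFields
    from pyspark.sql.functions import lit

    data_frame = DropNullFields.apply(frame=dynamic_frame).toDF()
    for name, dtype in columns_to_restore(data_frame.columns, expected):
        # Still NULL, but now with a concrete type such as "int" or "decimal(10,2)".
        data_frame = data_frame.withColumn(name, lit(None).cast(dtype))
    return DynamicFrame.fromDF(data_frame, glue_context, "restored")
```

For the script in the question, a call might look like `restore_typed_nulls(resolvechoice4, glueContext, {"employeename": "string", "posid": "int"})`, with the mapping extended to every column the Redshift table expects.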
    