AWS Glue 用の簡単なスクリプトが以下にあります。空のセルを含むテキストファイルと、NULL 値を受け入れるテーブルがあります。Glue ジョブを実行すると、「NullType を REDSHIFT に保存する方法がわからない」という例外が発生して失敗します。AWS Glue - NullType を REDSHIFT に保存する方法がわからない
Glue 経由の Redshift では、NULL の挿入はサポートされていないのでしょうか?
ジョブスクリプト:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [TempDir, JOB_NAME]
# Resolve the "TempDir" and "JOB_NAME" job arguments from the command line.
args = getResolvedOptions(sys.argv, ["TempDir", "JOB_NAME"])

# Create the Spark context, wrap it in a Glue context, and expose the
# Glue context's Spark session as `spark`.
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# Create the job object and initialise it with the job name and arguments.
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
## @type: DataSource
## @args: [database = "poc_edw", table_name = "delta_orderheader", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
# Load the catalog table "delta_orderheader" from database "poc_edw".
datasource0 = glueContext.create_dynamic_frame.from_catalog(
    database="poc_edw",
    table_name="delta_orderheader",
    transformation_ctx="datasource0"
)
## @type: ApplyMapping
## @args: [mapping = [("partitionnumber", "int", "partitionnumber", "int"), ("messagekey", "long", "messagekey", "long"), ("applicationversion", "string", "applicationversion", "string"), ("businessdate", "date", "businessdate", "date"), ("change", "decimal(10,2)", "change", "decimal(10,2)"), ("employeeid", "int", "employeeid", "int"), ("employeename", "string", "employeename", "string"), ("employeeuserid", "string", "employeeuserid", "string"), ("meallocation", "int", "meallocation", "int"), ("messageid", "string", "messageid", "string"), ("ordernumber", "int", "ordernumber", "int"), ("ordersourcetypekey", "short", "ordersourcetypekey", "short"), ("posid", "int", "posid", "int"), ("satellitenumber", "int", "satellitenumber", "int"), ("spmhostordercode", "string", "spmhostordercode", "string"), ("storenumber", "int", "storenumber", "int"), ("taxamount", "decimal(10,2)", "taxamount", "decimal(10,2)"), ("taxexempt", "int", "taxexempt", "int"), ("taxinclusiveamount", "decimal(10,2)", "taxinclusiveamount", "decimal(10,2)"), ("terminalnumber", "string", "terminalnumber", "string"), ("transactiondate", "timestamp", "transactiondate", "timestamp"), ("transactionid", "int", "transactionid", "int"), ("version", "decimal(10,2)", "version", "decimal(10,2)"), ("woddescription", "string", "woddescription", "string"), ("wodpromotionid", "int", "wodpromotionid", "int"), ("wodtype", "short", "wodtype", "short"), ("wodvalue", "decimal(10,2)", "wodvalue", "decimal(10,2)"), ("sqlinsertedprocessid", "int", "sqlinsertedprocessid", "int"), ("insertedprocessid", "int", "insertedprocessid", "int"), ("lastupdatedprocessid", "int", "lastupdatedprocessid", "int"), ("createddatetime", "timestamp", "createddatetime", "timestamp"), ("lastupdateddatetime", "timestamp", "lastupdateddatetime", "timestamp"), ("applyprocessid", "int", "applyprocessid", "int"), ("applydatetime", "timestamp", "applydatetime", "timestamp"), ("ordernetamount", "decimal(10,2)", "ordernetamount", "decimal(10,2)"), ("loyaltysubcardid", "string", "loyaltysubcardid", "string"), ("loyaltymemberid", "string", "loyaltymemberid", "string"), ("basepointegersearned", "int", "basepointegersearned", "int"), ("bonuspointegersearned", "int", "bonuspointegersearned", "int"), ("loyaltynetsales", "decimal(10,2)", "loyaltynetsales", "decimal(10,2)"), ("rewardsredeemedamount", "decimal(10,2)", "rewardsredeemedamount", "decimal(10,2)"), ("rewardsabandonedamount", "decimal(10,2)", "rewardsabandonedamount", "decimal(10,2)"), ("loyaltymemberlookuptypekey", "short", "loyaltymemberlookuptypekey", "short"), ("remoteorderid", "string", "remoteorderid", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
# Every column is mapped to the same name and the same type, so each mapping
# tuple has the form (name, type, name, type). The (name, type) pairs are
# listed once here and expanded into 4-tuples in the call below.
_orderheader_columns = [
    ("partitionnumber", "int"),
    ("messagekey", "long"),
    ("applicationversion", "string"),
    ("businessdate", "date"),
    ("change", "decimal(10,2)"),
    ("employeeid", "int"),
    ("employeename", "string"),
    ("employeeuserid", "string"),
    ("meallocation", "int"),
    ("messageid", "string"),
    ("ordernumber", "int"),
    ("ordersourcetypekey", "short"),
    ("posid", "int"),
    ("satellitenumber", "int"),
    ("spmhostordercode", "string"),
    ("storenumber", "int"),
    ("taxamount", "decimal(10,2)"),
    ("taxexempt", "int"),
    ("taxinclusiveamount", "decimal(10,2)"),
    ("terminalnumber", "string"),
    ("transactiondate", "timestamp"),
    ("transactionid", "int"),
    ("version", "decimal(10,2)"),
    ("woddescription", "string"),
    ("wodpromotionid", "int"),
    ("wodtype", "short"),
    ("wodvalue", "decimal(10,2)"),
    ("sqlinsertedprocessid", "int"),
    ("insertedprocessid", "int"),
    ("lastupdatedprocessid", "int"),
    ("createddatetime", "timestamp"),
    ("lastupdateddatetime", "timestamp"),
    ("applyprocessid", "int"),
    ("applydatetime", "timestamp"),
    ("ordernetamount", "decimal(10,2)"),
    ("loyaltysubcardid", "string"),
    ("loyaltymemberid", "string"),
    ("basepointegersearned", "int"),
    ("bonuspointegersearned", "int"),
    ("loyaltynetsales", "decimal(10,2)"),
    ("rewardsredeemedamount", "decimal(10,2)"),
    ("rewardsabandonedamount", "decimal(10,2)"),
    ("loyaltymemberlookuptypekey", "short"),
    ("remoteorderid", "string"),
]
applymapping1 = ApplyMapping.apply(
    frame=datasource0,
    mappings=[
        (col_name, col_type, col_name, col_type)
        for col_name, col_type in _orderheader_columns
    ],
    transformation_ctx="applymapping1"
)
## @type: SelectFields
## @args: [paths = ["applydatetime", "messagekey", "businessdate", "transactiondate", "sqlinsertedprocessid", "ordernetamount", "applicationversion", "messageid", "storenumber", "satellitenumber", "loyaltynetsales", "spmhostordercode", "bonuspointegersearned", "employeeid", "transactionid", "loyaltysubcardid", "employeeuserid", "taxinclusiveamount", "meallocation", "ordernumber", "loyaltymemberlookuptypekey", "applyprocessid", "ordersourcetypekey", "basepointegersearned", "partitionnumber", "insertedprocessid", "wodtype", "loyaltymemberid", "rewardsredeemedamount", "change", "rewardsabandonedamount", "version", "taxexempt", "remoteorderid", "wodpromotionid", "posid", "woddescription", "wodvalue", "lastupdatedprocessid", "taxamount", "terminalnumber", "lastupdateddatetime", "createddatetime", "employeename"], transformation_ctx = "selectfields2"]
## @return: selectfields2
## @inputs: [frame = applymapping1]
# Select the listed field paths from the mapped frame.
selectfields2 = SelectFields.apply(
    frame=applymapping1,
    paths=[
        "applydatetime",
        "messagekey",
        "businessdate",
        "transactiondate",
        "sqlinsertedprocessid",
        "ordernetamount",
        "applicationversion",
        "messageid",
        "storenumber",
        "satellitenumber",
        "loyaltynetsales",
        "spmhostordercode",
        "bonuspointegersearned",
        "employeeid",
        "transactionid",
        "loyaltysubcardid",
        "employeeuserid",
        "taxinclusiveamount",
        "meallocation",
        "ordernumber",
        "loyaltymemberlookuptypekey",
        "applyprocessid",
        "ordersourcetypekey",
        "basepointegersearned",
        "partitionnumber",
        "insertedprocessid",
        "wodtype",
        "loyaltymemberid",
        "rewardsredeemedamount",
        "change",
        "rewardsabandonedamount",
        "version",
        "taxexempt",
        "remoteorderid",
        "wodpromotionid",
        "posid",
        "woddescription",
        "wodvalue",
        "lastupdatedprocessid",
        "taxamount",
        "terminalnumber",
        "lastupdateddatetime",
        "createddatetime",
        "employeename",
    ],
    transformation_ctx="selectfields2"
)
## @type: ResolveChoice
## @args: [choice = "MATCH_CATALOG", database = "poc_edw", table_name = "derik_edw_derik_stageorderheader", transformation_ctx = "resolvechoice3"]
## @return: resolvechoice3
## @inputs: [frame = selectfields2]
# Resolve choice types with the "MATCH_CATALOG" option against the catalog
# table "derik_edw_derik_stageorderheader" in database "poc_edw".
resolvechoice3 = ResolveChoice.apply(
    frame=selectfields2,
    choice="MATCH_CATALOG",
    database="poc_edw",
    table_name="derik_edw_derik_stageorderheader",
    transformation_ctx="resolvechoice3"
)
## @type: ResolveChoice
## @args: [choice = "make_cols", transformation_ctx = "resolvechoice4"]
## @return: resolvechoice4
## @inputs: [frame = resolvechoice3]
# Second ResolveChoice pass, this time with the "make_cols" option.
resolvechoice4 = ResolveChoice.apply(
    frame=resolvechoice3,
    choice="make_cols",
    transformation_ctx="resolvechoice4"
)
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields5"]
## @return: dropnullfields5
## @inputs: [frame = resolvechoice4]
# The Redshift write fails when the frame still carries NullType fields, so
# drop those fields before handing the frame to the sink.
# NOTE(review): a dropped field is not sent to Redshift at all; presumably the
# target column is then left NULL -- confirm the target columns are nullable
# and that no expected (e.g. string) columns are being inferred as NullType.
dropnullfields5 = DropNullFields.apply(
    frame=resolvechoice4,
    transformation_ctx="dropnullfields5"
)
## @type: DataSink
## @args: [database = "poc_edw", table_name = "derik_edw_derik_stageorderheader", redshift_tmp_dir = TempDir, transformation_ctx = "datasink5"]
## @return: datasink5
## @inputs: [frame = dropnullfields5]
# Write the cleaned frame to the catalog table, staging through the job's
# TempDir argument.
datasink5 = glueContext.write_dynamic_frame.from_catalog(
    frame=dropnullfields5,
    database="poc_edw",
    table_name="derik_edw_derik_stageorderheader",
    redshift_tmp_dir=args["TempDir"],
    transformation_ctx="datasink5"
)
job.commit()
更新:
少し進展がありました。問題の原因は NULL 文字(0x00)だと思っていましたが、そうではないことが判明しました。NULL 文字を含まないファイルを作り直しても、同じ問題が発生しました。
私はこのコード行を追加しました。
# Apply DropNullFields to resolvechoice4 and bind the result to `df`.
df = DropNullFields.apply(frame = resolvechoice4, transformation_ctx = "df")
理由を完全には理解していませんが、私が把握できた限りでは、DynamicFrame が実際には存在しない NullType フィールドをいくつか推論していたようです。このコード行を追加した後は行が挿入されるようになりましたが、文字列フィールドの値が入っていないようです。値が入っているのはフィールドの約半分だけです。
私の経験から
値をインラインで置換する方法が見つけられません。たとえば、以下のリンクには DataFrame クラスの na.fill() および fillna() 関数が記載されています。Glue は DataFrame を抽象化した DynamicFrame を使用しますが、DynamicFrame には .fillna() やそのエイリアスが実装されていないようです。DynamicFrame クラスの DropNullFields() 関数は、フィールド内の NULL 文字を取り除くのではなく、NULL 値を持つフィールド全体を削除します。 http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html –
当面はAWS Glueの使用を中止しました。 –
私は ETL ロードの管理に AWS Glue を使用しています。私のやり方は次のとおりです: カラムを追加したり計算を行ったりするたびに、DynamicFrame を Spark の DataFrame に変換します: dataframe.toDF() –