2017-10-06 8 views
1

Sparkでデータを作成してから結合操作を実行しましたが、最終的に出力をパーティションファイルに保存する必要がありました。saveAsTextFileを使用してsparkデータフレームでカスタムパーティションを作成する方法

データフレームをRDDに変換してから、複数の区切り文字を使用できるテキストファイルとして保存しています。私の質問は、この場合、カスタムパーティションとしてデータフレーム列を使用する方法です。

それはマルチchar型の区切り文字をサポートしていないので、私はカスタムパーティション用のオプションの下に使用することはできません

dfMainOutput.write.partitionBy("DataPartiotion","StatementTypeCode") 
    .format("csv") 
    .option("delimiter", "^") 
    .option("nullValue", "") 
    .option("codec", "gzip") 
    .save("s3://trfsdisu/SPARK/FinancialLineItem/output") 

を私はコードの下のようなRDDでこれを変換したマルチchar型の区切り文字を使用するには:

dfMainOutput.rdd.map(x=>x.mkString("|^|")).saveAsTextFile("dir path to store") 

上記のオプションでは、 "DataPartiotion"と "StatementTypeCode"という列に基づいてカスタムパーティションを作成する方法はありますか?

RDDからデータフレームに変換する必要がありますか?ここで

は、私は、これはconcat_wsを使用して行うことができ

val dfMainOutput = df1result.join(latestForEachKey, Seq("LineItem_organizationId", "LineItem_lineItemId"), "outer") 
     .select($"LineItem_organizationId", $"LineItem_lineItemId", 
     when($"DataPartition_1".isNotNull, $"DataPartition_1").otherwise($"DataPartition_1").as("DataPartition_1"), 
     when($"StatementTypeCode_1".isNotNull, $"StatementTypeCode_1").otherwise($"StatementTypeCode").as("StatementTypeCode"), 
     when($"StatementTypeCode_1".isNotNull, $"StatementTypeCode_1").otherwise($"StatementTypeCode").alias("StatementtypeCode"), 
     when($"LineItemName_1".isNotNull, $"LineItemName_1").otherwise($"LineItemName").as("LineItemName"), 
     when($"LocalLanguageLabel_1".isNotNull, $"LocalLanguageLabel_1").otherwise($"LocalLanguageLabel").as("LocalLanguageLabel"), 
     when($"FinancialConceptLocal_1".isNotNull, $"FinancialConceptLocal_1").otherwise($"FinancialConceptLocal").as("FinancialConceptLocal"), 
     when($"FinancialConceptGlobal_1".isNotNull, $"FinancialConceptGlobal_1").otherwise($"FinancialConceptGlobal").as("FinancialConceptGlobal"), 
     when($"IsDimensional_1".isNotNull, $"IsDimensional_1").otherwise($"IsDimensional").as("IsDimensional"), 
     when($"InstrumentId_1".isNotNull, $"InstrumentId_1").otherwise($"InstrumentId").as("InstrumentId"), 
     when($"LineItemSequence_1".isNotNull, $"LineItemSequence_1").otherwise($"LineItemSequence").as("LineItemSequence"), 
     when($"PhysicalMeasureId_1".isNotNull, $"PhysicalMeasureId_1").otherwise($"PhysicalMeasureId").as("PhysicalMeasureId"), 
     when($"FinancialConceptCodeGlobalSecondary_1".isNotNull, $"FinancialConceptCodeGlobalSecondary_1").otherwise($"FinancialConceptCodeGlobalSecondary").as("FinancialConceptCodeGlobalSecondary"), 
     when($"IsRangeAllowed_1".isNotNull, $"IsRangeAllowed_1").otherwise($"IsRangeAllowed".cast(DataTypes.StringType)).as("IsRangeAllowed"), 
     when($"IsSegmentedByOrigin_1".isNotNull, $"IsSegmentedByOrigin_1").otherwise($"IsSegmentedByOrigin".cast(DataTypes.StringType)).as("IsSegmentedByOrigin"), 
     when($"SegmentGroupDescription".isNotNull, $"SegmentGroupDescription").otherwise($"SegmentGroupDescription").as("SegmentGroupDescription"), 
     when($"SegmentChildDescription_1".isNotNull, $"SegmentChildDescription_1").otherwise($"SegmentChildDescription").as("SegmentChildDescription"), 
     when($"SegmentChildLocalLanguageLabel_1".isNotNull, $"SegmentChildLocalLanguageLabel_1").otherwise($"SegmentChildLocalLanguageLabel").as("SegmentChildLocalLanguageLabel"), 
     when($"LocalLanguageLabel_languageId_1".isNotNull, $"LocalLanguageLabel_languageId_1").otherwise($"LocalLanguageLabel_languageId").as("LocalLanguageLabel_languageId"), 
     when($"LineItemName_languageId_1".isNotNull, $"LineItemName_languageId_1").otherwise($"LineItemName_languageId").as("LineItemName_languageId"), 
     when($"SegmentChildDescription_languageId_1".isNotNull, $"SegmentChildDescription_languageId_1").otherwise($"SegmentChildDescription_languageId").as("SegmentChildDescription_languageId"), 
     when($"SegmentChildLocalLanguageLabel_languageId_1".isNotNull, $"SegmentChildLocalLanguageLabel_languageId_1").otherwise($"SegmentChildLocalLanguageLabel_languageId").as("SegmentChildLocalLanguageLabel_languageId"), 
     when($"SegmentGroupDescription_languageId_1".isNotNull, $"SegmentGroupDescription_languageId_1").otherwise($"SegmentGroupDescription_languageId").as("SegmentGroupDescription_languageId"), 
     when($"SegmentMultipleFundbDescription_1".isNotNull, $"SegmentMultipleFundbDescription_1").otherwise($"SegmentMultipleFundbDescription").as("SegmentMultipleFundbDescription"), 
     when($"SegmentMultipleFundbDescription_languageId_1".isNotNull, $"SegmentMultipleFundbDescription_languageId_1").otherwise($"SegmentMultipleFundbDescription_languageId").as("SegmentMultipleFundbDescription_languageId"), 
     when($"IsCredit_1".isNotNull, $"IsCredit_1").otherwise($"IsCredit".cast(DataTypes.StringType)).as("IsCredit"), 
     when($"FinancialConceptLocalId_1".isNotNull, $"FinancialConceptLocalId_1").otherwise($"FinancialConceptLocalId").as("FinancialConceptLocalId"), 
     when($"FinancialConceptGlobalId_1".isNotNull, $"FinancialConceptGlobalId_1").otherwise($"FinancialConceptGlobalId").as("FinancialConceptGlobalId"), 
     when($"FinancialConceptCodeGlobalSecondaryId_1".isNotNull, $"FinancialConceptCodeGlobalSecondaryId_1").otherwise($"FinancialConceptCodeGlobalSecondaryId").as("FinancialConceptCodeGlobalSecondaryId"), 
     when($"FFAction_1".isNotNull, $"FFAction_1").otherwise((concat(col("FFAction"), lit("|!|"))).as("FFAction"))) 
     .filter(!$"FFAction".contains("D")) 

val dfMainOutputFinal = dfMainOutput.select(concat_ws("|^|", columns.map(c => col(c)): _*).as("concatenated")) 

    dfMainOutputFinal.write.partitionBy("DataPartition_1","StatementTypeCode") 
    .format("csv") 
    .option("codec", "gzip") 
    .save("s3://trfsdisu/SPARK/FinancialLineItem/output") 

答えて

1

を試してみましたが、私のコードで、この機能はmkStringと同様に動作しますが、データフレーム上で直接に実行することができます。これにより、変換ステップが冗長になり、df.write.partitionBy()メソッドを使用できます。あなたは、単に「COL3」、「COL1」、「COL2」を置き換えることができ、使用可能なすべての列を連結します小さな例、

import org.apache.spark.sql.functions._ 
import spark.implicits._ 

val df = Seq(("01", "20000", "45.30"), ("01", "30000", "45.30")) 
    .toDF("col1", "col2", "col3") 

val df2 = df.select($"DataPartiotion", $"StatementTypeCode", 
    concat_ws("|^|", df.schema.fieldNames.map(c => col(c)): _*).as("concatenated")) 

これはあなたにこのような結果のデータフレームを与える、

+--------------+-----------------+------------------+ 
|DataPartiotion|StatementTypeCode|  concatenated| 
+--------------+-----------------+------------------+ 
|   01|   20000|01|^|20000|^|45.30| 
|   01|   30000|01|^|30000|^|45.30| 
+--------------+-----------------+------------------+ 
+1

@Anupam使用したい列を使用します。データフレーム上のすべてを直接行うことは、パフォーマンスの観点からは優れているはずですが、変換を高速にする必要があります(これは1つの列なので)。 – Shaido

+0

これには1つの問題があります。レコードの中に '、'が見つかりました。最終的なcsv出力に '' 'があります。このhttps://stackoverflow.com/questions/のために別の質問を作成しました。 47002414 /追加 - カスタム - デリミタ - 追加 - 二重引用符 - 最後 - スパーク - データフレーム - csv - アウト/ 47003011?noredirect = 1#comment80963283_47003011 –

関連する問題