1

私はすべてのフィールドに連結するデータフレームを持っています。spark csvデータフレームの列を削除する

連結後、別のデータフレームになり、最終的にその出力を2つの列に分割されたcsvファイルに書き出します。その列の1つが、最終出力に含めたくない最初のデータフレームに存在します。ここで

私のコードです:

val dfMainOutput = df1resultFinal.join(latestForEachKey, Seq("LineItem_organizationId", "LineItem_lineItemId"), "outer") 
     .select($"LineItem_organizationId", $"LineItem_lineItemId", 
     when($"DataPartition_1".isNotNull, $"DataPartition_1").otherwise($"DataPartition".cast(DataTypes.StringType)).as("DataPartition"), 
     when($"StatementTypeCode_1".isNotNull, $"StatementTypeCode_1").otherwise($"StatementTypeCode").as("StatementTypeCode"), 
     when($"FFAction_1".isNotNull, concat(col("FFAction_1"), lit("|!|"))).otherwise(concat(col("FFAction"), lit("|!|"))).as("FFAction")) 
     .filter(!$"FFAction".contains("D")) 

ここで私は別のデータフレームを連結して作成しています:

val dfMainOutputFinal = dfMainOutput.select($"DataPartition", $"StatementTypeCode",concat_ws("|^|", dfMainOutput.schema.fieldNames.map(c => col(c)): _*).as("concatenated"))  

これは私が今私がいけない

dfMainOutputFinal 
    .drop("DataPartition") 
    .write 
    .partitionBy("DataPartition","StatementTypeCode") 
    .format("csv") 
    .option("header","true") 
    .option("encoding", "\ufeff") 
    .option("codec", "gzip") 
    .save("path to csv") 

を試してみましたが、何であるDataPartition私の出力の列。

私はDataPartitionに基づいてパーティションを作成していますが、DataPartitionはメインデータフレームに存在するため、出力に表示されています。

QUESTION 1:はQUESTION 2

DATAFRAME

から列を無視することができます方法:は、私のエンコード形式はUTFになりますように、私の実際のデータを書き込む前に、CSV出力ファイルに "\ufeff"を追加する方法はあります-8-BOM。

これは私が

val dfMainOutputFinal = dfMainOutput.select($"DataPartition", $"StatementTypeCode",concat_ws("|^|", dfMainOutput.schema.filter(_ != "DataPartition").fieldNames.map(c => col(c)): _*).as("concatenated")) 

を試してみましたが、私は削除する必要がある場合、エラー以下

<console>:238: error: value fieldNames is not a member of Seq[org.apache.spark.sql.types.StructField] 
       val dfMainOutputFinal = dfMainOutput.select($"DataPartition", $"StatementTypeCode",concat_ws("|^|", dfMainOutput.schema.filter(_ != "DataPartition").fieldNames.map(c => col(c)): _*).as("concatenated")) 

の下に得ることが問題でてきたものである提案答え

を1として最終出力の2列

val dfMainOutputFinal = dfMainOutput.select($"DataPartition","PartitionYear",concat_ws("|^|", dfMainOutput.schema.fieldNames.filter(_ != "DataPartition","PartitionYear").map(c => col(c)): _*).as("concatenated")) 

答えて

1

質問1:あなたはdf.write.partitionBy()で使用

列は、最終的なcsvファイルに追加されません。データはファイル構造でエンコードされるため、これらは自動的に無視されます。しかし、何を意味するconcat_ws(およびそれによってファイルから)からそれを削除する場合、小さな変更を行うことが可能です:

ここ
concat_ws("|^|", 
    dfMainOutput.schema.fieldNames 
    .filter(_ != "DataPartition") 
    .map(c => col(c)): _*).as("concatenated")) 

列DataPartitionは、連結前に離れて濾過されます。

質問2:

スパークUTF-8 BOMをサポートしていないようですし、形式でファイルを読み込むときproblemsを引き起こしているようです。私はSparkが終了した後にそれらを追加するスクリプトを書く以外のBOMバイトを各csvファイルに追加する簡単な方法は考えていません。私の推薦は、単に通常のUTF-8フォーマットを使用することです。

dfMainOutputFinal.write.partitionBy("DataPartition","StatementTypeCode") 
    .format("csv") 
    .option("header", "true") 
    .option("encoding", "UTF-8") 
    .option("codec", "gzip") 
    .save("path to csv") 

また、Unicode standardによると、BOMは推奨されません。

は... BOMの使用は必要もなく、UTF-8のためにお勧めしますが、UTF-8のデータは、BOMを使用するか、BOMを使用する場合、他のエンコード形式から変換された状況で遭遇することができるもされていませんUTF-8署名として

0

QUESTION 1:DATAFRAME

から列を無視することができますどのように

回答:

val df = sc.parallelize(List(Person(1,2,3), Person(4,5,6))).toDF("age", "height", "weight") 

df.columns 
df.show() 



+---+------+------+ 
|age|height|weight| 
+---+------+------+ 
| 1|  2|  3| 
| 4|  5|  6| 
+---+------+------+ 


val df_new=df.select("age", "height") 
    df_new.columns 
    df_new.show() 

+---+------+ 
|age|height| 
+---+------+ 
| 1|  2| 
| 4|  5| 
+---+------+ 

df: org.apache.spark.sql.DataFrame = [age: int, height: int ... 1 more field] 
df_new: org.apache.spark.sql.DataFrame = [age: int, height: int] 

QUESTION 2:には "\ ufeff" を追加するにはどのような方法があります私の実際のデータを書き込む前に、csv出力ファイル を使用して、エンコーディングフォーマットが UTF-8-BOMになるようにしてください。

回答:

String path= "/data/vaquarkhan/input/unicode.csv"; 

String outputPath = "file:/data/vaquarkhan/output/output.csv"; 
    getSparkSession() 
     .read() 
     .option("inferSchema", "true") 
     .option("header", "true") 
     .option("encoding", "UTF-8") 
     .csv(path) 
     .write() 
     .mode(SaveMode.Overwrite) 
     .csv(outputPath); 
} 
+0

UTF-BOMが動作していません –

+0

私はちょうど列を選択したくありません、私は第2データフレームが派生した最初のデータフレームに基づいてパーティションを作成したい –

関連する問題