hdfsディレクトリからSpark 2.1.0 APIを使用してhdfsディレクトリからspark DataSetに複数のcsvファイルをロードしようとしています。Spark Datasetフォルダ内にヘッダーを含む複数のCSVファイルを読み込み、すべてのファイルのヘッダーが同じでない場合のレポートの不一致
val csvData = spark.read.option("header", "true").csv("csvdatatest/")
"csvdatatest"フォルダ内に複数のcsvファイルがあります。 Sparkは最初のファイルからヘッダーのみを取り出し、これを残りのcsvファイルのヘッダーを無視してDataSetのスキーマとして生成します。ここでは例えば
hadoop fs -ls /user/kumara91/csvdatatest
Found 2 items
/user/kumara91/csvdatatest/controlfile-2017-10-19.csv
/user/kumara91/csvdatatest/controlfile-2017-10-23.csv
hadoop fs -cat /user/kumara91/csvdatatest/controlfile-2017-10-19.csv
Delivery ID,BroadLog ID,Channel,Address,Event type,Event date,File name
hadoop fs -cat /user/kumara91/csvdatatest/controlfile-2017-10-23.csv
Delivery ID,BroadLog ID,Channel,Address,Event type,Event date,File name,dummycolumn
scala> val csvData = spark.read.option("header", "true").csv("csvdatatest/")
csvData: org.apache.spark.sql.DataFrame = [Delivery ID: string, BroadLog ID: string ... 5 more fields]
scala> csvData.schema.fieldNames
res1: Array[String] = Array(Delivery ID, BroadLog ID, Channel, Address, Event type, Event date, File name)
、それだけでファイル「制御ファイル・2017-10-19.csv」からヘッダをロードし、他のcsvファイルに余分な列「dummycolumn」とヘッダを無視しました。
しかし私の要件は、フォルダ内のすべてのcsvファイルのヘッダーを比較することです。 すべてのCSVファイルに同じヘッダーが含まれている場合にのみ、ファイルをロードします。大文字と小文字のcsvファイルに多かれ少なかれ異なるヘッダーが含まれている場合のレポートの不一致
通常のhdfsファイルシステムAPIを使用してこれを行うオプションがあります。その後、Spark APIを使用します。または、Spark APIを使用してすべてのCSVファイルを1つずつ読み込んで比較するその他のオプション。
しかし、Spark APIを使用する方法があるかどうかを知りたかったのですが、各ファイルを繰り返し実行することなく達成できました。また、1つのファイルからヘッダーを読み込み、残りのファイルを無視する理由。
私はすでに質問に言及していますが、上で説明した方法を使用することで問題を解決できることになります。しかし、私の質問は、それがフォルダ内のファイルのリストを行うことなく達成することができる天気だった。そしてここでの主な関心事は、スパークは、1つのファイルからのみ読み込み、残りの部分は考慮しないことによって、スキーマの錯覚を作成することです。 –
何らかの方法でファイルを反復処理せずにデータを正しく読み取る方法はありません。上記で提供したコードは、最も簡単なアプローチです。ファイルを同じヘッダーを持つグループにグループ化すると、一連のパスを 'load()'に渡して一度の操作で読み込むことができます。大きなデータでは、ファイルベースのデータソースはディレクトリベースであり、CSVの前提はディレクトリ内のすべてのファイルが同じスキーマを持つことです。 JSONソースに存在する '.read.option(" mergeSchema "、true)'と同等のものはありません。 – Sim