1

hdfsディレクトリからSpark 2.1.0 APIを使用してhdfsディレクトリからspark DataSetに複数のcsvファイルをロードしようとしています。Spark Datasetフォルダ内にヘッダーを含む複数のCSVファイルを読み込み、すべてのファイルのヘッダーが同じでない場合のレポートの不一致

val csvData = spark.read.option("header", "true").csv("csvdatatest/") 

"csvdatatest"フォルダ内に複数のcsvファイルがあります。 Sparkは最初のファイルからヘッダーのみを取り出し、これを残りのcsvファイルのヘッダーを無視してDataSetのスキーマとして生成します。ここでは例えば

hadoop fs -ls /user/kumara91/csvdatatest 
Found 2 items 
/user/kumara91/csvdatatest/controlfile-2017-10-19.csv 
/user/kumara91/csvdatatest/controlfile-2017-10-23.csv 

hadoop fs -cat /user/kumara91/csvdatatest/controlfile-2017-10-19.csv 
Delivery ID,BroadLog ID,Channel,Address,Event type,Event date,File name 

hadoop fs -cat /user/kumara91/csvdatatest/controlfile-2017-10-23.csv 
Delivery ID,BroadLog ID,Channel,Address,Event type,Event date,File name,dummycolumn 

scala> val csvData = spark.read.option("header", "true").csv("csvdatatest/") 
csvData: org.apache.spark.sql.DataFrame = [Delivery ID: string, BroadLog ID: string ... 5 more fields] 

scala> csvData.schema.fieldNames 
res1: Array[String] = Array(Delivery ID, BroadLog ID, Channel, Address, Event type, Event date, File name) 

、それだけでファイル「制御ファイル・2017-10-19.csv」からヘッダをロードし、他のcsvファイルに余分な列「dummycolumn」とヘッダを無視しました。

しかし私の要件は、フォルダ内のすべてのcsvファイルのヘッダーを比較することです。 すべてのCSVファイルに同じヘッダーが含まれている場合にのみ、ファイルをロードします。大文字と小文字のcsvファイルに多かれ少なかれ異なるヘッダーが含まれている場合のレポートの不一致

通常のhdfsファイルシステムAPIを使用してこれを行うオプションがあります。その後、Spark APIを使用します。または、Spark APIを使用してすべてのCSVファイルを1つずつ読み込んで比較するその他のオプション。

しかし、Spark APIを使用する方法があるかどうかを知りたかったのですが、各ファイルを繰り返し実行することなく達成できました。また、1つのファイルからヘッダーを読み込み、残りのファイルを無視する理由。

答えて

0

何らかの方法でファイルを反復処理せずにデータを正しく読み取る方法はありません。大きなデータでは、ファイルベースのデータソースはディレクトリベースであり、CSVの前提はディレクトリ内のすべてのファイルが同じスキーマを持つことです。 JSONソースには.read.option("mergeSchema", true)と同等のものはありません。

ヘッダーだけをチェックする場合は、ファイルを1つずつ処理する必要があります。あなたは多くのCSVを持っている場合、配布することで、

val paths: Seq[String] = ... 
val pathsAndHeaders: Seq[(String, String)] = paths.map { path => 
    val header = sc.textFile(path).take(1).collect.head 
    (path, header) 
} 

より効率的なバージョン:あなたが欲しい方法使用して、すべてのファイルのリストを取得した後、一番簡単な方法は、次のようなものを使用してヘッダをつかむですクラスタ全体のパスが、その後、ファイルを自分で読んでする必要があります:今、あなたはパスとヘッダを持っていることを

val paths: Seq[String] = ... 
val pathsAndHeaders: Seq[(String, String)] = sc.parallelize(paths) 
    .map { path => 
    val header = // read first line of file at path 
    (path, header) 
    } 
    .collect 

は、あなたが必要とするものは何でも。たとえば、同じヘッダーを持つグループにファイルをグループ化すると、一連のパスをload()に渡すと、1回の操作でそれらを読み取ることができます。

+0

私はすでに質問に言及していますが、上で説明した方法を使用することで問題を解決できることになります。しかし、私の質問は、それがフォルダ内のファイルのリストを行うことなく達成することができる天気だった。そしてここでの主な関心事は、スパークは、1つのファイルからのみ読み込み、残りの部分は考慮しないことによって、スキーマの錯覚を作成することです。 –

+0

何らかの方法でファイルを反復処理せずにデータを正しく読み取る方法はありません。上記で提供したコードは、最も簡単なアプローチです。ファイルを同じヘッダーを持つグループにグループ化すると、一連のパスを 'load()'に渡して一度の操作で読み込むことができます。大きなデータでは、ファイルベースのデータソースはディレクトリベースであり、CSVの前提はディレクトリ内のすべてのファイルが同じスキーマを持つことです。 JSONソースに存在する '.read.option(" mergeSchema "、true)'と同等のものはありません。 – Sim

関連する問題