2016-03-18 5 views
1

sparkを使用して非常に大きなデータセットを処理しています。データはavroファイルとして保存されます。データはディレクトリ構造(/ input/yyyy/MM/dd/HH /)にも編成されています。たとえば、今日のavroファイルは、/ input/2016/03/18/00〜/ input/2016/03/18/23にあります。空のavroファイルを100個作成するスパーク

ここ2年間のデータを処理すると、多くのavroファイルが処理されます。私は色相を使用して、出力の内部で行くとき今

val inputRDD = sc.load("/input", "com.databricks.spark.avro").rdd 
val outputRDD = inputRDD.map(foo).filter(_.isDefined).flatMap(x => x).join(anotherRDD).map { 
    case (a, (b, (c, d))) => (a, (b, c, d)) 
}.join(yetAnotherRDD).filter { 
    case (a, ((b, c, d), (e, f))) => Math.abs(a - b) <= 2000 
}.map { 
    case (a, ((b, c, d), (e, f))) => Row(a, d) 
} 
val outputDF = sc.createDataframe(outputRDD, outputSchema) 
outputDF.save(s"/output/${datePath(date)}", "com.databricks.spark.avro") 

を次のように

データ処理コードがあります。私は181ページを見て、各ページに私は多くの空のavroファイルを見る。

すべてのファイルが空であるわけではありませんが、空のファイルが非常に多いです。

空のファイルが必要ない場合はどうすればいいですか?

答えて

1

各入力ファイルは少なくとも1つのRDDを生成します(ファイルが大きい場合、複数の入力シーケンスで読み込み、複数のRDDを作成する可能性があります)。 アプリケーションでは、これらのRDDでフィルタを実行するため、かなりのRDDがすべて空になっている可能性があります。その理由は、すべての行が除外されているためです。 DataFrameを保存すると、各RDDは別のPARTファイルに保存されるため、空のRDDは空のRDDファイルを生成します。 これを回避するには、.coalesce(n)を使用します。これにより、RDDの数が削減されます。 は、だからあなたの最後の行に、このような何かを試してみてください:COALESCEで使用する

outputDF.coalesce(200).save(s"/output/${datePath(date)}", "com.databricks.spark.avro") 

数は、データのサイズに大きく依存しています。 RDDが多すぎると、ドライバとの通信オーバーヘッドのために多くのパフォーマンスが失われます。 RDDの数が少なすぎると、使用可能なエグゼキュータをすべて使用していない可能性があります。その結果、最適なパフォーマンスが低下します。

+0

私はチェックしましたが、スパーク1.3.0にはコアーレス機能がありません。私は彼らが後でそれをデータフレームに追加したと思います。 –

+1

この場合、 '.repartition(200)'を使うことができます。 Sparkの後のバージョンでは '.coalesce(200)'がより良く機能することに注意してください。 –

関連する問題