2016-03-22 9 views
1

Spark StreamingにSpark 1.4を使用しています。 KafkaはSpark Streamのデータソースです。saveAsNewAPIHadoopFileを使用中にファイルが上書きされる

記録は毎秒カフカで公開されています。私たちの要件は、カフカで公開されたレコードを毎分1つのフォルダに保存することです。ストリームは5秒ごとにレコードを読み取ります。たとえば、1200 PMと1201 PMの間に公開されたレコードは、フォルダ「1200」に格納されます。 1201PMから1202PMの間でフォルダ "1201"のようになります。

ストリームは5秒ごとにデータを処理しているので

//First Group records in RDD by date 
stream.foreachRDD (rddWithinStream -> { 
    JavaPairRDD<String, Iterable<String>> rddGroupedByDirectory = rddWithinStream.mapToPair(t -> { 
    return new Tuple2<String, String> (targetHadoopFolder, t._2()); 
}).groupByKey(); 
// All records grouped by folders they will be stored in 


// Create RDD for each target folder. 
for (String hadoopFolder : rddGroupedByDirectory.keys().collect()) { 
    JavaPairRDD <String, Iterable<String>> rddByKey = rddGroupedByDirectory.filter(groupedTuples -> { 
    return groupedTuples._1().equals(hadoopFolder); 
    }); 

// And store it in Hadoop 
    rddByKey.saveAsNewAPIHadoopFile(directory, String.class, String.class, TextOutputFormat.class); 
} 

を次のように私が書いたコードがあり、saveAsNewAPIHadoopFile分で呼び出さ複数回取得します。これにより、「Part-00000」ファイルが毎回上書きされます。

"directory"パラメータで指定されたディレクトリでは、saveAsNewAPIHadoopFileは、私がsinlgeワーカーノードを持っていてもpart-0000Nファイルを作成し続けることを期待していました。

助けを借りてください。

ありがとうございました。

答えて

1

この場合、自分で出力パスとファイル名を作成する必要があります。増分ファイルの命名は、出力操作がDStream(それぞれRDDではない)で直接呼び出された場合にのみ機能します。

stream.foreachRDDの引数関数は、各マイクロバッチについてTimeの情報を得ることができます。私がしようとすると、私のtimeToDirNameのfuncが(DIR +時間)で、実行後、それはHDFS内のディレクトリを示し

stream.foreachRDD((rdd, time) -> { 
    val directory = timeToDirName(prefix, time) 
    rdd.saveAsNewAPIHadoopFile(directory, String.class, String.class, TextOutputFormat.class); 
}) 
+0

が、次のようにあなたが各RDDを保存することができます

def foreachRDD(foreachFunc: (RDD[T], Time) ⇒ Unit) 

Spark documentationを参照すると、それにアクセスすると、「Dir_Nameは存在しません」と表示されます – JSR29

関連する問題