Spark StreamingにSpark 1.4を使用しています。 KafkaはSpark Streamのデータソースです。saveAsNewAPIHadoopFileを使用中にファイルが上書きされる
記録は毎秒カフカで公開されています。私たちの要件は、カフカで公開されたレコードを毎分1つのフォルダに保存することです。ストリームは5秒ごとにレコードを読み取ります。たとえば、1200 PMと1201 PMの間に公開されたレコードは、フォルダ「1200」に格納されます。 1201PMから1202PMの間でフォルダ "1201"のようになります。
ストリームは5秒ごとにデータを処理しているので
//First Group records in RDD by date
stream.foreachRDD (rddWithinStream -> {
JavaPairRDD<String, Iterable<String>> rddGroupedByDirectory = rddWithinStream.mapToPair(t -> {
return new Tuple2<String, String> (targetHadoopFolder, t._2());
}).groupByKey();
// All records grouped by folders they will be stored in
// Create RDD for each target folder.
for (String hadoopFolder : rddGroupedByDirectory.keys().collect()) {
JavaPairRDD <String, Iterable<String>> rddByKey = rddGroupedByDirectory.filter(groupedTuples -> {
return groupedTuples._1().equals(hadoopFolder);
});
// And store it in Hadoop
rddByKey.saveAsNewAPIHadoopFile(directory, String.class, String.class, TextOutputFormat.class);
}
を次のように私が書いたコードがあり、saveAsNewAPIHadoopFile分で呼び出さ複数回取得します。これにより、「Part-00000」ファイルが毎回上書きされます。
"directory"パラメータで指定されたディレクトリでは、saveAsNewAPIHadoopFileは、私がsinlgeワーカーノードを持っていてもpart-0000Nファイルを作成し続けることを期待していました。
助けを借りてください。
ありがとうございました。
が、次のようにあなたが各
RDD
を保存することができます:Spark documentationを参照すると、それにアクセスすると、「Dir_Nameは存在しません」と表示されます – JSR29