1
私はsparkで時間ウィンドウで受け取った行数とその合計バイト数を求めています。これらの行は、時間の各ウィンドウの最後にあります。ウィンドウ内の行数とウィンドウのサイズをバイト単位で計算する - Spark Streaming
一方、私のコードは各行でカウントされ、グローバルにはカウントされません。誰かが自分のコードで何が間違っているか教えてもらえますか?
words.window(windowDuration, slideDuration).foreachRDD...
words.countByWindow(windowDuration, slideDuration).print();
問題は、あなたが和の値をリセットしているされています
public class SocketDriver implements Serializable {
private static final Pattern BACKSLASH = Pattern.compile("\n");
public static void main(String[] args) throws Exception {
if (args.length < 2) {
System.err.println("Usage: SocketDriver <hostname> <port>");
System.exit(1);
}
final String hostname = args[0];
final int port = Integer.parseInt(args[1]);
final String appName = "SocketDriver";
final String master = "local[2]";
final Duration batchDuration = Durations.seconds(1);
final Duration windowDuration = Durations.seconds(30);
final Duration slideDuration = Durations.seconds(3);
final String checkpointDirectory = Files.createTempDirectory(appName).toString();
SparkConf sparkConf = new SparkConf()
.setAppName(appName)
.setMaster(master);
JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, batchDuration);
streamingContext.checkpoint(checkpointDirectory);
JavaReceiverInputDStream<String> lines = streamingContext.socketTextStream(hostname, port, StorageLevels.MEMORY_AND_DISK_SER);
JavaDStream<String> words = lines.flatMap(word -> Arrays.asList(BACKSLASH.split(word)).iterator());
words.window(windowDuration, slideDuration).foreachRDD((VoidFunction<JavaRDD<String>>)
rdd -> rdd.foreach((VoidFunction<String>)
line -> {
double bytes = 0;
int sum = 0;
double frequency = 0.0;
sum += 1;
bytes += line.getBytes().length;
frequency += bytes/sum;
System.out.println("windowDuration: " + windowDuration.milliseconds()/1000 + " seconds " + " : " + "slideDuration: " + slideDuration.milliseconds()/1000 + " seconds " + " : " +
"total messages : " + sum + " total bytes : " + bytes + " frequency : " + frequency);
})
);
words.countByWindow(windowDuration, slideDuration).print();
streamingContext.start();
streamingContext.awaitTerminationOrTimeout(60000);
streamingContext.stop();
}
}
解決策をテストしてお知らせします。 – Zizou
非常にうまく動作します。ストリーミングについてのSparkのドキュメンテーションは、あなたが持っているとうれしいでしょう。 – Zizou
本を読んで、それは非常によく書かれている:https://www.amazon.com/Pro-Spark-Streaming-Real-Time-Analytics/dp/1484214803 –