2017-05-15 10 views
1

私はsparkで時間ウィンドウで受け取った行数とその合計バイト数を求めています。これらの行は、時間の各ウィンドウの最後にあります。ウィンドウ内の行数とウィンドウのサイズをバイト単位で計算する - Spark Streaming

一方、私のコードは各行でカウントされ、グローバルにはカウントされません。誰かが自分のコードで何が間違っているか教えてもらえますか?

  1. words.window(windowDuration, slideDuration).foreachRDD...

  2. words.countByWindow(windowDuration, slideDuration).print();

問題は、あなたが和の値をリセットしているされています

public class SocketDriver implements Serializable { 

private static final Pattern BACKSLASH = Pattern.compile("\n"); 

public static void main(String[] args) throws Exception { 

    if (args.length < 2) { 
     System.err.println("Usage: SocketDriver <hostname> <port>"); 
     System.exit(1); 
    } 

    final String hostname = args[0]; 
    final int port = Integer.parseInt(args[1]); 

    final String appName = "SocketDriver"; 
    final String master = "local[2]"; 

    final Duration batchDuration = Durations.seconds(1); 
    final Duration windowDuration = Durations.seconds(30); 
    final Duration slideDuration = Durations.seconds(3); 
    final String checkpointDirectory = Files.createTempDirectory(appName).toString(); 


    SparkConf sparkConf = new SparkConf() 
            .setAppName(appName) 
            .setMaster(master); 

    JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, batchDuration); 
    streamingContext.checkpoint(checkpointDirectory); 

    JavaReceiverInputDStream<String> lines = streamingContext.socketTextStream(hostname, port, StorageLevels.MEMORY_AND_DISK_SER); 

    JavaDStream<String> words = lines.flatMap(word -> Arrays.asList(BACKSLASH.split(word)).iterator()); 

    words.window(windowDuration, slideDuration).foreachRDD((VoidFunction<JavaRDD<String>>) 
      rdd -> rdd.foreach((VoidFunction<String>) 
        line -> { 
         double bytes = 0; 
         int sum = 0; 
         double frequency = 0.0; 
         sum += 1; 
         bytes += line.getBytes().length; 
         frequency += bytes/sum; 

         System.out.println("windowDuration: " + windowDuration.milliseconds()/1000 + " seconds " + " : " + "slideDuration: " + slideDuration.milliseconds()/1000 + " seconds " + " : " + 
           "total messages : " + sum + " total bytes : " + bytes + " frequency : " + frequency); 
        }) 
    ); 

    words.countByWindow(windowDuration, slideDuration).print(); 



    streamingContext.start(); 
    streamingContext.awaitTerminationOrTimeout(60000); 
    streamingContext.stop(); 
} 


} 

答えて

0

問題は、次の最初の文にあり各行のバイト数。これは、質問に記載されているように、1行のバイト数を示しています。

は次で上記二つの文を置き換えることにより、所望の機能を実現することができる:

//counts will have elements of the form (1, numberOfBytesInALine)  
JavaPairDStream<Integer, Integer> counts = words.mapToPair(new PairFunction<String, Integer, Integer>() { 
    @Override 
    public Tuple2<Integer, Integer> call(final String line) { 
     return new Tuple2<Integer, Integer>(1, line.getBytes().length)); 
    } 
}); 

//countOfWindow will have single element of the form (totalNumberOfLines, totalNumberOfBytes) 
JavaDStream<Tuple2<Integer, Integer>> countOfWindow = counts.reduceByWindow(new Function2<Tuple2<Integer, Integer>,Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() { 
    @Override 
    public Tuple2<Integer, Integer> call(final Tuple2<Integer, Integer> a , final Tuple2<Integer, Integer> b) { 
     return new Tuple2<Integer, Integer>(a._1 + b._1, a._2 + b._2)); 
    } 
} 
,windowDuration,slideDuration); 
countOfWindow.print(); 

トリックは、その行のバイト1及び数を整数に各行を変換することでした。その後、それを減らすと、1は行数に合計され、1行あたりのバイト数は合計バイト数になります。

+0

解決策をテストしてお知らせします。 – Zizou

+0

非常にうまく動作します。ストリーミングについてのSparkのドキュメンテーションは、あなたが持っているとうれしいでしょう。 – Zizou

+0

本を読んで、それは非常によく書かれている:https://www.amazon.com/Pro-Spark-Streaming-Real-Time-Analytics/dp/1484214803 –

関連する問題