2016-04-05

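I am running a small program that tries to read streaming logs from a directory using a window. The logs are in the logs folder, in a file named logs_2016-04-05. Sample log lines are shown below. I want the application to simply print results from the logs as they are streamed. Is it actually running? I am trying to print the RDD built from the log file.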

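    import java.util.ArrayList;
    import java.util.List;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    SparkConf conf = new SparkConf().setAppName("Log Analyzer").setMaster("local[*]");
    JavaSparkContext sc = new JavaSparkContext(conf);

    // SLIDE_INTERVAL is also used as the batch interval of the streaming context.
    JavaStreamingContext jssc = new JavaStreamingContext(sc, SLIDE_INTERVAL);

    // Monitor C:/logs/ for log files.
    JavaDStream<String> logData = jssc.textFileStream("C:/logs/");

    // Parse each line; lines that fail to parse are dropped.
    JavaDStream<ApacheAccessLog> accessLogsDStream = logData.flatMap(
        line -> {
            List<ApacheAccessLog> list = new ArrayList<>();
            try {
                list.add(ApacheAccessLog.parseFromLogLine(line));
            } catch (RuntimeException e) {
                // Skip malformed lines.
            }
            return list;
        }).cache();

    JavaDStream<ApacheAccessLog> windowDStream
        = accessLogsDStream.window(WINDOW_LENGTH, SLIDE_INTERVAL);

    windowDStream.foreachRDD(accessLogs -> {
        System.out.println(accessLogs.count());
        if (accessLogs.count() == 0) {
            System.out.println("No access logs in this time intervalsddssdsd");
            return null;
        }
        // Further per-window processing would go here.
        return null;
    });

    jssc.start();
    jssc.awaitTermination();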



Sample logs:

    64.242.88.10 - - [05/Apr/2016:10:27:07-0800] "GET /mailman/listinfo/hsdivision HTTP/1.1" 200 6291 
    64.242.88.10 - - [05/Apr/2016:10:27:07-0800] "GET /mailman/listinfo/hsdivision HTTP/1.1" 200 6291 
    64.242.88.10 - - [05/Apr/2016:10:27:07-0800] "GET /mailman/listinfo/hsdivision HTTP/1.1" 200 6291 
    64.242.88.10 - - [05/Apr/2016:10:27:07-0800] "GET /mailman/listinfo/hsdivision HTTP/1.1" 200 6291 
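The post does not show `ApacheAccessLog.parseFromLogLine`, so here is a minimal sketch of what such a parser might look like for the sample lines above. The class `AccessLogLine`, its field names, and the regular expression are all hypothetical, not the actual class used in the program. The pattern accepts the timestamp exactly as it appears in the sample, with no space before the `-0800` offset.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical stand-in for ApacheAccessLog; names are illustrative only. */
public final class AccessLogLine {
    // Groups: ip, client id, user id, [timestamp], "method endpoint protocol", status, size
    private static final Pattern LINE = Pattern.compile(
        "^(\\S+) (\\S+) (\\S+) \\[([^\\]]+)\\] \"(\\S+) (\\S+) (\\S+)\" (\\d{3}) (\\d+)\\s*$");

    public final String ipAddress;
    public final String timestamp;
    public final String method;
    public final String endpoint;
    public final int responseCode;
    public final long contentSize;

    private AccessLogLine(Matcher m) {
        ipAddress = m.group(1);
        timestamp = m.group(4);
        method = m.group(5);
        endpoint = m.group(6);
        responseCode = Integer.parseInt(m.group(8));
        contentSize = Long.parseLong(m.group(9));
    }

    /** Returns the parsed line, or an empty Optional if the line does not match. */
    public static Optional<AccessLogLine> parse(String line) {
        Matcher m = LINE.matcher(line.trim());
        return m.matches() ? Optional.of(new AccessLogLine(m)) : Optional.empty();
    }

    public static void main(String[] args) {
        String sample = "64.242.88.10 - - [05/Apr/2016:10:27:07-0800] "
            + "\"GET /mailman/listinfo/hsdivision HTTP/1.1\" 200 6291";
        parse(sample).ifPresent(l ->
            System.out.println(l.ipAddress + " " + l.endpoint + " " + l.responseCode));
        System.out.println(parse("not a log line").isPresent());
    }
}
```

Returning an `Optional` rather than throwing lets the caller drop bad lines without the try/catch used in the `flatMap` above. Whether the real parser throws or returns empty is not known from the post.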

Result:

    16/04/05 12:42:25 INFO scheduler.JobScheduler: Started JobScheduler 
    16/04/05 12:42:30 INFO dstream.FlatMappedDStream: Slicing from 1459840330000 ms to 1459840350000 ms (aligned to 1459840330000 ms and 1459840350000 ms) 
    16/04/05 12:42:30 INFO dstream.FlatMappedDStream: Time 1459840340000 ms is invalid as zeroTime is 1459840340000 ms and slideDuration is 10000 ms and difference is 0 ms 
    16/04/05 12:42:31 INFO dstream.FileInputDStream: Finding new files took 549 ms 
    16/04/05 12:42:31 INFO dstream.FileInputDStream: New files at time 1459840350000 ms: 

    16/04/05 12:42:31 INFO dstream.FlatMappedDStream: Persisting RDD 2 for time 1459840350000 ms to StorageLevel(false, true, false, false, 1) at time 1459840350000 ms 
    16/04/05 12:42:31 INFO scheduler.JobScheduler: Added jobs for time 1459840350000 ms 
    16/04/05 12:42:31 INFO scheduler.JobScheduler: Starting job streaming job 1459840350000 ms.0 from job set of time 1459840350000 ms 
    16/04/05 12:42:31 INFO spark.SparkContext: Starting job: count at Streaming2.java:76 
    16/04/05 12:42:31 INFO spark.SparkContext: Job finished: count at Streaming2.java:76, took 0.121043739 s 
    0 
    16/04/05 12:42:31 INFO spark.SparkContext: Starting job: count at Streaming2.java:77 
    16/04/05 12:42:31 INFO spark.SparkContext: Job finished: count at Streaming2.java:77, took 2.408E-5 s 
    No access logs in this time intervalsddssdsd 
    16/04/05 12:42:31 INFO scheduler.JobScheduler: Finished job streaming job 1459840350000 ms.0 from job set of time 1459840350000 ms 
    16/04/05 12:42:31 INFO scheduler.JobScheduler: Total delay: 1.973 s for time 1459840350000 ms (execution: 0.555 s) 
    16/04/05 12:42:32 INFO dstream.FileInputDStream: Cleared 0 old files that were older than 1459840310000 ms: 
    16/04/05 12:42:40 INFO dstream.FlatMappedDStream: Slicing from 1459840340000 ms to 1459840360000 ms (aligned to 1459840340000 ms and 1459840360000 ms) 
    16/04/05 12:42:40 INFO dstream.FlatMappedDStream: Time 1459840340000 ms is invalid as zeroTime is 1459840340000 ms and slideDuration is 10000 ms and difference is 0 ms 
    16/04/05 12:42:40 INFO dstream.FileInputDStream: Finding new files took 2 ms 
    16/04/05 12:42:40 INFO dstream.FileInputDStream: New files at time 1459840360000 ms: 

    16/04/05 12:42:40 INFO dstream.FlatMappedDStream: Persisting RDD 6 for time 1459840360000 ms to StorageLevel(false, true, false, false, 1) at time 1459840360000 ms 
    16/04/05 12:42:40 INFO scheduler.JobScheduler: Added jobs for time 1459840360000 ms 
    16/04/05 12:42:40 INFO scheduler.JobScheduler: Starting job streaming job 1459840360000 ms.0 from job set of time 1459840360000 ms 
    16/04/05 12:42:40 INFO spark.SparkContext: Starting job: count at Streaming2.java:76 
    16/04/05 12:42:40 INFO spark.SparkContext: Job finished: count at Streaming2.java:76, took 3.158E-5 s 
    0 
    16/04/05 12:42:40 INFO spark.SparkContext: Starting job: count at Streaming2.java:77 
    16/04/05 12:42:40 INFO spark.SparkContext: Job finished: count at Streaming2.java:77, took 4.5003E-5 s 
    No access logs in this time intervalsddssdsd 
    16/04/05 12:42:40 INFO scheduler.JobScheduler: Finished job streaming job 1459840360000 ms.0 from job set of time 1459840360000 ms 
    16/04/05 12:42:40 INFO scheduler.JobScheduler: Total delay: 0.034 s for time 1459840360000 ms (execution: 0.013 s) 
    16/04/05 12:42:40 INFO rdd.UnionRDD: Removing RDD 3 from persistence list 
    16/04/05 12:42:40 INFO storage.BlockManager: Removing RDD 3 
    16/04/05 12:42:40 INFO dstream.FileInputDStream: Cleared 0 old files that were older than 1459840320000 ms: 
    16/04/05 12:42:50 INFO dstream.FlatMappedDStream: Slicing from 1459840350000 ms to 1459840370000 ms (aligned to 1459840350000 ms and 1459840370000 ms) 
    16/04/05 12:42:50 INFO dstream.FileInputDStream: Finding new files took 0 ms 
    16/04/05 12:42:50 INFO dstream.FileInputDStream: New files at time 1459840370000 ms: 

    16/04/05 12:42:50 INFO dstream.FlatMappedDStream: Persisting RDD 10 for time 1459840370000 ms to StorageLevel(false, true, false, false, 1) at time 1459840370000 ms 
    16/04/05 12:42:50 INFO scheduler.JobScheduler: Added jobs for time 1459840370000 ms 
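The values of `WINDOW_LENGTH` and `SLIDE_INTERVAL` are not given in the post, but the "Slicing from ... to ..." lines allow a rough cross-check. The sketch below assumes that the slice for a window ending at batch time `t` starts at `t - window + slide`. That formula is an assumption about how the window is sliced, not something stated in the post. The class and method names are hypothetical. The slide value of 10000 ms is taken from the log.

```java
public final class WindowSlice {
    /** Start of the slice for a window ending at batchTimeMs (assumed formula). */
    static long sliceStart(long batchTimeMs, long windowMs, long slideMs) {
        return batchTimeMs - windowMs + slideMs;
    }

    /** Window length implied by one logged slice, under the same assumption. */
    static long impliedWindow(long sliceFromMs, long sliceToMs, long slideMs) {
        return sliceToMs - sliceFromMs + slideMs;
    }

    public static void main(String[] args) {
        long slideMs = 10_000L; // "slideDuration is 10000 ms" in the log

        // First logged slice: 1459840330000 ms to 1459840350000 ms.
        long windowMs = impliedWindow(1459840330000L, 1459840350000L, slideMs);
        System.out.println(windowMs);

        // Predicted slice starts for the next two batches, to compare with the log.
        System.out.println(sliceStart(1459840360000L, windowMs, slideMs));
        System.out.println(sliceStart(1459840370000L, windowMs, slideMs));
    }
}
```

Under this assumption the implied window is 30000 ms, and the predicted slice starts (1459840340000 and 1459840350000) match the second and third "Slicing from" lines in the log.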

Answers