2016-05-29 6 views
1

私は2つのストリームをマージすると、そのうちの一つは、(頻繁にはない最新情報と静的データのような)ステートフルする必要がありますしようとしている:Apacheのスパークは、(updateStateByKey後にマージ)

SparkConf conf = new SparkConf().setAppName("Test Application").setMaster("local[*]"); 
JavaStreamingContext context = new JavaStreamingContext(conf, Durations.seconds(10)); 
context.checkpoint("."); 
JavaDStream<String> dataStream = context.socketTextStream("localhost", 9998); 
JavaDStream<String> refDataStream = context.socketTextStream("localhost", 9999); 

JavaPairDStream<String, String> pairDataStream = dataStream.mapToPair(e -> { 
    String[] tmp = e.split(" "); 
    return new Tuple2<>(tmp[0], tmp[1]); 
}); 

JavaPairDStream<String, String> pairRefDataStream = refDataStream.mapToPair(e -> { 
    String[] tmp = e.split(" "); 
    return new Tuple2<>(tmp[0], tmp[1]); 
}).updateStateByKey((Function2<List<String>, Optional<String>, Optional<String>>) (strings, stringOptional) -> { 
    if (!strings.isEmpty()) { 
     return Optional.of(strings.get(0)); 
    } 
    return Optional.absent(); 
}); 

pairDataStream.join(pairRefDataStream).print(); 


context.start(); 
context.awaitTermination(); 

私は最初のストリームに1 aaaを書くときすぐにすべてがうまく動作する2番目に、1 111、私はマージの結果を参照してください。しかし、1分後に最初のストリームに1 bbbと書くと、何も見えません。

updateStateByKey()は何を正しく理解していますか?または私は間違っていますか?

答えて

3

updateStateByKeyあなたが求めているとおりのものです。特に、現在のウィンドウでは、(Optional.absent();を返す)忘れて、それを指示する(strings.isEmpty())データが含まれていない場合:

if (!strings.isEmpty()) { 
    return Optional.of(strings.get(0)); 
} 
return Optional.absent(); 

何あなたはおそらくしたいことは、以前の状態に戻すことである一方で:

if (!strings.isEmpty()) { 
    return Optional.of(strings.get(0)); 
} 
return stringOptional; 
関連する問題