1
私は2つのストリームをマージすると、そのうちの一つは、(頻繁にはない最新情報と静的データのような)ステートフルする必要がありますしようとしている:Apacheのスパークは、(updateStateByKey後にマージ)
SparkConf conf = new SparkConf().setAppName("Test Application").setMaster("local[*]");
JavaStreamingContext context = new JavaStreamingContext(conf, Durations.seconds(10));
context.checkpoint(".");
JavaDStream<String> dataStream = context.socketTextStream("localhost", 9998);
JavaDStream<String> refDataStream = context.socketTextStream("localhost", 9999);
JavaPairDStream<String, String> pairDataStream = dataStream.mapToPair(e -> {
String[] tmp = e.split(" ");
return new Tuple2<>(tmp[0], tmp[1]);
});
JavaPairDStream<String, String> pairRefDataStream = refDataStream.mapToPair(e -> {
String[] tmp = e.split(" ");
return new Tuple2<>(tmp[0], tmp[1]);
}).updateStateByKey((Function2<List<String>, Optional<String>, Optional<String>>) (strings, stringOptional) -> {
if (!strings.isEmpty()) {
return Optional.of(strings.get(0));
}
return Optional.absent();
});
pairDataStream.join(pairRefDataStream).print();
context.start();
context.awaitTermination();
私は最初のストリームに1 aaa
を書くときすぐにすべてがうまく動作する2番目に、1 111
、私はマージの結果を参照してください。しかし、1分後に最初のストリームに1 bbb
と書くと、何も見えません。
updateStateByKey()
は何を正しく理解していますか?または私は間違っていますか?