Spark JavaStreamingContext
で動作するプログラムがあります。私は、DStreamを使用するときに認められる出力操作がほんの少しあることを知りました。print()
です。 これはJavaSparkStreamingContextでのクエリの実行
private static void analyzeHashtags() throws InterruptedException {
JavaPairDStream<String, String> messages = KafkaUtils.createStream(jssc, zookeeper_server, kafka_consumer_group, topics);
JavaPairDStream<String, Integer> lines = messages.mapToPair((x)->(new Tuple2<String, Integer>(x._2, 1))).reduceByKey(sumFunc);
lines.print();
jssc.start();
jssc.awaitTermination();
}
は、今私は以下のように、このコードにクエリ操作を追加したいコードの一部です:
private static void analyzeHashtags() throws InterruptedException, SQLException {
JavaPairDStream<String, String> messages = KafkaUtils.createStream(jssc, zookeeper_server, kafka_consumer_group, topics);
JavaPairDStream<String, Integer> lines = messages.mapToPair((x)->(new Tuple2<String, Integer>(x._2, 1))).reduceByKey(sumFunc);
lines.print();
String hashtag = "#dummy"; int frequencies = 59;
String cql = " CREATE (n:Hashtag {name:'"+hashtag+"', freq:"+frequencies+"})";
st.executeUpdate(cql);
jssc.start();
jssc.awaitTermination();
}
しかし、このコードは一度だけクエリを実行します。私はそれがループするたびに実行したいと思います。 どうすればこのことが可能ですか?前もって感謝します。
ありがとう:データが分散しそう相(
reduceBykey
)を減らし、
foreachRDD
一部になるだろう、すでに経過しているとしてそれは、このケースでフィット感になりそうです。 私は 'foreachRDD'をJavaで実装する方法を知りません(idk Scala)。ラムダ式を使うと、 'lines.foreachRDD(rdd - >(...')の代わりにドットを使う関数を書くべきでしょうか? – sirdan私はSpark StreamingでScalaを使うことを個人的にお勧めします。 'foreachRDD'ラムダのJava変換については、私はSpark Streamingサンプルのpkgで例を見つけることができると思います。例:https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/ JavaSqlNetworkWordCount.java – maasg
ありがとう、これはたくさん役立ちます – sirdan