2017-06-16 6 views
2

Spark JavaStreamingContextで動作するプログラムがあります。私は、DStreamを使用するときに認められる出力操作がほんの少しあることを知りました。print()です。 これはJavaSparkStreamingContextでのクエリの実行

private static void analyzeHashtags() throws InterruptedException { 
    JavaPairDStream<String, String> messages = KafkaUtils.createStream(jssc, zookeeper_server, kafka_consumer_group, topics); 
    JavaPairDStream<String, Integer> lines = messages.mapToPair((x)->(new Tuple2<String, Integer>(x._2, 1))).reduceByKey(sumFunc); 
    lines.print(); 
    jssc.start(); 
    jssc.awaitTermination(); 

} 

は、今私は以下のように、このコードにクエリ操作を追加したいコードの一部です:

private static void analyzeHashtags() throws InterruptedException, SQLException { 
    JavaPairDStream<String, String> messages = KafkaUtils.createStream(jssc, zookeeper_server, kafka_consumer_group, topics); 
    JavaPairDStream<String, Integer> lines = messages.mapToPair((x)->(new Tuple2<String, Integer>(x._2, 1))).reduceByKey(sumFunc); 
    lines.print(); 
    String hashtag = "#dummy"; int frequencies = 59; 
    String cql = " CREATE (n:Hashtag {name:'"+hashtag+"', freq:"+frequencies+"})"; 
    st.executeUpdate(cql); 
    jssc.start(); 
    jssc.awaitTermination(); 
} 

しかし、このコードは一度だけクエリを実行します。私はそれがループするたびに実行したいと思います。 どうすればこのことが可能ですか?前もって感謝します。

答えて

2

DStreamで任意の操作を実行するには、foreachRDDを使用します。基礎となるrddで表される各バッチ間隔でのデータへのアクセスを提供します。

のJava/Scalaの擬似(MIX)コード:

JavaPairDStream<String, Integer> lines = messages.mapToPair((x)->(new 
Tuple2<String, Integer>(x._2, 1))).reduceByKey(sumFunc); 
lines.foreachRDD{ rdd => 
    .. do something with the RDD here... 
} 

通常、do somethingはRDDのデータ上で動作します。 foreachPartitionなどのRDD機能を使用して、分散してそのデータを操作することができます。

ただし、ここではローカルのneo4j接続を使用していることを考慮し、各ストリーミング間隔のデータがそれほど大きくない場合は、データをドライバに収集してローカルで操作できます。完全かつ有用な答えを

lines.foreachRDD{ rdd => 
    val localDataCollection = rdd.collect 
    localDataCollection.foreach{ keywordFreqPair => 
     val cql = "CREATE (n:Hashtag {name:'"+keywordFreqPair._1+"', freq:"+keywordFreqPair._2+"})" 
     st.executeUpdate(cql) 
} 
+0

ありがとう:データが分散しそう相(reduceBykey

を減らし、foreachRDD一部になるだろう、すでに経過しているとしてそれは、このケースでフィット感になりそうです。 私は 'foreachRDD'をJavaで実装する方法を知りません(idk Scala)。ラムダ式を使うと、 'lines.foreachRDD(rdd - >(...')の代わりにドットを使う関数を書くべきでしょうか? – sirdan

+1

私はSpark StreamingでScalaを使うことを個人的にお勧めします。 'foreachRDD'ラムダのJava変換については、私はSpark Streamingサンプルのpkgで例を見つけることができると思います。例:https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/ JavaSqlNetworkWordCount.java – maasg

+0

ありがとう、これはたくさん役立ちます – sirdan