2017-02-09 11 views
0

私はCassandraでJavaスパークストリーミングをしようとしています。私はScalaでも同じことをしましたが、私はJavaでどのように進めるべきかわかりません。ウェブは、私にJavaのスパークストリーミングとCassandraの例を教えてくれませんでした。Java Spark Streaming with Cassandra

は、いくつかのいずれかは、Javaで以下のScalaのコードを持っているかを見せてもらえ:

import org.apache.spark.streaming.dstream.ConstantInputDStream 

val ssc = new StreamingContext(conf, Seconds(10)) 

val cassandraRDD = ssc.cassandraTable("mykeyspace", "users").select("fname", "lname").where("lname = ?", "yu") 

val dstream = new ConstantInputDStream(ssc, cassandraRDD) 

dstream.foreachRDD{ rdd => 
    // any action will trigger the underlying cassandra query, using collect to have a simple output 
    println(rdd.collect.mkString("\n")) 
} 
ssc.start() 
ssc.awaitTermination() 

すべてのヘルプは高く評価されます。ありがとう

答えて

1

あなたのforeachRDD変換では、あなたのデータをcassandraテーブル形式で変換できます。

JavaRDD<TestBean> cassandraRDD = testRDD 
       .flatMap(new FlatMapFunction<Tuple2<String, List<Map<String, Object>>>, TestBean>() { 

        private static final long serialVersionUID = 1L; 

        @Override 
        public Iterable<TestBean> call(Tuple2<String, List<Map<String, Object>>> tuple) throws Exception { 

         return rawData; 
        } 
       }); 

      javaFunctions(jsonRDD).writerBuilder(CASSANDRA_KEYSPACE,CASSANDRA_TABLE, mapToRow(TestBean.class)).saveToCassandra(); 
+0

これらは私のimport文 インポート静的com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctionsです。 import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow; –