2016-07-20 7 views
0

目的:sparkストリーミングでデータを読み込み、キャンドルラーにデータを保存 By:Java Spark cassandra connector 1.6 データ入力:単純なjsonラインオブジェクト{"id": "1"、 "field1": " 。Javaスパークストリーミングとキャッサンドラ

**JavaPairReceiverInputDStream**<String, String> messages = 
      KafkaUtils.createStream(ssc, 
        targetKafkaServerPort, targetTopic, topicMap); 

    **JavaDStream** list = messages.map(new Function<Tuple2<String,String>,List<Object>>(){ 
     public List<Object> call( Tuple2<String,String> tuple2){ 
      List<Object> **list**=new ArrayList<Object>(); 

      Gson gson = new Gson(); 
      MyClass myclass = gson.fromJson(tuple2._2(), MyClass.class); 
      myclass.setNewData("new_data"); 
      String jsonInString = gson.toJson(myclass); 
      list.add(jsonInString); 
      return list; 
     } 
    }); 

次:VALUE1スパークストリーミングによりカフカから読み取るためのJavaクラスi've}

、データが読み出され、その後、カサンドラに保存処理

ここでの主なコードでありますコードが正しくありません:

**javaFunctions**(list) 
      .writerBuilder("schema", "table", mapToRow(JavaDStream.class)) 
      .saveToCassandra(); 

「javaFunctions」方法はJavaRDDオブジェクトと「リスト」JavaDStreamで期待しているので...

I'dはJavaRDDにJavaDStreamをキャストする必要がありますが、私は右の方法を見つけるドント。 ..

助けてください?

答えて

0

のは 輸入静的com.datastax.spark.connector.japi.CassandraStreamingJavaUtilを使用してみましょう。*の代わりにcom.datastax.spark.connector.japi.CassandraJavaUtilの。*

0

ummmmない本当に...何I' ...

dStream.foreachRDD(new Function<JavaRDD<MyObject>, Void>() { 
     @Override 
     public Void call(JavaRDD<MyObject> rdd) throws Exception { 
      if (rdd != null) { 
       javaFunctions(rdd) 
         .writerBuilder("schema", "table", mapToRow(MyObject.class)) 
         .saveToCassandra(); 
       logging(" --> Saved data to cassandra",1,null); 
      } 

      return null; 
     } 
    }); 

希望が役に立つことにする:後dsStreamを作成foreachRDDを使用しているやりました

関連する問題