目的:sparkストリーミングでデータを読み込み、キャンドルラーにデータを保存 By:Java Spark cassandra connector 1.6 データ入力:単純なjsonラインオブジェクト{"id": "1"、 "field1": " 。Javaスパークストリーミングとキャッサンドラ
**JavaPairReceiverInputDStream**<String, String> messages =
KafkaUtils.createStream(ssc,
targetKafkaServerPort, targetTopic, topicMap);
**JavaDStream** list = messages.map(new Function<Tuple2<String,String>,List<Object>>(){
public List<Object> call( Tuple2<String,String> tuple2){
List<Object> **list**=new ArrayList<Object>();
Gson gson = new Gson();
MyClass myclass = gson.fromJson(tuple2._2(), MyClass.class);
myclass.setNewData("new_data");
String jsonInString = gson.toJson(myclass);
list.add(jsonInString);
return list;
}
});
次:VALUE1スパークストリーミングによりカフカから読み取るためのJavaクラスi've}
、データが読み出され、その後、カサンドラに保存処理
ここでの主なコードでありますコードが正しくありません:
**javaFunctions**(list)
.writerBuilder("schema", "table", mapToRow(JavaDStream.class))
.saveToCassandra();
「javaFunctions」方法はJavaRDDオブジェクトと「リスト」JavaDStreamで期待しているので...
I'dはJavaRDDにJavaDStreamをキャストする必要がありますが、私は右の方法を見つけるドント。 ..
助けてください?