私はKafkaクラスタからデータを取得するためにSpark Kafkaコネクタを使用しています。それから私はJavaDStream<String>
としてデータを取得しています。データをJavaDStream<EventLog>
として取得するには、EventLog
はJava Beanですか?Spark Kafka ConnectorでオブジェクトのJavaDStreamを取得するには?
public static JavaDStream<EventLog> fetchAndValidateData(String zkQuorum, String group, Map<String, Integer> topicMap) {
SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
JavaPairReceiverInputDStream<String, String> messages =
KafkaUtils.createStream(jssc, zkQuorum, group, topicMap);
JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
@Override
public String call(Tuple2<String, String> tuple2) {
return tuple2._2();
}
});
jssc.start();
jssc.awaitTermination();
return lines;
}
私の目標は、EventLog
と同じ仕様のテーブルカサンドラにこのデータを保存することです。 Spark Cassandraコネクタは、javaFunctions(rdd).writerBuilder("ks", "event", mapToRow(EventLog.class)).saveToCassandra();
のようにinsert文にJavaRDD<EventLog>
を受け入れます。私はKafkaからJavaRDD<EventLog>
を手に入れたいです。
メッセージを文字列のペアとして使用して変換したいと考えていますか?または、JavaReceiverInputDStreamを使用しますか? EventLog型を正確にどこに持ち込みたいのですか?あなたはEventLog型を受け取り、そこからJavaDStreamを構築する受信機を定義しようとしましたか? –
Sunny
@サニー私の目的は、カサンドラにデータを書き込むことです。スパークカサンドラコネクタ 'このようなINSERT文でJavaRDD'受け付けます。 'javaFunctions(RDD).writerBuilder( "KS"、 "イベント"、mapToRow(EventLog.class))saveToCassandraを();'。私はこれらのJavaRDD をKafkaから入手したいと思います。 –
khateeb
これらのEventLogをkafkaに書き込むコードにもアクセスできますか?カスタムシリアライザが実装されていて、EventLogがシリアル化され、KafkaにEventLogとして書き込まれていますか? – Sunny