JavaPairInputDStreamを作成し、消費されたデータをCassandraテーブルに保存しようとしました。しかし、問題に直面し、コードで開始する方法がわからない:これは私がSparkStreamingのために書かれているコードであるSpark Streaming Cassandraテーブルへの保存
:
package com.test.anna.KafkaSpark;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapColumnTo;
import static com.datastax.spark.connector.japi.CassandraStreamingJavaUtil.*;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import com.datastax.driver.core.Session;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import com.datastax.spark.connector.cql.CassandraConnector;
import com.datastax.spark.connector.japi.CassandraJavaUtil;
import com.datastax.spark.connector.writer.RowWriterFactory;
import kafka.serializer.StringDecoder;
import scala.Tuple2;
import java.util.Map;
public class SparkStreamingConsumer {
public static void main(String[] args) {
// TODO Auto-generated method stub
SparkConf conf = new SparkConf()
.setAppName("kafka-sandbox")
.setMaster("local[*]");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(20000));
// TODO: processing pipeline
Map<String, String> kafkaParams = new HashMap();
kafkaParams.put("metadata.broker.list", "localhost:9092");
kafkaParams.put("zookeeper.connect","localhost:2181");
Set<String> topics = Collections.singleton("test6");
System.out.println("Size of topic--->>>>"+topics.size());
JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(ssc,
String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics);
directKafkaStream.foreachRDD(rdd -> {
System.out.println("Message Received "+rdd.values().take(1));
System.out.println("--- New RDD with " + rdd.partitions().size()
+ " partitions and " + rdd.count() + " records");
rdd.foreach(record -> System.out.println(record._2));
});
directKafkaStream.foreachRDD(rdd ->{
rdd.foreachPartition(item ->{
while (item.hasNext()) {
ssc.start();
ssc.awaitTermination();
}
}
親切
私はカサンドラのテーブルにこのデータを保存する方法を知っているが、小片コードは多くの助けになります:)
ありがとうございました。
ドキュメントはかなりうまくカバーしています。https://github.com/datastax/spark-cassandra-connector/blob/master/doc/8_streaming.md – maasg