0

JavaPairInputDStreamを作成し、消費されたデータをCassandraテーブルに保存しようとしました。しかし、問題に直面し、コードで開始する方法がわからない:これは私がSparkStreamingのために書かれているコードであるSpark Streaming Cassandraテーブルへの保存

package com.test.anna.KafkaSpark; 

import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapColumnTo; 
import static com.datastax.spark.connector.japi.CassandraStreamingJavaUtil.*; 
import java.util.Arrays; 
import java.util.Collections; 
import java.util.HashMap; 
import java.util.List; 
import java.util.Map; 
import java.util.Set; 
import java.util.function.Function; 

import com.datastax.driver.core.Session; 
import org.apache.spark.SparkConf; 
import org.apache.spark.SparkContext; 
import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.api.java.JavaSparkContext; 
import org.apache.spark.api.java.function.FlatMapFunction; 
import org.apache.spark.streaming.Duration; 
import org.apache.spark.streaming.api.java.JavaDStream; 
import org.apache.spark.streaming.api.java.JavaPairInputDStream; 
import org.apache.spark.streaming.api.java.JavaStreamingContext; 
import org.apache.spark.streaming.kafka.KafkaUtils; 

import com.datastax.spark.connector.cql.CassandraConnector; 
import com.datastax.spark.connector.japi.CassandraJavaUtil; 
import com.datastax.spark.connector.writer.RowWriterFactory; 

import kafka.serializer.StringDecoder; 
import scala.Tuple2; 
import java.util.Map; 

public class SparkStreamingConsumer { 

    public static void main(String[] args) { 
     // TODO Auto-generated method stub 
     SparkConf conf = new SparkConf() 
       .setAppName("kafka-sandbox") 
       .setMaster("local[*]"); 
     JavaSparkContext sc = new JavaSparkContext(conf); 
     JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(20000)); 

     // TODO: processing pipeline 
     Map<String, String> kafkaParams = new HashMap(); 
     kafkaParams.put("metadata.broker.list", "localhost:9092"); 
     kafkaParams.put("zookeeper.connect","localhost:2181"); 
     Set<String> topics = Collections.singleton("test6"); 
     System.out.println("Size of topic--->>>>"+topics.size()); 
     JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(ssc, 
       String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics); 

     directKafkaStream.foreachRDD(rdd -> { 
      System.out.println("Message Received "+rdd.values().take(1)); 
      System.out.println("--- New RDD with " + rdd.partitions().size() 
       + " partitions and " + rdd.count() + " records"); 
      rdd.foreach(record -> System.out.println(record._2)); 
      }); 

     directKafkaStream.foreachRDD(rdd ->{    
      rdd.foreachPartition(item ->{ 
       while (item.hasNext()) {  


     ssc.start(); 
     ssc.awaitTermination(); 
    } 

} 
親切

私はカサンドラのテーブルにこのデータを保存する方法を知っているが、小片コードは多くの助けになります:)

ありがとうございました。

+0

ドキュメントはかなりうまくカバーしています。https://github.com/datastax/spark-cassandra-connector/blob/master/doc/8_streaming.md – maasg

答えて

0

使用Datastaxの火花カサンドラコネクタspark-cassandra、あなたはここにspark-cassandra connector jar

ここからjarファイルを取得することができ、サンプルコードが

ある
import com.datastax.driver.core.Session; 
import com.datastax.spark.connector.cql.CassandraConnector; 
import org.apache.spark.SparkConf; 

SparkConf conf = new SparkConf(); 
conf.setAppName(APP_NAME); 
conf.setMaster(NODE); 
conf.set("spark.cassandra.connection.host", CASSANDRA_HOST); 
conf.set("spark.cassandra.auth.username", CASSANDRA_USER); 
conf.set("spark.cassandra.auth.password", CASSANDRA_PASS); 

final JavaSparkContext jpc = new JavaSparkContext(conf); 
final CassandraConnector connector=CassandraConnector.apply(jpc.getConf()); 
final Session session=connector.openSession(); 

あなたは鍵空間

にデータを保存するため、この セッションオブジェクトを使用することができます
+0

これは私が使ったことです。カッサンドラのテーブルに簡単に挿入できます。私はスパークストリーミングから来たデータをカッサンドラに直接取り込むことができるものを探しています。 –

関連する問題