1

SparkStreamingContextがドライバーに作成され、それをシリアル化することはできませんから、カサンドラにSparkStreamingからデータを保存することです。したがって、私たちは労働者のこのオブジェクトにアクセスすることはできません。スパークの分散された性質を利用するために、私たちは労働者のデータ/ストリームを処理していました。したがって、OOMを提供する可能性のあるすべてのデータをドライバに収集するのではなく、作業者から直接データを保存することが理にかなっています。は、それが実現可能なスパーク労働者

私は私はいくつかのビジネス・ロジックを適用する必要があり、最終的にはカサンドラにデータを格納する必要があり、このようなシナリオを持っています。だから、どうやってやるの?

DStream.foreachRDD{ x => x.foreachPartition { some logic and finally Store the data to cassandra } } 
+0

saveToCassandraは、仕事をする必要があります。しかし、データをローカルに保存するか、クラスタ内の別のマシンに保存するかは、データがどのノードに属しているかによって決まります。私も同様の問題に取り組んでいます。 – Sreekar

+0

@Sreekar私はScalaの2.10とSpark1.6に取り組んでいます。だから、どのようにそこに使用することはできませんforeachRDDの理由であることスパークコンテキスト内のカサンドラへの接続を作成することができるだろう。 – Naresh

+0

ストリーミングコンテキストオブジェクトでsparkContextを取得します。それは利用可能です。例えば:RDDのssc.sparkContext.cassandraTable()またはsaveToCassandraが動作するはずです。必要に応じて、sparkContextでホスト名とポートなどを設定できます。 – Sreekar

答えて

2

あなたは、カサンドラDBに直接JavaDStreamを保存することができますdatastaxの火花カサンドラ・コネクタ(https://github.com/datastax/spark-cassandra-connector)を使用することができます。

import static com.datastax.spark.connector.japi.CassandraStreamingJavaUtil.javaFunctions; 

Map<String, String> columnNameMappings; 
JavaDStream<MyTableData> myDStream; 
javaFunctions(myDStream).writerBuilder("mykeyspace", "my_table", 
      CassandraJavaUtil.mapToRow(MyTableData.class, columnNameMappings)).saveToCassandra(); 
+0

スカラを教えてください。私も同じように取り組んでいます – Naresh

+0

このサンプルプログラムをscalaで確認してください。https://docs.datastax.com/en/latest-dse/datastax_enterprise/spark/sparkStreamingIntro.html – abaghel

1
val sparkConf = new SparkConf().setAppName("Test App") 
sparkConf.set("spark.cassandra.connection.host", "X.X.X.X") 
sparkConf.set("spark.cassandra.auth.username", "xxxxx")    
sparkConf.set("spark.cassandra.auth.password", "xxxxx") 

SparkConfあなたのカサンドラの接続構成を保持しているコンテキスト内のオブジェクトであるので、そこを設定します。

SparkContextStreamingContextの一部ですが、あなたは本当にそれを心配する必要はありません。あなたがSparkContextまたは現在SparkConfにアクセスする必要があるなら、あなたはこの

StreamingContext ssc = new StreamingContext(sparkConf, Seconds(1)) 
ssc.sparkContext => Your Spark Context object 
ssc.sparkContext.getConf => Your SparkConf object 

はカサンドラ部分に保存する方法に来るように、それはこのように行うことができることを行うことができます。

// messages is the InputDStream in this example 
messages.foreachRDD(x => { 
    // Write business logic 
    x.saveToCassandra("keyspace_name", "table_name") 
} 
+0

私のビジネスロジックはWorkerで動作します。だから、もう一度問題は、私はそれをcassandraに保存するためにドライバにデータを収集する必要があり、私のドライバは過負荷になるだろうということです。そう。この問題を回避するには – Naresh

+0

'SOMERdd.foreachPartition(Partition => { connection = DriverManager.getConnection(url、username、password)}' mysqlの場合と同様に、ワーカーとの接続を作成し、そこを破壊します。 – Naresh

+0

私はSpark 2.0.0への移行は、複数のストリーム(ワーカ)を作成する能力があり、すべてが同じストリーミングコンテキストを共有するので、良いと思います。また、接続の作成/終了はSparkConfオブジェクトに適切な資格情報を与えている限り、自動的にSparkによって処理されます。これはMySQLとは異なります。 – Sreekar

関連する問題