2017-09-29 16 views
0

次のコードは以下のとおりです。 Scalaバージョン:2.11。 スパークバージョン:2.0.2.6 カサンドラバージョン:cqlsh 5.0.1 |カサンドラ3.11.0.1855 | DSE 5.1.3 | CQL仕様3.4.4 |ネイティブプロトコルv4SparkスカラCassandra CSVをcassandraに挿入

私はCSVから読み込み、Cassandra Tableに書き込もうとしています。私はScalaとSparkを初めて使用しています。

Exception in thread "main" java.lang.IllegalArgumentException: Multiple constructors with the same number of parameters not allowed. 

カサンドラ表

cqlsh:TDATA> DESC

表tdata.mapをCREATE( SNOのint型のPRIMARYをマップ私が間違っているのどこ

import org.apache.spark.sql.SparkSession 
import org.apache.log4j.{Level, Logger} 
import com.datastax 

import org.apache.spark.SparkContext 
import org.apache.spark.SparkConf 
import com.datastax.spark.connector._ 
import org.apache.spark.sql.SQLContext 
import org.apache.spark.sql.{Row, SparkSession} 
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType} 
import org.apache.spark.sql._ 
import com.datastax.spark.connector.UDTValue 
import com.datastax.spark.connector.mapper.DefaultColumnMapper 


object dataframeset { 

    def main(args: Array[String]): Unit = { 

    // Cassandra Part 

    val conf = new SparkConf().setAppName("Sample1").setMaster("local[*]") 
    val sc = new SparkContext(conf) 
    sc.setLogLevel("ERROR") 
    val rdd1 = sc.cassandraTable("tdata", "map") 

    rdd1.collect().foreach(println) 

    // Scala Read CSV Part 
    Logger.getLogger("org").setLevel(Level.ERROR) 
    Logger.getLogger("akka").setLevel(Level.ERROR) 
    val spark1 = org.apache.spark.sql.SparkSession 
     .builder() 
     .master("local") 
     .appName("Spark SQL basic example") 
     .getOrCreate() 

    val df = spark1.read.format("csv") 
     .option("header","true") 
     .option("inferschema", "true") 
     .load("/Users/tom/Desktop/del2.csv") 
    import spark1.implicits._ 
     df.printSchema() 
     val dfprev = df.select(col = "Year","Measure").filter("Category = 'Prevention'") 

//  dfprev.collect().foreach(println) 
     val a = dfprev.select("YEAR") 
     val b = dfprev.select("Measure") 

     val collection = sc.parallelize(Seq(a,b)) 
    collection.saveToCassandra("tdata", "map", SomeColumns("sno", "name")) 

    spark1.stop() 

    } 

} 

エラー私を修正してくださいKEY、 名前テキスト;

私は何かが欠けていることを知っています。特に、データフレーム全体をワンショットでCassandraに書き込もうとしています。どちらをやるべきかわからない。

おかげカサンドラへ トム

+0

なぜドンのようなsomethinを使用してカサンドラに接続するために、スパークconfの中で有効になっている場合は、カサンドラのホスト、ユーザ名とパスワードを定義する必要があります'あなたはcqlshのコピーコマンドを使用していますか? https://docs.datastax.com/en/cql/3.1/cql/cql_reference/copy_r.html –

+0

CSV全体が必要ないため、フィルタと変換を適用する前に適用する必要があります。 –

答えて

1

あなたが直接(スパーク2.xでデータセット[行])のデータフレームを書き込むことができます。

認証が

val conf = new SparkConf(true) 
    .set("spark.cassandra.connection.host", "CASSANDRA_HOST") 
    .set("spark.cassandra.auth.username", "CASSANDRA_USERNAME")    
    .set("spark.cassandra.auth.password", "CASSANDRA_PASSWORD") 

OR

val spark1 = org.apache.spark.sql.SparkSession 
     .builder() 
     .master("local") 
     .config("spark.cassandra.connection.host", "CASSANDRA_HOST") 
     .config("spark.cassandra.auth.username", "CASSANDRA_USERNAME")    
     .config("spark.cassandra.auth.password", "CASSANDRA_PASSWORD") 
     .appName("Spark SQL basic example") 
     .getOrCreate() 

val dfprev = df.filter("Category = 'Prevention'").select(col("Year").as("yearAdded"),col("Measure").as("Recording")) 

dfprev .write 
    .format("org.apache.spark.sql.cassandra") 
    .options(Map("table" -> "map", "keyspace" -> "tdata")) 
    .save() 

Dataframe in spark-cassandra-connector

+0

こんにちは、ありがとう。簡単な質問ですCSVの「Year」は「YearAdded」列に移動し、「Measure」は「Recording」列に移動する必要があるため、列名を使用してデータをマップするにはどうすればよいですか。 –

+0

sparkの '.alias'または' .as' APIを使用してカラム名を変更することができます。私は自分の答えを更新しました –

関連する問題