2016-12-20 8 views
5

私はScalaバージョン2.10.5のCassandra 3.0とSpark 1.6を使用しています。私は基本的な例にCassandra.Soに動作し、可能な挿入データCassandraテーブルにデータを挿入するSpark DataFrameを使用する

scala> val collection = sc.parallelize(Seq(("cat", 30), ("fox", 40))) 
scala> collection.saveToCassandra("test", "words", SomeColumns("word", "count")) 

を試してみたので、私はカサンドラにデータを挿入したい私は、スキーマ

val person = sc.textFile("hdfs://localhost:9000/user/hduser/person") 
import org.apache.spark.sql._ 
val schema = StructType(Array(StructField("firstName",StringType,true),StructField("lastName",StringType,true),StructField("age",IntegerType,true))) 
val rowRDD = person.map(_.split(",")).map(p => org.apache.spark.sql.Row(p(0),p(1),p(2).toInt)) 
val personSchemaRDD = sqlContext.applySchema(rowRDD, schema) 
personSchemaRDD.saveToCassandra 
を照合することによってカサンドラテーブルに挿入するワンcsvファイルを持っていました

私はSaveToCassndraを使用していますが、saveToCassandraを取得するのはpersonSchemaRDDの一部ではありません。 port.canいずれかの私にそれを行うための最善の方法を教えて:だから、別の方法で

df.write.format("org.apache.spark.sql.cassandra").options(Map("table" -> "words_copy", "keyspace" -> "test")).save() 

を試みるが、IP上のカサンドラに接続することはできませんを取得する教えてくれました。私はファイルから定期的にデータをcassandraに保存する必要があります。

答えて

4

sqlContext.applySchema(...)DataFrameを返し、DataFramesaveToCassandraメソッドを返しません。

あなたは可能性があり、それとの.write方法:

val personDF = sqlContext.applySchema(rowRDD, schema) 
personDF.write.format("org.apache.spark.sql.cassandra").options(Map("table" -> "words_copy", "keyspace" -> "test")).save() 

我々はsavetoCassandraメソッドを使用する場合は、最良の方法はケースクラスを使用して、スキーマを意識したRDDを持つことです。

case class Person(firstname:String, lastName:String, age:Int) 
val rowRDD = person.map(_.split(",")).map(p => Person(p(0),p(1),p(2).toInt) 
rowRDD.saveToCassandra(keyspace, table) 

データフレームwriteの方法が有効です。コンテキストが正しく構成されていることを確認してください。

+0

Row()の要素のいずれかをval rowRDD = input.map(_。split( "、"))に変換するにはどうすればよいですか?(p => Row(p(0)、getTimestamp私はYHY:MM:DD '' HH:mm:ss形式 – Anji

+0

@Anjiにタイムスタンプを 'jodatime.DateTime'の' java.util.Date'にマッピングする方が良いでしょう。フォーマットの問題を回避する。 – maasg

+0

com.databricks.spark.csvを使用しているときに "NA"を取るオプションがあります。原因:java.text.ParseException:解析できない番号: "NA" ' – Anji

関連する問題