私はScalaバージョン2.10.5のCassandra 3.0とSpark 1.6を使用しています。私は基本的な例にCassandra.Soに動作し、可能な挿入データCassandraテーブルにデータを挿入するSpark DataFrameを使用する
scala> val collection = sc.parallelize(Seq(("cat", 30), ("fox", 40)))
scala> collection.saveToCassandra("test", "words", SomeColumns("word", "count"))
を試してみたので、私はカサンドラにデータを挿入したい私は、スキーマ
val person = sc.textFile("hdfs://localhost:9000/user/hduser/person")
import org.apache.spark.sql._
val schema = StructType(Array(StructField("firstName",StringType,true),StructField("lastName",StringType,true),StructField("age",IntegerType,true)))
val rowRDD = person.map(_.split(",")).map(p => org.apache.spark.sql.Row(p(0),p(1),p(2).toInt))
val personSchemaRDD = sqlContext.applySchema(rowRDD, schema)
personSchemaRDD.saveToCassandra
を照合することによってカサンドラテーブルに挿入するワンcsvファイルを持っていました
私はSaveToCassndraを使用していますが、saveToCassandraを取得するのはpersonSchemaRDDの一部ではありません。 port.canいずれかの私にそれを行うための最善の方法を教えて:だから、別の方法で
df.write.format("org.apache.spark.sql.cassandra").options(Map("table" -> "words_copy", "keyspace" -> "test")).save()
を試みるが、IP上のカサンドラに接続することはできませんを取得する教えてくれました。私はファイルから定期的にデータをcassandraに保存する必要があります。
Row()の要素のいずれかをval rowRDD = input.map(_。split( "、"))に変換するにはどうすればよいですか?(p => Row(p(0)、getTimestamp私はYHY:MM:DD '' HH:mm:ss形式 – Anji
@Anjiにタイムスタンプを 'jodatime.DateTime'の' java.util.Date'にマッピングする方が良いでしょう。フォーマットの問題を回避する。 – maasg
com.databricks.spark.csvを使用しているときに "NA"を取るオプションがあります。原因:java.text.ParseException:解析できない番号: "NA" ' – Anji