0
にすべてのレコードを追加しません:スパークカサンドラコネクタは、私は、バージョン#使用していますDB
kafkaStream.foreachRDD((rdd: RDD[String]) => {
if(rdd.count > 0) {
println(java.time.LocalDateTime.now + ". Consumed: " + rdd.count() + " messages.");
sqlContext.read.json(rdd)
.select("count_metadata.tran_id")
.write
.format("org.apache.spark.sql.cassandra")
.options(Map("table" -> "tmp", "keyspace" -> "kspace"))
.mode(SaveMode.Append)
.save();
} else {
println(java.time.LocalDateTime.now + ". There are currently no messages on the topic that haven't been consumed.");
}
});
RDD数が40K程度でしか火花コネクタ:com.datastax.spark:spark-cassandra-connector_2.11:2.0.0-M3
を私はカフカストリームからRDDを持っています一貫性のある457のレコードをデータベースに取り込みます。
sqlContext.read.json(rdd).select("count_metadata.tran_id").count
も40kレコードを出力します。
cqlsh:kspace> CREATE TABLE tmp(tran_id text PRIMARY KEY);
tran_idは、メッセージごとに一意である:
は、ここに私のtable文です。
私には何が欠けていますか?なぜ40kレコードすべてがそのテーブルを作るのではないのですか?
ログにも例外が表示されません。