0

にすべてのレコードを追加しません:スパークカサンドラコネクタは、私は、バージョン#使用していますDB

kafkaStream.foreachRDD((rdd: RDD[String]) => { 
    if(rdd.count > 0) { 
    println(java.time.LocalDateTime.now + ". Consumed: " + rdd.count() + " messages."); 

    sqlContext.read.json(rdd) 
       .select("count_metadata.tran_id") 
       .write 
       .format("org.apache.spark.sql.cassandra") 
       .options(Map("table" -> "tmp", "keyspace" -> "kspace")) 
       .mode(SaveMode.Append) 
       .save(); 
    } else { 
     println(java.time.LocalDateTime.now + ". There are currently no messages on the topic that haven't been consumed."); 
    }  
}); 

RDD数が40K程度でしか火花コネクタ:com.datastax.spark:spark-cassandra-connector_2.11:2.0.0-M3

を私はカフカストリームからRDDを持っています一貫性のある457のレコードをデータベースに取り込みます。

sqlContext.read.json(rdd).select("count_metadata.tran_id").count 

も40kレコードを出力します。

cqlsh:kspace> CREATE TABLE tmp(tran_id text PRIMARY KEY); 

tran_idは、メッセージごとに一意である:

は、ここに私のtable文です。

私には何が欠けていますか?なぜ40kレコードすべてがそのテーブルを作るのではないのですか?

ログにも例外が表示されません。

答えて

1

各tran_idは一意です。

私は嘘をついた:

println(df.distinct.count); 

プリント....私たちの上流のソースにそれを持って来るために

457 

時間を。

関連する問題