2017-11-14 5 views
0

データベースからjdbcを読み込んでSparkアプリケーションから消費するコネクタを作成しました。アプリケーションはデータベースのデータをよく読みますが、最初の10行しか読み込まれず、残りの部分は無視されるようです。どのようにして休息を取るべきなので、すべてのデータを計算できます。ここでKafka - 10行以上の読み方

は私の火花コードです:

val brokers = "http://127.0.0.1:9092" 
val topics = List("postgres-accounts2") 
val sparkConf = new SparkConf().setAppName("KafkaWordCount") 
//sparkConf.setMaster("spark://sda1:7077,sda2:7077") 
sparkConf.setMaster("local[2]") 
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 
sparkConf.registerKryoClasses(Array(classOf[Record])) 

val ssc = new StreamingContext(sparkConf, Seconds(2)) 
ssc.checkpoint("checkpoint") 

// Create direct kafka stream with brokers and topics 
//val topicsSet = topics.split(",") 

val kafkaParams = Map[String, Object](
    "schema.registry.url" -> "http://127.0.0.1:8081", 
    "bootstrap.servers" -> "http://127.0.0.1:9092", 
    "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer", 
    "value.deserializer" -> "io.confluent.kafka.serializers.KafkaAvroDeserializer", 
    "group.id" -> "use_a_separate_group_id_for_each_stream", 
    "auto.offset.reset" -> "earliest", 
    "enable.auto.commit" -> (false: java.lang.Boolean) 
) 

val messages = KafkaUtils.createDirectStream[String, Record](
    ssc, 
    PreferConsistent, 
    Subscribe[String, Record](topics, kafkaParams) 
) 

val data = messages.map(record => { 
    println(record) // print only first 10 
    // compute here? 
    (record.key, record.value) 
}) 

data.print() 

// Start the computation 
ssc.start() 
ssc.awaitTermination() 

答えて

3

私は問題はそのスパークにあると信じて怠惰であり、唯一の実際に使用されているデータを読み込みます。

デフォルトでは、printはストリームの最初の10要素を表示します。 2つのコードに加えて、コードには他のアクションが含まれていないため、10行以上のデータを読み取る必要はありません。 countまたは別の操作を使用して、動作していることを確認してください。

+0

ここで私のコードにcount()を入れるべきですか? count()はLongを返しませんが、DStream [Long]は返します。 –

+0

あなたは正しいです。私はdata.count().print()でチェックし、10以上を表示しています。ありがとう:) –

+0

@ J.Done問題はありません。 – Shaido

関連する問題