0
データベースからjdbcを読み込んでSparkアプリケーションから消費するコネクタを作成しました。アプリケーションはデータベースのデータをよく読みますが、最初の10行しか読み込まれず、残りの部分は無視されるようです。どのようにして休息を取るべきなので、すべてのデータを計算できます。ここでKafka - 10行以上の読み方
は私の火花コードです:
val brokers = "http://127.0.0.1:9092"
val topics = List("postgres-accounts2")
val sparkConf = new SparkConf().setAppName("KafkaWordCount")
//sparkConf.setMaster("spark://sda1:7077,sda2:7077")
sparkConf.setMaster("local[2]")
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
sparkConf.registerKryoClasses(Array(classOf[Record]))
val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint("checkpoint")
// Create direct kafka stream with brokers and topics
//val topicsSet = topics.split(",")
val kafkaParams = Map[String, Object](
"schema.registry.url" -> "http://127.0.0.1:8081",
"bootstrap.servers" -> "http://127.0.0.1:9092",
"key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
"value.deserializer" -> "io.confluent.kafka.serializers.KafkaAvroDeserializer",
"group.id" -> "use_a_separate_group_id_for_each_stream",
"auto.offset.reset" -> "earliest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val messages = KafkaUtils.createDirectStream[String, Record](
ssc,
PreferConsistent,
Subscribe[String, Record](topics, kafkaParams)
)
val data = messages.map(record => {
println(record) // print only first 10
// compute here?
(record.key, record.value)
})
data.print()
// Start the computation
ssc.start()
ssc.awaitTermination()
ここで私のコードにcount()を入れるべきですか? count()はLongを返しませんが、DStream [Long]は返します。 –
あなたは正しいです。私はdata.count().print()でチェックし、10以上を表示しています。ありがとう:) –
@ J.Done問題はありません。 – Shaido