私はKafkaからデータを読み取る必要があるスパークアプリケーションに取り組んでいます。私はプロデューサーがメッセージを投稿していたKafkaトピックを作成しました。コンソールのコンシューマから、メッセージが正常に送信されたことを確認しました。Spark Streamingアプリケーションでカフカのレコード数がカウントされないのはなぜですか?
私はKafkaからデータを読み込むための短いスパークアプリケーションを作成しましたが、データを取得していません。この問題を解決する方法について
def main(args: Array[String]): Unit = {
val Array(zkQuorum, group, topics, numThreads) = args
val sparkConf = new SparkConf().setAppName("SparkConsumer").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(2))
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
process(lines) // prints the number of records in Kafka topic
ssc.start()
ssc.awaitTermination()
}
private def process(lines: DStream[String]) {
val z = lines.count()
println("count of lines is "+z)
//edit
lines.foreachRDD(rdd => rdd.map(println)
// <-- Why does this **not** print?
)
任意の提案: 後、私が使用したコードはありますか?
****** EDITは****
私も実際のコードで
lines.foreachRDD(rdd => rdd.map(println)
を使用していたが、それも機能していません。私はポスト:Kafka spark directStream can not get dataで述べたように保存期間を設定しました。しかし、まだ問題が存在します。あなたのprocess
実際のコードでOutput演算子も使用しました。元の質問を編集して、まだ問題が存在することを示しました。今はどんな提案? – Alok
Hehe、出力なしの演算子を別のものに「交換」しています:)まず 'lines.count()'の代わりに 'lines.count()。print'を使用してください。私はコンソールに10レコードをプリントアウトすることを確信しています。 RDDの場合、 'rdd.foreach(println)'(変換である 'rdd.map(println) 'ではなく)を使用してください。楽しむ! :) –