を印刷していない私は、この単純なカフカストリームカフカdirectstream DSTREAMマップは
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
// Each Kafka message is a flight
val flights = messages.map(_._2)
flights.foreachRDD(rdd => {
println("--- New RDD with " + rdd.partitions.length + " partitions and " + rdd.count() + " flight records");
rdd.map { flight => {
val flightRows = FlightParser.parse(flight)
println ("Parsed num rows: " + flightRows)
}
}
})
ssc.start()
ssc.awaitTermination()
カフカは、メッセージ、スパークRDDSとしてそれらを取得することができ、それをストリーミングを持っています。しかし、私のコードの2番目のprintlnは何も印刷しません。私はローカルの[2]モードで実行されたときにドライバのコンソールログを見て、糸クライアントモードで実行されたときに糸ログを確認しました。
私には何が欠けていますか?代わりにrdd.mapの
、うまくスパーク・ドライバ・コンソールに次のコードを印刷:
for(flight <- rdd.collect().toArray) {
val flightRows = FlightParser.parse(flight)
println ("Parsed num rows: " + flightRows)
}
しかし、私はこの飛行物体上の処理ではなく、エグゼキュータの、スパークドライバプロジェクトに起こるかもしれないことを恐れています。私が間違っている場合は私を修正してください。
おかげ
*ワーカー*エグゼキュータログを確認しましたか?おそらく、あなたの 'FlightParser'クラスが見つからないのでしょうか? –