私はkafkaサーバーからdStreamを作成し、そのストリームでいくつかの変換を実行しようとしています。私はストリームが空の場合にキャッチを含める(if(!rdd.partitions.isEmpty)
);ただし、カフカのトピックにイベントが公開されていない場合でも、else
ステートメントには決して到達しません。sparkストリーミング `if(!rdd.partitions.isEmpty)`が機能しない
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
stream.foreachRDD { rdd =>
if(!rdd.partitions.isEmpty) {
val message = rdd.map((x$2) => x$2._2).collect().toList.map(parser)
val val = message(0)
} else println("empty stream...")
ssc.start()
ssc.awaitTermination()
}
KafkaUtils.createDirectStream
ではなくcreateStream
を使用しているとき、私は、ストリームが空であるかどうかを確認するために使用する必要があり、代替の文がありますか?