3

私はkafkaサーバーからdStreamを作成し、そのストリームでいくつかの変換を実行しようとしています。私はストリームが空の場合にキャッチを含める(if(!rdd.partitions.isEmpty));ただし、カフカのトピックにイベントが公開されていない場合でも、elseステートメントには決して到達しません。sparkストリーミング `if(!rdd.partitions.isEmpty)`が機能しない

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics) 

stream.foreachRDD { rdd => 
    if(!rdd.partitions.isEmpty) { 

     val message = rdd.map((x$2) => x$2._2).collect().toList.map(parser) 

     val val = message(0) 

    } else println("empty stream...") 

    ssc.start() 
    ssc.awaitTermination() 

} 

KafkaUtils.createDirectStreamではなくcreateStreamを使用しているとき、私は、ストリームが空であるかどうかを確認するために使用する必要があり、代替の文がありますか?

答えて

3

根本的なパーティションは、実際の要素を持っているかどうかを確認するためのチェック追加利用RDD.isEmpty代わりのRDD.partitions.isEmpty

stream.foreachRDD { rdd => 
    if(!rdd.isEmpty) { 
    // Stuff 
    } 
} 

理由RDD.partitions.isEmptyが動作していないがRDD内部のパーティションが存在するということですが、そのパーティションそれ自体は空です。しかしArray[Partition]であるpartitionsの視点からは、それは空ではない。

関連する問題