2016-10-11 18 views
1

私はスパークストリーミングで非常に奇妙な気がします。RDDの平均値を計算し、スパークストリーミングの平均値に基づいてこのRDDをフィルタリングします

私はタプル(String、Int)のDStreamを持っています。文字列がidで、整数が値であるとします。

マイクロバッチでは、フィールドIntの平均を計算し、この平均値に基づいて、同じmicrobatchをフィルタリングする必要があります(field2> averageなど)。だから、私はこのコードを書いた:

実際にこのコードは実行されていませんが、計算の最初の部分は大丈夫ですが、テストではありません。 このコードには汚れたものがいくつかありますが、ロジックが良いかどうかを知りたいだけです。

アドバイスありがとうございます。

答えて

0

試してみてください。

lineStreams.transform { rdd => { 
    val mean = rdd.values.map(_.toDouble).mean 
    rdd.filter(_._2.toDouble > mean) 
}} 
関連する問題