私は現在、別の変換で使用するために、Sparkアプリケーションで1分あたりのリクエストを追跡しようとしています。私もRDDを使用していた試みるものを - 変換で変数を使用する場合しかしながら、以下のコードは、私はそれがparellizationで何かを持っていると仮定しスパークストリーミングで分ごとにリクエストを追跡
var rpm: Long = 0
val requestsPerMinute = stream.countByWindow(Seconds(60), Seconds(5)).foreachRDD(rdd => {
rdd.foreach(x => {
rpm = x
})
})
stream.foreachRDD { rdd =>
rdd.foreach(x => {
//do something including parameter rpm
})
}
0の元々設定した値よりも別の値になることはありませんまたはプレーン変数の代わりにブロードキャストを使用します。しかし、その結果コードは実行されませんでした。
SparkStreamingでこれを達成するには、どのような方法が推奨されますか?
EDIT: 着信オブジェクトにタイムスタンプが付いていれば、何かに役立ちます。