2017-08-04 7 views
1

私は現在、別の変換で使用するために、Sparkアプリケーションで1分あたりのリクエストを追跡しようとしています。私もRDDを使用していた試みるものを - 変換で変数を使用する場合しかしながら、以下のコードは、私はそれがparellizationで何かを持っていると仮定しスパークストリーミングで分ごとにリクエストを追跡

var rpm: Long = 0 


val requestsPerMinute = stream.countByWindow(Seconds(60), Seconds(5)).foreachRDD(rdd => {  
    rdd.foreach(x => { 
    rpm = x  
    })  
}) 

stream.foreachRDD { rdd => 
    rdd.foreach(x => { 
    //do something including parameter rpm 
    }) 

} 

0の元々設定した値よりも別の値になることはありませんまたはプレーン変数の代わりにブロードキャストを使用します。しかし、その結果コードは実行されませんでした。

SparkStreamingでこれを達成するには、どのような方法が推奨されますか?

EDIT: 着信オブジェクトにタイムスタンプが付いていれば、何かに役立ちます。

答えて

2

は、Sparkストリーミングでは、2つの実行レベルが存在するクラスタで実行ドライバに実行される動作の

  • スケジューリング、及び、RDDSに
  • 分散計算、

両方のレベルへのアクセスを提供する操作は、transformforeachRDDの2つです。これらの操作では、ドライバのコンテキストへのアクセス権があり、RDDへの参照があり、RDDへの参照があります。問題の特定の場合において

、ローカル変数を更新するため、操作が運転者のコンテキスト内で実行されなければならない:

オリジナル場合
val requestsPerMinute = stream.countByWindow(Seconds(60), Seconds(5)) 
requestsPerMinute.foreachRDD{ rdd => 
    val computedRPM = rdd.collect()(0) // this gets the data locally 
    rpm = computedRPM  
} 

rdd.foreach(x => { 
    rpm = x  
}) 

クロージャ。 f(x): Long => Unit = rpm = xがシリアライズされ、クラスタ上で実行されます。副作用はリモートコンテキストで適用され、操作の終了後に失われます。ドライバレベルでは、変数の値は決して変更されません。

また、リモート実行に副作用機能を使用することはお勧めできません。

関連する問題