私は、mapWithStateを使用してRDDの状態を追跡するスパークストリーミングアプリケーションを実行しています。 アプリケーションが数分間細かい実行されますが、その後、私はmapWithStateRDDのタイムアウトを設定しているにもかかわらず、直線的に時間をかけてスパークアプリケーションが増加のメモリ使用量を観測spark workerのメモリ使用量が時間とともに増加するのはなぜですか?
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 373
でクラッシュ。以下のコードスニペットとメモリ使用量を見てください -
val completedSess = sessionLines
.mapWithState(StateSpec.function(trackStateFunction _)
.numPartitions(80)
.timeout(Minutes(5)))
各RDDの明示的なタイムアウトがある場合は、なぜ、メモリが時間の経過とともに直線的に増加する必要がありますか?
メモリを増やしてみましたが問題ありません。私は何が欠けていますか?
編集 - 参照
のコードデフtrackStateFunction(batchTime:時間、キー:文字列、値:オプションの[文字列]、状態:状態[(ブール、一覧[文字列]、ロング)]):オプション[ mapwithstateの情報によると(ブール、一覧[文字列])] = {
def updateSessions(newLine: String): Option[(Boolean, List[String])] = {
val currentTime = System.currentTimeMillis()/1000
if (state.exists()) {
val newLines = state.get()._2 :+ newLine
//check if end of Session reached.
// if yes, remove the state and return. Else update the state
if (isEndOfSessionReached(value.getOrElse(""), state.get()._4)) {
state.remove()
Some(true, newLines)
}
else {
val newState = (false, newLines, currentTime)
state.update(newState)
Some(state.get()._1, state.get()._2)
}
}
else {
val newState = (false, List(value.get), currentTime)
state.update(newState)
Some(state.get()._1, state.get()._2)
}
}
value match {
case Some(newLine) => updateSessions(newLine)
case _ if state.isTimingOut() => Some(true, state.get()._2)
case _ => {
println("Not matched to any expression")
None
}
}
}
あなたはどの程度のトラフィックを持っていますか?どのくらいのRAM /ディスク?もっと情報が必要です。 –
また、チェックポイントはどのくらいですか? –
私は4人の労働者(8コア、32 GB RAM、それぞれ128 GB SSD)のクラスタを持っています。着信トラフィックはKinesisストリームからのもので、10-15 MB/sです。バッチ間隔は10秒です。チェックポイントの間隔は60秒です – cmbendre