2017-03-07 5 views
0

私は、mapWithStateを使用してRDDの状態を追跡するスパークストリーミングアプリケーションを実行しています。 アプリケーションが数分間細かい実行されますが、その後、私はmapWithStateRDDのタイムアウトを設定しているにもかかわらず、直線的に時間をかけてスパークアプリケーションが増加のメモリ使用量を観測spark workerのメモリ使用量が時間とともに増加するのはなぜですか?

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 373 

でクラッシュ。以下のコードスニペットとメモリ使用量を見てください -

val completedSess = sessionLines 
        .mapWithState(StateSpec.function(trackStateFunction _) 
        .numPartitions(80) 
        .timeout(Minutes(5))) 

enter image description here

各RDDの明示的なタイムアウトがある場合は、なぜ、メモリが時間の経過とともに直線的に増加する必要がありますか?

メモリを増やしてみましたが問題ありません。私は何が欠けていますか?

編集 - 参照

のコードデフtrackStateFunction(batchTime:時間、キー:文字列、値:オプションの[文字列]、状態:状態[(ブール、一覧[文字列]、ロング)]):オプション[ mapwithstateの情報によると(ブール、一覧[文字列])] = {

def updateSessions(newLine: String): Option[(Boolean, List[String])] = { 
    val currentTime = System.currentTimeMillis()/1000 

    if (state.exists()) { 
     val newLines = state.get()._2 :+ newLine 

     //check if end of Session reached. 
     // if yes, remove the state and return. Else update the state 
     if (isEndOfSessionReached(value.getOrElse(""), state.get()._4)) { 
     state.remove() 
     Some(true, newLines) 
     } 
     else { 
     val newState = (false, newLines, currentTime) 
     state.update(newState) 
     Some(state.get()._1, state.get()._2) 
     } 
    } 
    else { 
     val newState = (false, List(value.get), currentTime) 
     state.update(newState) 
     Some(state.get()._1, state.get()._2) 
    } 
    } 

    value match { 
    case Some(newLine) => updateSessions(newLine) 
    case _ if state.isTimingOut() => Some(true, state.get()._2) 
    case _ => { 
     println("Not matched to any expression") 
     None 
    } 
    } 
} 
+1

あなたはどの程度のトラフィックを持っていますか?どのくらいのRAM /ディスク?もっと情報が必要です。 –

+1

また、チェックポイントはどのくらいですか? –

+1

私は4人の労働者(8コア、32 GB RAM、それぞれ128 GB SSD)のクラスタを持っています。着信トラフィックはKinesisストリームからのもので、10-15 MB/sです。バッチ間隔は10秒です。チェックポイントの間隔は60秒です – cmbendre

答えて

1

: 州立仕様 RDDとして、初期状態 - あなたは、いくつかの店から初期状態をロードしてからストリーミングジョブを開始することができますその状態で

パーティションの数 - キー値の状態のdstreamはキーで分割されています。以前の状態のサイズの見積もりが良い場合は、それに応じて分割するパーティションの数を指定することができます。

パーティショナー - カスタムパーティショナーを提供することもできます。デフォルトのパーティショナーはハッシュ・パーティショナーです。キー空間をよく理解している場合は、デフォルトのハッシュ・パーティショナーより効率的な更新を行うことができるカスタム・パーティショナーを提供できます。

タイムアウト - これにより、特定の期間、値が更新されないキーが状態から削除されます。これは、古いキーで状態をクリーンアップするのに役立ちます。

したがってタイムアウトは、アップデートしていないキーを使用してしばらくしてからクリーニングを行うだけです。エグゼキュータには十分なメモリが割り当てられていないため、メモリは完全に実行され、最終的にブロックされます。これにより、MetaDataFetchFailed例外が発生します。メモリを増やすと、あなたはエグゼクティブを意味することを願っています。エグゼキュータのメモリを増やしても、ストリームはまだ続行されているため、おそらく動作しません。 MapWithStateでは、セッションラインに入力dstreamと同じレコード数が含まれます。だからこれを解決するには、あなたのdstreamを小さくすることです。ストリーミング文脈では、あなたが最も可能性が高い。この

ヴァルSSC =新しいStreamingContext(SC、秒(batchIntervalSeconds))を解決するバッチ間隔を設定することができます

もたまにはスナップショットとチェックポイントを作ることを忘れないでください。スナップショットを使用すると、以前の失われたストリームの情報を他の計算に使用できます。うまくいけば、これはより多くの情報のために助けられました:https://docs.cloud.databricks.com/docs/spark/1.6/examples/Streaming%20mapWithState.htmlhttp://asyncified.io/2016/07/31/exploring-stateful-streaming-with-apache-spark/

関連する問題