2017-08-22 7 views
3

スパークストラクチャードストリーミング(バージョン2.2.0)では、クエリがの出力モードとしてmapGroupsWithStateのクエリを使用している場合、Sparkがデータ構造java.util.ConcurrentHashMapを使用してメモリ内の状態データを保存しているようです。誰かが私に詳細に説明することはできますか?状態データが大きくなり、メモリが足りなくなったら何が起こるのですか?また、spark設定パラメータを使用して、メモリに状態データを格納するための制限を変更することは可能ですか?状態データが増加している場合、Spark Structured Streamingはインメモリ状態をどのように処理しますか?

+0

Sparkでメモリを設定するには、 - 'spark.driver.memory'と' spark.executor.memory' configsを使用できます。参考までに - https://spark.apache.org/docs/latest/configuration.html#available-properties – himanshuIIITian

答えて

1

誰かがexecutorがOOM例外でクラッシュします

もはや状態 データが成長し、十分なメモリがないときに何が起こることを詳細に私に説明できます。 mapGroupWithStateでは、状態の追加と削除を担当しているため、メモリを割り当てることができないデータでJVMを圧倒すると、プロセスがクラッシュします。

メモリにステートデータを格納するための制限を、spark configパラメータを使用して変更することはできますか?

メモリに格納するバイト数を制限することはできません。繰り返しますが、mapGroupsWithStateの場合は、タイムアウトの設定や状態の削除など、JVMをOOMにさせないように状態を管理する必要があります。 aggコンビネータのようにSparkがあなたのために状態を管理するステートフルアグリゲーションについて言えば、watermarkを使用して状態を制限することができます。watermarkは、タイムフレームが経過すると古いデータをメモリから削除します。

関連する問題