私はスパークストリーミングを学んでいます。状態を更新したままにしておき、mapWithStateを使用して状態を更新することができました。また、コンテキスト内でチェックポイントを有効にしました。私は仕事を停止/再開する必要がある場合、私は状態を覚えていたかった。今すぐ再起動するたびに新しいカウントが開始されます。私は様々な現金、チェックポイントオプションを試して、多くの投稿を通して鮮明な画像を取得しなかった。Spark Streamingアプリケーションの再起動の間の状態を記憶する方法は?
環境: 私はSparkをローカルで開発中で、HDPサンドボックスとしても実行しています。 (私は両方の環境で試しました)。
Sparkジョブを終了して再起動することは可能ですか? (プログラミング変更なし)。
可能な場合はどうすればよいですか?任意のポインタや提案が役立ちます。 (私はチークポイント、個々のRDD、ローカルのMapwithStateRDD、HDP sanboxのキャッシュを試しました)。
私が試していない唯一のオプションは、MapWithStateRDDをディスクに保存してinitialRDDとして読み取ることです。とにかくこれが正しいオプションになるとは思わない。
私は答えなしで同様の質問を見つけました。 Spark Checkpoint doesn't remember state (Java HDFS)
ありがとうございます。
コード:
def getStreamingContext(streamingApp : (SparkContext, Duration) => StreamingContext, sc : SparkContext, batchDuration: Duration) = {
val creatingFunc =() => streamingApp(sc, batchDuration)
val ssc = sc.getCheckpointDir match {
case Some(checkpointDir) =>
println("Get or Create Context")
StreamingContext.getActiveOrCreate(checkpointDir, creatingFunc, sc.hadoopConfiguration, createOnError = true)
case None =>
print("New Context")
StreamingContext.getActiveOrCreate(creatingFunc)
}
sc.getCheckpointDir.foreach(cp => ssc.checkpoint(cp))
println(ssc.getState())
ssc
}
スパークバージョン2.1.0
チェックポイントを使用していますが、有効にしたとしました。StreamingContextを初期化してチェックポイントを設定するコードを表示できますか? Sparkのバージョンは何ですか? –
あなたの素早い返信Jacekに感謝します。 – user2022329
btw:チェックポイント付きの完全なコードを持つコース教材からエクササイズを試みましたが、同じように動作しました。私は状態をHDFに保存し、再起動時に初期化する必要があるかもしれないと思います。 – user2022329