2017-05-26 13 views
1

私はスパークストリーミングを学んでいます。状態を更新したままにしておき、mapWithStateを使用して状態を更新することができました。また、コンテキスト内でチェックポイントを有効にしました。私は仕事を停止/再開する必要がある場合、私は状態を覚えていたかった。今すぐ再起動するたびに新しいカウントが開始されます。私は様々な現金、チェックポイントオプションを試して、多くの投稿を通して鮮明な画像を取得しなかった。Spark Streamingアプリケーションの再起動の間の状態を記憶する方法は?

環境: 私はSparkをローカルで開発中で、HDPサンドボックスとしても実行しています。 (私は両方の環境で試しました)。

  1. Sparkジョブを終了して再起動することは可能ですか? (プログラミング変更なし)。

  2. 可能な場合はどうすればよいですか?任意のポインタや提案が役立ちます。 (私はチークポイント、個々のRDD、ローカルのMapwithStateRDD、HDP sanboxのキャッシュを試しました)。

  3. 私が試していない唯一のオプションは、MapWithStateRDDをディスクに保存してinitialRDDとして読み取ることです。とにかくこれが正しいオプションになるとは思わない。

私は答えなしで同様の質問を見つけました。 Spark Checkpoint doesn't remember state (Java HDFS)

ありがとうございます。

コード:

def getStreamingContext(streamingApp : (SparkContext, Duration) => StreamingContext, sc : SparkContext, batchDuration: Duration) = { 
    val creatingFunc =() => streamingApp(sc, batchDuration) 
    val ssc = sc.getCheckpointDir match { 
     case Some(checkpointDir) => 
     println("Get or Create Context") 
     StreamingContext.getActiveOrCreate(checkpointDir, creatingFunc, sc.hadoopConfiguration, createOnError = true) 
     case None => 
     print("New Context") 
     StreamingContext.getActiveOrCreate(creatingFunc) 
    } 
    sc.getCheckpointDir.foreach(cp => ssc.checkpoint(cp)) 
    println(ssc.getState()) 
    ssc 
    } 

スパークバージョン2.1.0

+0

チェックポイントを使用していますが、有効にしたとしました。StreamingContextを初期化してチェックポイントを設定するコードを表示できますか? Sparkのバージョンは何ですか? –

+0

あなたの素早い返信Jacekに感謝します。 – user2022329

+0

btw:チェックポイント付きの完全なコードを持つコース教材からエクササイズを試みましたが、同じように動作しました。私は状態をHDFに保存し、再起動時に初期化する必要があるかもしれないと思います。 – user2022329

答えて

1

私はそれが働いて得た...次のQ/Aに感謝します。 [リンク](Spark streaming not remembering previous state

Iチェックポイントディレクトリは、再起動時に状態を思い出し可能と一緒に時間を追加updateStateByKey

statefulActivity.checkpoint(分(1))

後、次の行が欠落していました。

関連する問題