2017-03-21 6 views
2

私の目標は、IDがイベントから抽出された最後のN idsを保持するFlinkストリーミングプログラムを持つことです。シンクはCassandraストアであるため、いつでもIDのリストを取得できます。カッサンドラはあらゆるイベントの直後に更新されることが重要です。非アクティブキーのクリーンアップFlinkストリームの状態はどのようになりますか?

これはmapWithState(以下のコードを参照)で簡単に実装できます。しかし、このコードには重要な問題があります。状態にはuseridが入力されています。一部のユーザーはしばらくの間アクティブになっている可能性があります。私が心配しているのは、状態ストレージが永遠に成長するということです。

アクティブでないキーのクリーンアップ状態はどのようになりますか?

case class MyEvent(userId: Int, id: String) 

env 
    .addSource(new FlinkKafkaConsumer010[MyEvent]("vips", new MyJsonDeserializationSchema(), kafkaConsumerProperties)) 
    .keyBy(_.userId) 
    .mapWithState[(Int, Seq[String]), Seq[String]] { (in: MyEvent, currentIds: Option[Seq[String]]) => 
    val keepNIds = currentIds match { 
     case None => Seq(in.id) 
     case Some(cids) => (cids :+ in.id).takeRight(100) 
    } 
    ((in.userId, keepNIds), Some(keepNIds)) 
    } 
    .addSink { in: (Int, Seq[String]) => 
    CassandraSink.appDatabase.idsTable.store(...) 
    } 

答えて

3

成長状態は重要かつ正確な観察です。あなたのキースペースが動いているなら、これは間違いなく起こります。

Flink 1.2.0がこの問題を解決するProcessFunctionを追加しました。 ProcessFunctionFlatMapFunctionに似ていますが、タイマーサービスにアクセスできます。コールバック関数が期限切れになると、onTimer()コールバック関数を呼び出すタイマーを登録できます。コールバックを使用して状態をクリーンアップすることができます。

+1

ありがとう、私は 'RichProcessFunction'を見つけました。私は' ctx.timerService()。registerProcessingTimeTimer(expiryMillis) 'のようなコードでコールバックを登録することができます。私たちはこのデータを今のところ30日間、長時間にわたって保管したいと考えています。その間に何十億ものコールバックが追加されます。この膨大な数のコールバックをFlinkで処理できますか? –

+0

タイマーは状態を保持し、チェックポイントされています。私はそれが可能であるべきだと思いますが、私はいくつかの負荷の下でこれをテストすることをお勧めします。 –

+1

私は、タイマーの数をキーごとに1つに制限する方法を見つけました。 'processElement'は、最新の状態と現在の状態がまだ期限切れでなければ、新しい鍵と' onTimer'スケジュールのタイマーを開始します。これで数千万のアクティブタイマーしか手に入れません。私はそれがうまくいくと思います:) –

関連する問題