私の目標は、IDがイベントから抽出された最後のN idsを保持するFlinkストリーミングプログラムを持つことです。シンクはCassandraストアであるため、いつでもIDのリストを取得できます。カッサンドラはあらゆるイベントの直後に更新されることが重要です。非アクティブキーのクリーンアップFlinkストリームの状態はどのようになりますか?
これはmapWithState
(以下のコードを参照)で簡単に実装できます。しかし、このコードには重要な問題があります。状態にはuserid
が入力されています。一部のユーザーはしばらくの間アクティブになっている可能性があります。私が心配しているのは、状態ストレージが永遠に成長するということです。
アクティブでないキーのクリーンアップ状態はどのようになりますか?
case class MyEvent(userId: Int, id: String)
env
.addSource(new FlinkKafkaConsumer010[MyEvent]("vips", new MyJsonDeserializationSchema(), kafkaConsumerProperties))
.keyBy(_.userId)
.mapWithState[(Int, Seq[String]), Seq[String]] { (in: MyEvent, currentIds: Option[Seq[String]]) =>
val keepNIds = currentIds match {
case None => Seq(in.id)
case Some(cids) => (cids :+ in.id).takeRight(100)
}
((in.userId, keepNIds), Some(keepNIds))
}
.addSink { in: (Int, Seq[String]) =>
CassandraSink.appDatabase.idsTable.store(...)
}
ありがとう、私は 'RichProcessFunction'を見つけました。私は' ctx.timerService()。registerProcessingTimeTimer(expiryMillis) 'のようなコードでコールバックを登録することができます。私たちはこのデータを今のところ30日間、長時間にわたって保管したいと考えています。その間に何十億ものコールバックが追加されます。この膨大な数のコールバックをFlinkで処理できますか? –
タイマーは状態を保持し、チェックポイントされています。私はそれが可能であるべきだと思いますが、私はいくつかの負荷の下でこれをテストすることをお勧めします。 –
私は、タイマーの数をキーごとに1つに制限する方法を見つけました。 'processElement'は、最新の状態と現在の状態がまだ期限切れでなければ、新しい鍵と' onTimer'スケジュールのタイマーを開始します。これで数千万のアクティブタイマーしか手に入れません。私はそれがうまくいくと思います:) –