私はデータ処理をストリーミングするのが初めてで、とても基本的なユースケースでなければならないと感じています。カフカストリームのストリームにクールダウン/レート制限を追加するにはどうすればよいですか?
私は(User, Alert)
タプルのストリームを持っているとしましょう。私が望むのは、ユーザーあたりのストリームのレートを制限することです。私。私は一度だけユーザーにアラートを出力するストリームが必要です。次の例では、60分と言えば、ユーザーの着信アラートは飲み込むだけです。 60分後に、着信アラートが再度トリガーされます。
がステートフル変換としてaggregate
を使用したが、集計状態が時間に依存することがある:私が試した何
。しかし、得られたKTable
が集計値には変化がなくても、(変更履歴など)KTableは、このように、「速度制限」ストリームの所望の効果を達成しない、要素をダウン送信し続ける
val fooStream: KStream[String, String] = builder.stream("foobar2")
fooStream
.groupBy((key, string) => string)
.aggregate(() => "constant",
(aggKey: String, value: String, aggregate: String) => aggregate,
stringSerde,
"name")
.print
aggregate
は下流の要素を公開することを決定したときにどのように/私に
[KSTREAM-AGGREGATE-0000000004]: string , (constant<-null)
[KSTREAM-AGGREGATE-0000000004]: string , (constant<-null)
それは一般的に不明だ:それは次のような出力を提供します。私の元々の理解は、それは即座だということでしたが、そうではないようです。私が見る限り、ウィンドウ処理はここで助けてはなりません。
Kafka Streams DSLは現在、SparkのupdateStateByKeyまたはAkkaのstatefulMapConcatと同様のステートフル変換のこの使用例を考慮していない可能性がありますか?下位のプロセッサ/トランスフォーマAPIを使用する必要がありますか?
EDIT:Possible duplicateは集計は下流の要素を公開することを決定したときになど、いくつかの混乱を引き起こしているか、レコードのキャッシングの問題に行くん
。しかし、主な問題は、DSLで「レート制限」を達成する方法に関するものでした。 @migunoが指摘しているように、下位のプロセッサAPIに戻る必要があります。私は非常に冗長であるアプローチを貼り付け下:
val logConfig = new util.HashMap[String, String]();
// override min.insync.replicas
logConfig.put("min.insyc.replicas", "1")
case class StateRecord(alert: Alert, time: Long)
val countStore = Stores.create("Limiter")
.withKeys(integerSerde)
.withValues(new JsonSerde[StateRecord])
.persistent()
.enableLogging(logConfig)
.build();
builder.addStateStore(countStore)
class RateLimiter extends Transformer[Integer, Alert, KeyValue[Integer, Alert]] {
var context: ProcessorContext = null;
var store: KeyValueStore[Integer, StateRecord] = null;
override def init(context: ProcessorContext) = {
this.context = context
this.store = context.getStateStore("Limiter").asInstanceOf[KeyValueStore[Integer, StateRecord]]
}
override def transform(key: Integer, value: Alert) = {
val current = System.currentTimeMillis()
val newRecord = StateRecord(value._1, value._2, current)
store.get(key) match {
case StateRecord(_, time) if time + 15.seconds.toMillis < current => {
store.put(key, newRecord)
(key, value)
}
case StateRecord(_, _) => null
case null => {
store.put(key, newRecord)
(key, value)
}
}
}
}
[カフカストリームの削減方法の出力が表示されないのはなぜですか?](http://stackoverflow.com/questions/40537084/why-dont-i-see-any-output-from- the-kafka-streams-reduce-method) –
現時点ではDSLでは不可能です。 PAPIを使用すると動作します。 –
PAPI = KafkaストリームのプロセッサAPI。 –