2017-02-01 5 views
1

私はデータ処理をストリーミングするのが初めてで、とても基本的なユースケースでなければならないと感じています。カフカストリームのストリームにクールダウン/レート制限を追加するにはどうすればよいですか?

私は(User, Alert)タプルのストリームを持っているとしましょう。私が望むのは、ユーザーあたりのストリームのレートを制限することです。私。私は一度だけユーザーにアラートを出力するストリームが必要です。次の例では、60分と言えば、ユーザーの着信アラートは飲み込むだけです。 60分後に、着信アラートが再度トリガーされます。

がステートフル変換としてaggregateを使用したが、集計状態が時間に依存することがある:私が試した何

。しかし、得られたKTableが集計値には変化がなくても、(変更履歴など)KTableは、このように、「速度制限」ストリームの所望の効果を達成しない、要素をダウン送信し続ける

val fooStream: KStream[String, String] = builder.stream("foobar2") 
fooStream 
    .groupBy((key, string) => string) 
    .aggregate(() => "constant", 
    (aggKey: String, value: String, aggregate: String) => aggregate, 
    stringSerde, 
    "name") 
    .print 

aggregateは下流の要素を公開することを決定したときにどのように/私に

[KSTREAM-AGGREGATE-0000000004]: string , (constant<-null) 
[KSTREAM-AGGREGATE-0000000004]: string , (constant<-null) 

それは一般的に不明だ:それは次のような出力を提供します。私の元々の理解は、それは即座だということでしたが、そうではないようです。私が見る限り、ウィンドウ処理はここで助けてはなりません。

Kafka Streams DSLは現在、SparkのupdateStateByKeyまたはAkkaのstatefulMapConcatと同様のステートフル変換のこの使用例を考慮していない可能性がありますか?下位のプロセッサ/トランスフォーマAPIを使用する必要がありますか?

EDIT:Possible duplicateは集計は下流の要素を公開することを決定したときになど、いくつかの混乱を引き起こしているか、レコードのキャッシングの問題に行くん

。しかし、主な問題は、DSLで「レート制限」を達成する方法に関するものでした。 @migunoが指摘しているように、下位のプロセッサAPIに戻る必要があります。私は非常に冗長であるアプローチを貼り付け下:

val logConfig = new util.HashMap[String, String](); 
    // override min.insync.replicas 
    logConfig.put("min.insyc.replicas", "1") 

    case class StateRecord(alert: Alert, time: Long) 

    val countStore = Stores.create("Limiter") 
    .withKeys(integerSerde) 
    .withValues(new JsonSerde[StateRecord]) 
    .persistent() 
    .enableLogging(logConfig) 
    .build(); 
    builder.addStateStore(countStore) 

    class RateLimiter extends Transformer[Integer, Alert, KeyValue[Integer, Alert]] { 
    var context: ProcessorContext = null; 
    var store: KeyValueStore[Integer, StateRecord] = null; 

    override def init(context: ProcessorContext) = { 
     this.context = context 
     this.store = context.getStateStore("Limiter").asInstanceOf[KeyValueStore[Integer, StateRecord]] 
    } 

    override def transform(key: Integer, value: Alert) = { 
     val current = System.currentTimeMillis() 
     val newRecord = StateRecord(value._1, value._2, current) 
     store.get(key) match { 
     case StateRecord(_, time) if time + 15.seconds.toMillis < current => { 
      store.put(key, newRecord) 
      (key, value) 
     } 
     case StateRecord(_, _) => null 
     case null => { 
      store.put(key, newRecord) 
      (key, value) 
     } 
     } 
    } 
    } 
+0

[カフカストリームの削減方法の出力が表示されないのはなぜですか?](http://stackoverflow.com/questions/40537084/why-dont-i-see-any-output-from- the-kafka-streams-reduce-method) –

+0

現時点ではDSLでは不可能です。 PAPIを使用すると動作します。 –

+0

PAPI = KafkaストリームのプロセッサAPI。 –

答えて

2

のは、私は(User, Alert)タプルのストリームを持っているとしましょう。私が望むのは、ユーザーあたりのストリームのレートを制限することです。私。私は一度だけユーザーにアラートを出力するストリームが必要です。次の例では、60分と言えば、ユーザーの着信アラートは飲み込むだけです。 60分後に、着信アラートが再度トリガーされます。

これは現在、カフカストリームのDSLを使用しているときは不可能です。その代わりに、下位レベルのProcessor APIを使用して、そのような動作を手動で実装することができます(また、そうする必要があります)。

FYI:このような機能(「トリガー」と呼ばれることも多い)をDSLに追加するかどうかについて、カフカコミュニティで議論してきました。これまでのところ、そのような機能性には当面決定していない。

aggregateがダウンストリームの要素を公開する方法/時期については、一般に不明です。私の元々の理解は、それは即座だということでしたが、そうではないようです。

はい、それはKafka 0.10.0.0の初期の動作でした。それ以来、どのバージョンを使用しているのかわからないため、レコードキャッシュを導入しました。レコードキャッシングを無効にすると、最初の動作に戻ってしまいますが、レコードキャッシングがレートリミットのための(間接的な)ノブを与えることは理解できます。おそらくキャッシュを有効にしたままにしておきたいと思うでしょう。

残念ながら、Apache Kafkaドキュメントはまだレコードキャッシュをカバーしていませんが、その間にhttp://docs.confluent.io/current/streams/developer-guide.html#memory-managementを読むことをお勧めします。

+0

ご協力いただきありがとうございます。私はこれが現在DSLでは可能ではないと思っています。このトピックの周りにコミュニティが持っていたディスカッションへのリンクがありますか?プロセッサーAPIは十分にいいですが、他のフレームワークのように抽象化できる共通のユースケースのようです。 私の回避策で質問を更新しました – nambrot

+0

ディスカッション:私は頭の上から覚えていません。おそらくkafka-devメーリングリストの議論の中にあります。 –

関連する問題