2016-10-20 11 views
2

REST APIを公開している怠惰な消費者に代わってメッセージを消費するという要件があります。したがって、私はKafkaのトピックからメッセージを取り出し、公開されたAPIでHTTP POST操作を行うシンクコネクタを用意する予定です。カフカ接続調整

考慮すべき重要な要因の1つにスロットリングがあります。 Sink TasksがAPIのティアSLAを満たすように調整するには、どのようなメカニズムを提案しますか。 Kafkaにはクライアントのクォータ機能がありますが、クライアントのクォータを動的に調整できるAPIリクエスト/分または秒を追跡する最適なメカニズムは何ですか?

答えて

5

REST APIのレート制限を実装する最良の方法は、必要に応じてブロックすることによって、あなたのコネクタコードにあると思いますSinkTask.put()SinkTaskのレベルのレート制限が十分であるのか、それともグローバルにする必要があるのか​​を考えたいと思うかもしれません。

あなたが検討していたKafkaクォータを使用する利点は、分散されたアスペクトが処理されることですが、現在は転送されたバイト数でしか構成できないと考えられます。

+0

そうです。 Kafkaクライアントのクォータは転送されたバイト数で構成されますが、REST APIスロットルはnoです。のリクエスト私の場合は、クラウド内の複数の作業者で複数のシンクタスクを実行していました。あなたがグローバルと言うとき、あなたはnoのカウントを維持することを暗示しています。キャッシュのようなリクエスト(redis)のリクエスト?カフカ接続経由でこれを実装するエレガントな方法はありますか? – bhalochele

+1

はい、レート制限APIリクエストに必要な共有状態を維持するために、RedisまたはMemcachedのようなものを使用します。 あなたが過度に控えめなレート制限をしても大丈夫ならば、代わりに 'put()'メソッドで取得した 'SinkTask'ごとにGuava' RateLimiter'を初期化することができます。あなたが 'RateLimiter'に与える許可の数は'(overall_limit/num_tasks) '(' SinkConnector.taskConfigs() 'からタスク設定を生成するときにタスクレベルの制限を伝えることができます)。 – shikhar

+0

Guava RateLimiterを使用する場合、複数のラックを横断して複数のワーカーにまたがって実行されているシンクタスク全体で、同じRateLimiterインスタンスが必要になると思います。つまり、タスク1とタスク2は、acquire()メソッドが呼び出されるRateLimiterの1つのインスタンスにアクセスできる必要があります。 Btw、私はあなたがキャッシュメカニズムの代わりにGuava RateLimiterを提案していると思いますか? – bhalochele

関連する問題