2017-04-07 16 views
3

私は、REST-APIをポーリングし、カフカのトピックにJSONレスポンスをシンクするために使用することができカフカコネクトAPIを使用してカスタムソースコネクタを実装しています。今私はSourceTaskのポーリング間隔を実現する方法、JDBC Connectorが提供する方法を知りたいと思います。どこかでスレッドをスリープ状態にする必要がありますが、どこでこれを行う必要がありますか?ポーリング間隔

答えて

2

。最初poll()の呼び出しでは、フィールドはまだそのために構成REST-APIはポーリングを取得、初期化されていません。この最初の呼び出しでは、longフィールドは現在のタイムスタンプで初期化されます。次のすべてのpoll()呼び出しでは、前の呼び出しのこのタイムスタンプがチェックされます。以前poll()からの経過ミリ秒の量が2回のポーリングの間、設定された間隔よりも小さい場合は、私は、設定済みのミリ秒が経過しているので、スリープ状態にスレッドを送ります。私が探しているまさにありません

0

はmax.poll.interval.msを使用してください。

リンクの下に参照してください:私は、タイムスタンプを格納するタイプlongのプライベートフィールドを追加することによって、私のSourceTask実装では、このユースケースを解決

https://kafka.apache.org/documentation/

+0

。消費者は、例えば、グループを去る前_max.poll.interval.ms_は、_poll_呼び出し間の最大ギャップを規定しますスレッドがブロックされている場合タイムアウトのように機能します。しかし、_poll_メソッドが呼び出される頻度を制御するソリューションを探しています。デフォルトでは、継続的に、私は_poll_をコネクタスレッドから呼び出すソリューションを探しています。 5分ごとに – Mabi