2017-04-13 6 views
0

私はKafka 0.10を使用しています。私のIoTデバイスがログを投稿するトピックlogsを持っています。私のメッセージの鍵はdevice-idなので、同じデバイスのすべてのログは同じパーティションにあります。 。Kafkaは逆順でメッセージを消費します

私はapi /devices/{id}/tail-logsを持っており、コールが行われた瞬間に1つのデバイスのN個の最後のログを表示する必要があります。

現在、デバイスのログを含むパーティションの最初(つまり最も古いログ)から、現在のタイムスタンプに達するまで、非常に不安定な方法で実装されています(ただし動作​​します)。私が得ることができれば

Aより効率的な方法は次のようになり、現在の最新のオフセット、その後

(私は私が探しているだけのデバイスのそれらを保つために、いくつかのメッセージをフィルタリングする必要があります)後方メッセージを消費それはカフカでできますか?どのようにこの問題を解決することができない場合は? (もっと重い解決策は、弾性検索にリンクされたカフカ接続をしてからelasticsearchをクエリすることですが、これに2つのコンポーネントを追加するにはちょっと残酷に見える...)

+0

私は100%私はあなたのアプリケーションを完全に理解しています。基本的にトピックまたはパーティションからのN個の最後のメッセージを意味するN個の最後のログを表示していますか? 「ログ」にトピックがありますか、複数のパーティションがある場合は「最後のNメッセージ」をどのように定義しますか?これはタイムスタンプにどのように関連していますか?あなたは「あなたが現在のタイムスタンプに達するまで、最初から」と言っています。後方を読むのはそれほど簡単ではありませんが、 '.seek()'と '.endOffsets()'と '.offsetForTimestamp()'があり、あなたは完全に答えを出すことができます。シナリオはより良い –

+0

確かに私の問題は明確ではなく、私は編集しました.1つのパーティションから最後のNメッセージを読んでいます。 –

+0

パーティションには複数の 'device-id'のログがあるかもしれません。十分である。 Kafkaストリームとインタラクティブクエリを使用してAPIをステートフルにするには、参考にしてください:http://docs.confluent.io/current/streams/developer-guide.html#interactive-queriesどのカフカバージョンを使用しますか? 0.10.0、0.10.1、または0.10.2? –

答えて

1

0.10.2で、私はKafka Streamsアプリケーションを書くことをお勧めします。アプリケーションはステートフルになり、ステートは最後のN個のレコード/ログを保持しますdevice-id - 新しいデータが入力トピックに書き込まれた場合、Kafka Streamsアプリケーションは状態を更新します(トピック全体を再読み込みする必要はありません)。 )。

さらに、アプリケーションはまた、あなたが(「API /devices/{id}/tail-logsInteractive Queries機能を使用して要求する機能。

をこのように、私は各要求のための答えを再計算しなければならないステートレスなアプリケーションを構築するが、ステートフルなアプリケーションを構築しないことすべての可能なリクエスト(つまり、すべてdevice-id)に対して結果を熱心に計算して(結果を自動的に更新する)、要求が入ったときにすでに計算された結果を返します。

+1

インタラクティブな質問へのリンクを見てくれてありがとう、あなたのコメント(私もこの答えに入れることができると思います)では、これは手袋のようにこのニーズに合っています –

関連する問題