1

私はApache Kafkaを使用してETLデータパイプラインを実装しています。私はKafka Connectを抽出と読み込みに使用しました。カスタム変換にKafkaストリームを使用する

Connectはソースデータを読み込み、カフェのトピックの実際のデータをJSONの形式で使用します。

変換フェーズで私はカフカのトピックからJSONデータを読み込み、必要なカスタムビジネスロジックに基づいてSQLクエリーに変換し、カフカのトピックを出力する必要があります。

今のところ、トピックから読み込んで変換を行い、出力トピックに書き込むプロデューサコンシューマアプリケーションを作成しました。

カフカストリームAPIを使用して同じことを達成できますか?もしそうなら、いくつかのサンプルを提供してください。

+1

Robinが述べたように、Kafka Streams APIでは確かに可能です。ここでは例を見つけることができます:https://github.com/confluentinc/kafka-streams-examples –

答えて

2

Kafka StreamsまたはKSQLを確認してください。 KSQLはKafka Streamsの上で動作し、あなたが話している種類の集約を作成する非常に簡単な方法を提供します。ここで

SELECT PAGE_ID,COUNT(*) FROM PAGE_CLICKS WINDOW TUMBLING (SIZE 1 HOUR) GROUP BY PAGE_ID 

KSQL

にデータのストリームの集計を行うための一例だでより多くのを参照してください:あなたは実際にはカフカの話題であるKSQLの出力を取り、ということをストリーミングすることができ https://www.confluent.io/blog/using-ksql-to-analyse-query-and-transform-data-in-kafka

カフカ接続Elasticsearch、Cassandraなど。

関連する問題