2017-10-26 11 views
0

私はカフカで自分の手を汚し始めました。私はthisに行った。それは、カフカストリームDSLのデータ/トピック管理のみを示しています。誰もkafkaストリームのProcessor APIのための同じ種類のデータ管理のためのリンクを共有できますか?私は特別にプロセッサAPIのユーザーと内部のトピック管理に興味があります。カフカストリームプロセッサーAPIを使用したユーザートピック管理

TopologyBuilder builder = new TopologyBuilder(); 

// add the source processor node that takes Kafka topic "source-topic" as input 
builder.addSource("Source", "source-topic") 

ストリームプロセッサが同じデータを消費する前に、このソーストピックに入力データを入力する場所はどこですか?

要するに、プロデューサのトピックへの書き込みのように、ストリームを使用してkafka「ソース」トピックに書き込むことはできますか?またはストリームはトピックの並列消費のためだけですか? 私は、「KafkaのStreams APIはKafkaのプロデューサーと消費者のクライアントの上に構築されている」と考えるべきです。

+0

を経由して、あなたは正確に達成したいものを説明することができ、取り込むことができますか? – subzero

+0

@Subhransu:私は入力の各行をユーザー定義のトピックの単一のメッセージとして配置し、次にストリームを使用してシンクまでビジネスロジックを処理したいと考えています。私はソーストピックに入れるデータを持っていますが、それをプログラム的に行う方法はありません。 – sumit

+0

私はここで私の答えを見つけたと思う[このリンク](https://stackoverflow.com/questions/38106863/kafka-streams-how-to-write-to-a-topic) – sumit

答えて

0

はい、KafkaProducerを使用して、KStreamにフィードするソーストピックの入力を生成する必要があります。

しかし、中間のトピックが

  • KafkaStreams#to
  • KafkaStreams#through
0

jax(Java Excel API)を使用して、Excelファイルからカフカトピックに書き込むプロデューサを作成できます。 次に、そのトピックを消費し、別のトピックに生成するkafkaストリームアプリケーションを作成します。 また、context.getTopic()を使用して、プロセッサが受信しているトピックを取得できます。 次に、複数のif文を設定して、process()関数内でそのトピックのプロセスロジックを呼び出します。

+0

あなたはカフカストリームを使って話題に書き込めないと言っていますか?私はカフカプロデューサーを明示的に書く必要がありますか? – sumit

+0

私は、あなたがExcelファイルからカフカのトピックに書きたい場合は、そのためにプロデューサーを使うべきだと言います。 – subzero

+0

私は、あなたがそれを行うためにプロデューサーを使うべきであることを、あなたがエクセルファイルからカフカのトピックに書きたいと思っていると言います。また、Kafkaストリームアプリケーションを作成したい場合は、プロデューサによって生成されたトピックから消費し、さらに分析するために処理済みのレコードを出力する必要があります。これは、単一のKafkaストリームアプリケーションを使用して、または複数のスレッドを開始する複数のyを使用して消費することができます。 – subzero

関連する問題