apache-kafka-streams

    2

    1答えて

    KSQLを使用して、特定のカラムのエントリをGROUP BYでカウントするのではなく、アプリケーションをストリームするすべてのエントリに対して集計を取得できますか? 私はこのような何かを探しています:KSQLで | Count all | Count id1 | count id2 | | ---245----|----150----|----95-----| 以上のこのような: [some

    1

    1答えて

    ライブアプリケーションのtimeseriesデータを処理します。古いデータには意味がありません。私はちょうどストリームアプリケーションが開始し、以前にコミットされたオフセットからではなく受け取ったデータを処理したいと思う。再起動後にカフカストリームアプリで古いレコードを無視する正しい方法は何ですか? kafkaのコンシューマAPIでは、通常、最新のレコードにスキップするためにseekToEnd()

    1

    1答えて

    私はカフカのトピックを読んで、それに基づいていくつかの処理を行い、結果を別のトピックに保存しようとしています。 私のコードは以下のようになります。 builder .stream(settings.Streams.inputTopic) .mapValues[Seq[Product]]((e: EventRecord) ⇒ fx(e)) // Something ne

    1

    1答えて

    私はKafka Streamsでかなり新しいです。今私はこのシステムの基本原則を理解しようとしています。 これは、今、どのように動作するか私には明らかではないが、次の記事https://www.confluent.io/blog/introducing-kafka-streams-stream-processing-made-simple/ You just use the library in

    0

    1答えて

    Kafkaストリームのドキュメントごとに理解しています。 最大可能な並列タスクは、クラスタ内のすべてのトピックのトピックの最大パーティション数に等しいです。 私はカフカクラスターで約60のトピックを持っています。各トピックには単一のパーティションしかありません。 私のカフカクラスターのためにカフカストリームとのスケーラビリティ/並列性を達成することは可能ですか? 私を助けてください。

    0

    1答えて

    スパークストリーミングとKafkaストリームのスループットを比較しています。私の結果では、Kafka StreamsはSpark Streamingよりスループットが高いと述べています。これは正しいです?それは別の方法ではありませんか? おかげ

    0

    1答えて

    私は10MiBまでのメッセージを持つKafka Streamsアプリケーションを持っています。私は、状態ストアでこれらのメッセージを永続化したいが、カフカストリームは、内部のchangelogトピックに生成するために失敗します。 2017-11-17 08:36:19,792 ERROR RecordCollectorImpl - task [4_5] Error sending record t

    0

    1答えて

    データ "A"のストリームにアクセスし、テーブル "B"に対していくつかの条件をチェックし、最後にテーブル "B"の状態を更新したいと思います。 (A,B) ====> check conditions in B ====> finally update B どのように私はカフカストリームで同じロジックを実装することができますか?ドキュメンテーションから私の観察は、データ "A"をストリーミン

    0

    1答えて

    コンフルエントなカフカで多くのアーティファクトを読んだ後、通常のチャットシステムを実装しようとします。しかし、私はいくつかの構造設計を行う際にいくつかの問題に出会った。 私のデータのデータベースとしてmysqlを使用する場合、ユーザテーブルのuser_id、メッセージテーブルのmessage_idなどの意味のあるすべてのメッセージにidを与えることができます。モデルテーブルにIDを与えた後、クライ