apache-kafka-streams

    2

    1答えて

    私は4台のカフカクラスタのメトリックを監視しています。私は入力アプリケーションを使ってKafkaにメッセージを書いています。そして、Kafka Streamsアプリケーションはそれらのメッセージを処理し、それらをジオロケーション変数で区切られた新しいKafkaトピックに書き戻します。 不確定な時間(通常2〜3日)はクラスタが問題なく実行され、メトリックでは何も不審なものは報告されず、メトリックka

    -1

    1答えて

    --property print.key=trueこれを私のkafka-cosole-consumerコマンドで削除すると、コンソールにはnullが表示されません....しかし、それは私が使用している表示目的だけではありません。 ...カフカからの私の出力は、それがを受け入れることはできません.....私のストリームの原子炉は非常にだけ {"timestamp":"2017-10-04T10:43

    0

    1答えて

    私はカスタムプロセスを持っているので、プロセッサapiを使用してカフカストリームを作成しようとしています。私はプロセッサが異なる数のパーティションを持つ複数のトピックにリスニングしていたときに問題に直面しました。私は、グローバルな状態ストアを作成する必要があることに気付きました。 プロセッサを追加する方法と、複数のトピックをリッスンしながらグローバルステートストアを追加する方法を知る必要があります

    0

    1答えて

    Kafkaのドキュメントでは、コンシューマ設定のenable.auto.commitのデフォルト値はtrueです。しかし、私は間違っています。 私のKafkaストリームアプリケーションでは、これをtrueに変更して、props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), true)として自動

    0

    1答えて

    kafkaストリームコードで何かを試していて、データを分割した後に1msの間、遅延やthread.sleep()のようなものを追加したいと思っていました。私はそれをやっている? KStreamBuilder builder = new KStreamBuilder(); KStream<String, String> textlines = builder.stream("INTOPIC");

    2

    2答えて

    私は、カフカとの並列化の基本的なアプローチがパーティション化を利用することを理解しています。しかし、私は、6つのパーティションしかない既存のインフラストラクチャを活用しなければならないという点で特別な状況があり、毎秒数百万と数百万のレコードを処理する必要があります。 各kstreamコンシューマに読み込みを行い、同時に単一のパーティションから負荷を均等に分配する方法で、さらに最適化する方法はありま

    0

    1答えて

    私は、Kafkaトピックから消費されたデータを集約するためにApache Kafkaストリーミングを使用しています。集約は、それ自体が消費され、結果がDBに格納された別のトピックにシリアル化されます。私が推測するようなかなり古典的な使用例。 集計コールの結果は、Kafkaの変更履歴「トピック」によってバックアップされたKTableを作成しています。 これは実際にはそれよりも複雑ですが、(平均を計算

    -1

    1答えて

    私は車の位置に関するデータを保存しているカフカのトピックを持っています。私はストリームを書く必要があり、それは私に各車の最後の4座標だけを表示する。どのように私はカフカストリームでこれを行うことができますか?