producer

    2

    1答えて

    を通過した私はカフカ0.10.1.1を使用して、テスト生産を書くが、私はproducer.sendonCompletionコールバックでエラーが出ますよメッセージ期限切れデュオを送信します設定: 経過testtopic-2による30004のMSの1つのレコードを期限切れ:timeout.ms=30000, linger.ms=5, batch.size=1000 は、次のエラーメッセージが表示され

    0

    1答えて

    カフカについていくつか質問があります。もし誰かが私に助けてくれたら、私はとても感謝しています。 は、事前にありがとう:) Q1を)私はパーティションはカフカブローカー間で分割されていることを知っています。しかし、分割は何に基づいていますか?たとえば、3つのブローカと6つのパーティションがある場合、各ブローカに2つのパーティションがあることを確認する方法はありますか?この分割は現在カフカでどのように

    0

    1答えて

    これまでに投稿した質問にあるいくつかの提案に従ってみましたが、完全な解決策を得ることができません。次のコードを使用する: @Produces @Dependent @RestClientResourceConnector public <T> RestClientProxy<T> getStatusResource(InjectionPoint injectionPoint) throws

    4

    1答えて

    2ノードのカフカクラスタ(EC2インスタンス)があり、各ノードは別々のブローカーとして使用されています。リーダーインスタンスで次のコマンドを使用してプロデューサを実行すると、 kafka-console-producer.sh --broker-list localhost:9092 --topic test 次のエラーが発生します。 kaka-topics.shでトピックをリスト test

    0

    2答えて

    私はBlockingQueueでConsumer-Producerの問題を実装しようとしていました。何らかの目的でこれを行うために、私はファイル検索ツールを書くことにしました。 私は、検索メカニズムが再帰的に動作していると判断し、すべての新しいディレクトリに検索の速度を上げるための新しいスレッドプールが追加される予定です。 私の問題は、最終的にスレッドを検索する(消費者)を停止するメカニズムを実装

    1

    1答えて

    私はEC2にKafkaソフトウェアをインストールしました。私の問題は、AWS外からのブローカーへの接続です。それはすべて私の中から働く。 私は、ブローカーとkafka-console-producerとconsumerの両方の仕事を(同じサーバーから)開始できます。私はポート2181と9092を遠隔地に開いていて、どこからプロデューサーを使用したいのですか。だから私の開発(ローカル)マシンから。も

    0

    1答えて

    Cloud Foundryでは、非ssl url( "kafkaURL:9092")へのメッセージを生成できます。しかし、それはssl url( "kafkaURL:9093")では機能しません。 カフカサーバーバージョン0.10.0.1とクライアントバージョン0.10.0.0。ここで がプロパティある私が使用: props.put(org.apache.kafka.clients.produce

    0

    1答えて

    なぜkafka 0.10コンソールプロデューサーがkafka 0.9にメッセージを送信できないのですか? バージョン0.9(サーバー側)でkafkaコンソールコンシューマーを開始します。 私はカフカコンソールプロデューサーをバージョン0.10(クライアント側)から開始しています。 次に、私はプロデューサーの例外を受けました(下記)。 新しいプロデューサが古いコンシューマにメッセージを送信できない可

    0

    1答えて

    Flinkストリーミングワークフローは、Kafkaにメッセージを公開します。 KafkaProducerの「再試行」メカニズムは、内部バッファーにメッセージが追加されるまで開始されません。 その前に例外がある場合、KafkaProducerはその例外をスローし、Flinkはそれを処理していないようです。この場合、データが失われます。 関連FLINKコード(FlinkKafkaProducerBas

    0

    1答えて

    メッセージをシリアル化する前に傍受しようとしていますが、すでにProducerInterceptorというインターフェイスがあり、レコードを変更するために使用できます。そのインターフェイスを実装し、データを変更するクラスを作成した後、新しいクラスを配置する必要がある場合は、いくつかのファイルを変更する必要がありますか?