1

を読んでいない私は、チュートリアルからサンプルカフカストリームアプリケーションを作成しました:カフカストリームは、入力されたトピック

public static void main(String[] args) throws Exception { 
    Logger log = Logger.getLogger("Name"); 

    Properties props = new Properties(); 
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordprint"); 
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.99.100:9092"); 
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); 
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); 

    final KStreamBuilder builder = new KStreamBuilder(); 
    builder.stream("onecon_postgres").print(); 

    final KafkaStreams streams = new KafkaStreams(builder, props); 
    final CountDownLatch latch = new CountDownLatch(1); 

    // attach shutdown handler to catch control-c 
    Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") { 
     @Override 
     public void run() { 
      streams.close(); 
      latch.countDown(); 
     } 
    }); 

    try { 
     streams.start(); 
     log.info("After Start"); 
     latch.await(); 
    } catch (Throwable e) { 
     System.exit(1); 
    } 
    System.exit(0); 
    } 

残念ながら、このアプリケーションは、入力ストリームを読みません。私はPostgreSQLのJDBCソースコネクタを持っています。これは、1つのデータベースからデータを正しくストリーミングしています(このトピックのKafka Connect UIデータを参照してください)。

私が持っている問題は、私は、プロパティIPにBOOTSTRAP_SERVERS_CONFIGでIPを変更したにもかかわらず、であることは、私はなぜ知らないlocalhostのです。

[main] INFO org.apache.kafka.streams.StreamsConfig - StreamsConfig values: 
    application.id = streams-linesplit 
    application.server = 
    **bootstrap.servers = [localhost:9092]** 
    buffered.records.per.partition = 1000 
    cache.max.bytes.buffering = 10485760 
    client.id = 
    commit.interval.ms = 30000 
    connections.max.idle.ms = 540000 
    default.key.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde 
    default.timestamp.extractor = class org.apache.kafka.streams.processor.FailOnInvalidTimestamp 
    default.value.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde 
    key.serde = null 
    metadata.max.age.ms = 300000 
    metric.reporters = [] 
    metrics.num.samples = 2 
    metrics.recording.level = INFO 
    metrics.sample.window.ms = 30000 
    num.standby.replicas = 0 
    num.stream.threads = 1 
    partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper 
    poll.ms = 100 
    processing.guarantee = at_least_once 
    receive.buffer.bytes = 32768 
    reconnect.backoff.max.ms = 1000 
    reconnect.backoff.ms = 50 
    replication.factor = 1 
    request.timeout.ms = 40000 
    retry.backoff.ms = 100 
    rocksdb.config.setter = null 
    security.protocol = PLAINTEXT 
    send.buffer.bytes = 131072 
    state.cleanup.delay.ms = 600000 
    state.dir = /tmp/kafka-streams 
    timestamp.extractor = null 
    value.serde = null 
    windowstore.changelog.additional.retention.ms = 86400000 
    zookeeper.connect = 

これを解決するために、トラフィックを転送するためにnetshを使用しましたが、ストリームを消費するこのアプリケーションは表示されません。

netsh interface portproxy add v4tov4 listenport=9092 listenaddress=127.0.0.1 connectport=9092 connectaddress=192.168.99.100 
+0

アプリケーションにデバッグしようとしましたか? 'StreamsThread'コンストラクタにブレークポイントを設定して、それに渡される設定を確認してください。また、あなたの設定を解析するときに 'StreamsConfig'が正しいことをしているかどうか確認してください。 –

答えて

1

残念ながら、このアプリケーションは、入力ストリームを読み取ることはありません。

あなたのKafka StreamsアプリケーションとあなたのKafkaブローカーとの間にネットワークの問題があるようです。 「カフカの流れがうまくいかない」ことは考えにくい。

また、それはあなたがより多くの情報を提供せずにあなたを助けるのは難しい:

  • あなたのカフカブローカーは何カフカバージョンを使用していますか?
  • アプリケーションで使用しているKafka(Streams)のバージョンは何ですか?
  • どのオペレーティングシステムですか?
  • ネットワーク設定とは何ですか?
    • アプリケーションを実行するマシンのIPアドレス。
    • 新しい接続をリッスンするKafkaブローカー(またはブローカー)は、どのIP +ポートにありますか?それは192.168.99.100:9092ですか?
  • アプリケーションのログには何が表示されますか? ERRORまたはWARNログメッセージが表示されますか?私はプロパティIPでBOOTSTRAP_SERVERS_CONFIGでIPを変更したにもかかわらず、

私が持っている問題は、私はなぜ知らないローカルホストされています。

私は理解していません - BOOTSTRAP_SERVERS_CONFIGlocalhost:9092に変更すると元の問題が解決されると思いますか?私はカフカのブローカーが実際に192.168.99.100:9092を聞いていることを理解しましたか?

これを解決するために、トラフィックを転送するためにnetshを使用しましたが、ストリームを消費するこのアプリケーションは表示されません。

ほとんどの場合、ポート転送は役に立ちません。 Kafkaブローカーの設定を更新しないと、ブローカーはデフォルトで「実際の」IP +ポートでのみ通信します。やや簡略化:192.168.99.100:9092で聞くように設定されたブローカは、Kafka Streamsアプリケーションを実行するマシン上のlocalhost:9092 -> 192.168.99.100:9092からポート転送を行っていても、あなたのKafka Streamsアプリケーションが送信するlocalhost:9092リクエストに応答しません。

これはちょっと役立ちます。

関連する問題