1

Kafka Streamsアプリケーションの複数のインスタンスを実行すると、最初のインスタンスだけが正しくメッセージを受信して​​います。しかし、新しいインスタンスを開始すると、メッセージは受信されません。複数のインスタンスでKafkaストリーミングが動作しない

この問題を修正するための提案はありますか?ここで

は、私はあなたがこの問題を持っていると信じて、私のカフカ・ストリーミング・アプリ

package test.kafkastream; 

import java.util.Properties; 

import org.apache.kafka.clients.consumer.ConsumerConfig; 
import org.apache.kafka.common.serialization.Serdes; 
import org.apache.kafka.streams.KafkaStreams; 
import org.apache.kafka.streams.StreamsConfig; 
import org.apache.kafka.streams.processor.TopologyBuilder; 

public class Main { 

    public static void main(String[] args) { 
     Properties props = new Properties(); 
     props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount-processor"); 
     //props.put(ConsumerConfig.GROUP_ID_CONFIG, "streams-wordcount-processor"); 

     props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.38:45983,192.168.2.112:45635,192.168.2.116:39571"); 
     //props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); 

     props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); 
     props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); 
     //props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MyEventTimeExtractor.class); 


     // setting offset reset to earliest so that we can re-run the demo code 
     // with the same pre-loaded data 
     props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 

     TopologyBuilder builder = new TopologyBuilder(); 

     builder.addSource("Source", "topic6"); 

     builder.addProcessor("Process", new ProcessMessage(), "Source"); 

     KafkaStreams streams = new KafkaStreams(builder, props); 
     streams.start(); 
    } 

} 

であり、ここでは私のプロデューサー

package test.kafkamesos; 

import java.util.Date; 
import java.util.HashMap; 
import java.util.Map; 
import java.util.concurrent.ExecutionException; 

import org.apache.kafka.clients.producer.KafkaProducer; 
import org.apache.kafka.clients.producer.ProducerRecord; 
import org.apache.kafka.common.serialization.ByteArraySerializer; 

public class Producer { 

    public static void main(String[] args) throws InterruptedException, ExecutionException { 
     Map<String, Object> producerConfig = new HashMap<String, Object>(); 
     producerConfig.put("bootstrap.servers", "192.168.2.38:45983,192.168.2.112:45635,192.168.2.116:39571"); 
     //producerConfig.put("bootstrap.servers", "localhost:9092"); 

     // optional: 
     producerConfig.put("metadata.fetch.timeout.ms", "3000"); 
     producerConfig.put("request.timeout.ms", "3000"); 
     // ... other options: 
     // http://kafka.apache.org/documentation.html#producerconfigs 
     ByteArraySerializer serializer = new ByteArraySerializer(); 
     KafkaProducer<byte[], byte[]> kafkaProducer = new KafkaProducer<byte[], byte[]>(producerConfig, serializer, 
       serializer); 

     int i = 0; 
     while (true) { 
      String message = "{data:success,g:" + i + "}"; 
      ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>("topic6", message.getBytes()); 
      kafkaProducer.send(record).get(); 
      System.out.println("sending " + message); 
      Thread.sleep(1000); 
      i++; 
     } 
    } 
} 

と私のDockerfile

FROM openjdk:8-jre 
COPY ./target/*-with-dependencies.jar /jars/service-jar.jar 
CMD java -cp /jars/service-jar.jar test.kafkastream.Main 
+0

私はあなたがコードではなく、コード自体へのリンクを追加したので、それがあると仮定することができます。それにもかかわらず、それが理由なら、それは述べられるべきです。 –

+0

@AleksandarStojadinovicありがとうございます。私は今すぐコードを追加します.... –

答えて

3

カフカブローカー理由あなたが消費しているトピックのパーティションが1つだけ設定されています(topic6)。コンフルエントのブログから:アプリケーションが10個の パーティションを持つ単一のトピックから読み込む場合例えば

、あなたはあなたのアプリケーション (の10個のインスタンスまで実行することができますが、さらにインスタンスを実行するが、これらのことに注意してくださいアイドル状態になります)。 要約では、トピック・パーティションの数は、Streams APIアプリケーションの並列処理の上限であり、アプリケーションの実行インスタンス数は です。

出典:https://www.confluent.io/blog/elastic-scaling-in-kafka-streams/

+0

はい、これは問題です。ありがとうございました。 –

関連する問題