1

私はApache Kafkaを使用しています。私は、プロデューサがJavaでコード化され、ConsumerがScalaでコード化されたwarファイルを作成しました。プロデューサがHTMLページからデータを取得しています。私は、プロデューサによって公開されたデータのほとんどが消費者にあることがわかりますが、一部のデータが欠落しています。ここでApache Kafka JavaプロデューサScala Consumer missing streams

生産

ファイル今、私は消費者のusin上のメッセージをチェックしています1

package com.cts.rest; 

import java.util.Properties; 

import kafka.producer.ProducerConfig; 

public class Configuration { 

static ProducerConfig setKafkaProducerParameter() { 
    Properties properties = new Properties(); 
    properties.put("zk.connect", "localhost:2181"); 
    properties.put("metadata.broker.list", "localhost:9092"); 
    properties.put("serializer.class", "kafka.serializer.StringEncoder"); 
    properties.put("acks", 0); 
    ProducerConfig producerConfig = new ProducerConfig(properties); 
    return producerConfig; 
    } 

}

ファイル2

package com.cts.rest; 

import kafka.javaapi.producer.Producer; 
import kafka.producer.KeyedMessage; 
import kafka.producer.ProducerConfig; 


public class RTTSKProducer { 

static void sendDataToProducer(String line){ 

    ProducerConfig producerConfig = configuration.setKafkaProducerParameter(); 
    Producer<String, String> producer = new Producer<String, String>(producerConfig);  

    String topic = "jsondata";  
    KeyedMessage<String, String> msg = new KeyedMessage<String, String>(topic, line); 
    System.out.print(msg); 
    producer.send(msg); 
    producer.close(); 
      } 
    } 

ための私のコードですg次のコマンド。

bin\windows\kafka-console-consumer.bat --zookeeper localhost:2181 --topic jsondata --from-beginning 

プロデューサの設定がありません。

答えて

1

「acks」設定を増やしてより耐久性を高めることができます。最も重要なことは、以下のように、Kafkaに公開されていないメッセージをうまく処理するために、コールバック関数を使って 'send'メソッドを呼び出す必要があります:

producer.send(myRecord, 
      new Callback() { 
       public void onCompletion(RecordMetadata metadata, Exception e) { 
        if(e != null) 
         e.printStackTrace(); 
        System.out.println("The offset of the record we just sent is: " + metadata.offset()); 
       } 
      }); 
関連する問題