2017-08-30 24 views
2

私はプロデューサー&コンシューマーサイドでトランザクションのセマンティクスを持つ必要があるkafkaユースケースに取り組んでいます。私はkafkaトランザクションAPI 0.11を使用してトランザクションメッセージをカフカクラスターに公開することができますが、 ..プロパティファイルで、私は問題に直面してる側は...私はisolation.level=read_committedを設定している。しかし、私は、メッセージがisolation.level=read_uncommittedで消費されているが、これは望ましくない見ることができましたit..Iを消費することはできませんよKafkaトランザクションのプロデューサーとコンシューマー

プロデューサーコード

package com.org.kafkaPro; 

import java.io.File; 
import java.io.FileNotFoundException; 
import java.io.FileReader; 
import java.io.IOException; 
import java.net.URL; 
import java.text.ParseException; 
import java.text.SimpleDateFormat; 
import java.util.Date; 
import java.util.Properties; 

import org.apache.kafka.clients.producer.KafkaProducer; 
import org.apache.kafka.clients.producer.ProducerRecord; 
import org.apache.kafka.common.errors.OutOfOrderSequenceException; 
import org.apache.kafka.common.errors.ProducerFencedException; 

import kafka.common.AuthorizationException; 
import kafka.common.KafkaException; 

    public class ProducerWithTx 
    { 


     public static void main(String args[]) throws FileNotFoundException { 
      URL in = ProducerWithTx.class.getResource("producertx.properties"); 

      Properties props = new Properties(); 

      try { 
       props.load(new FileReader(new File(in.getFile()))); 
      } catch (IOException e1) { 
       // TODO Auto-generated catch block 
       e1.printStackTrace(); 
      } 


      Paymnt pay1= new Paymnt(); 
      pay1.setAccid(1); 
      pay1.setAccountstate("y"); 
      pay1.setAccountzipcode(111); 
      pay1.setBankid(12); 
      pay1.setCreditcardtype(15); 
      pay1.setCustomerid("2"); 
      SimpleDateFormat ft = new SimpleDateFormat ("yyyy-MM-dd"); 
      Date t = null; 
      try { 
       t = ft.parse("2017-11-10"); 
      } catch (ParseException e) { 
       // TODO Auto-generated catch block 
       e.printStackTrace(); 
      } 
      pay1.setPeriodid(t); 

      String timeStamp = new SimpleDateFormat("yyyy.MM.dd:HH:mm:ss").format(new Date()); 
      props.put("transactional.id", "accid=" + pay1.getAccid() + " custid=" +pay1.getCustomerid()+ " timestmp=" +timeStamp); 
      KafkaProducer<String, Paymnt> producer = new KafkaProducer(props); 
      producer.initTransactions(); 
      try{ 
       producer.beginTransaction(); 



       //RecordMetadata metadata=producer.send((ProducerRecord<String, Paymnt>) new ProducerRecord<String, Paymnt>("test",pay1)).get(); 

       producer.send((ProducerRecord<String, Paymnt>) new ProducerRecord<String, Paymnt>("test",pay1)); 
       producer.commitTransaction(); 

       //System.out.println("written to" +metadata.partition()); 

      } 
      catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e){ 

       // We can't recover from these exceptions, so our only option is to close the producer and exit. 
       producer.close(); 
      } 
      catch(KafkaException e) { 
       // For all other exceptions, just abort the transaction and try again. 
       producer.abortTransaction(); 
      } 
      producer.close(); 
     } 

    } 

producertx.properties

yの
metadata.broker.list=localhost:9092 
bootstrap.servers=localhost:9092 
acks=all 
retries=1 
batch.size=16384 
linger.ms=1 
buffer.memory=33554432 
key.serializer=org.apache.kafka.common.serialization.StringSerializer 
value.serializer=com.org.kafkaPro.PaySerializer 
#transactional.id=1 
enable.idempotence=true 
num.partitions=3 

消費者

package com.org.kafkaPro; 

import java.io.File; 
import java.io.FileReader; 
import java.io.IOException; 
import java.net.URL; 
import java.util.ArrayList; 
import java.util.List; 
import java.util.Properties; 

public class Consumer { 

    private static List<ConsumerMultiThreaded> consumersGroup; 


    public static void main(String args[]) throws IOException { 


     URL in = ProducerWithTx.class.getResource("consumer.properties"); 

     Properties props = new Properties(); 

     try { 
      props.load(new FileReader(new File(in.getFile()))); 
     } catch (IOException e1) { 
      // TODO Auto-generated catch block 
      e1.printStackTrace(); 
     } 

     consumersGroup=new ArrayList<ConsumerMultiThreaded>(); 
     ConsumerMultiThreaded con1= new ConsumerMultiThreaded(props); 
     ConsumerMultiThreaded con2=new ConsumerMultiThreaded(props); 
     ConsumerMultiThreaded con3=new ConsumerMultiThreaded(props); 
     ConsumerMultiThreaded con4=new ConsumerMultiThreaded(props); 

     consumersGroup.add(con1); 
     consumersGroup.add(con2); 
     consumersGroup.add(con3); 
     consumersGroup.add(con4); 

     for (ConsumerMultiThreaded consumer : consumersGroup) { 

      Thread t=new Thread(consumer); 
      t.start(); 

     } 

     while(true){ 
      try { 
       Thread.sleep(100000); 
      } catch (InterruptedException ie) { 

      } 


     } 
    } 
} 

消費者のRunnable

public class ConsumerMultiThreaded implements Runnable { 

private final AtomicBoolean closed = new AtomicBoolean(false); 
    private final KafkaConsumer<String, Paymnt> consumer; 
    private final int minBatchSize =3; 
    private final List<ConsumerRecord<String, Paymnt>> buffer; 


    public ConsumerMultiThreaded(Properties props){ 
     this.consumer= new KafkaConsumer<String, Paymnt>(props); 
     buffer = new ArrayList(minBatchSize); 
    } 

    @Override 
    public void run() { 
     try { 
      consumer.subscribe(Arrays.asList("test")); 
      while (!closed.get()) { 
       ConsumerRecords<String,Paymnt> records = consumer.poll(10000); 

       for (ConsumerRecord<String, Paymnt> record : records) { 
        buffer.add(record); 
       } 

       /*for (ConsumerRecord<String, Paymnt> record : records) 
       { 
        System.out.println("record consumed by Thread " +Thread.currentThread().getId() +" value is " +record.value().toString()); 
       }*/ 
       if(buffer.size()>=minBatchSize){ 
        for (TopicPartition partition : records.partitions()) { 
         List<ConsumerRecord<String, Paymnt>> partitionRecords = records.records(partition); 
         for (ConsumerRecord<String, Paymnt> record : partitionRecords) { 
          System.out.println("record consumed by Thread " +Thread.currentThread().getId() +"from partition" +record.partition() +"offset" +record.offset() + "value: " + record.value().toString()); 
         } 
         long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); 
         consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1))); 
         buffer.clear(); 
        } 
       } 
      } 

     } catch (WakeupException e) { 
      // Ignore exception if closing 
      if (!closed.get()) throw e; 
     } 
     finally { 
      consumer.close(); 
     } 

    } 

    public void shutdown() { 
     closed.set(true); 
     consumer.wakeup(); 
    } 

} 

consumer.properties

bootstrap.servers=localhost:9092 
session.timeout.ms=30000 
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer 
value.deserializer=com.org.kafkaPro.PayDeserializer 
enable.auto.commit=false 
auto.offset.reset=earliest 
group.id=test 
isolation.level=read_committed 

ごhelp..Thankあなたに感謝

答えて

2

あなたが#transactional.id=1を使用している私たちのプロデューサーのプロパティ(これはあなたが問題に言及した通りです)は、#というシンボルが記載されています。これは問題を引き起こす可能性があります。

トピックと__transaction_stateトピックのログセグメントをダンプすることができない場合は、そこから問題のあるものを簡単にデバッグできます。

+0

私のコードを見て、カフカプロデューサを初期化すると実行時にトランザクションIDを入れています。それ以外の場合は、producer.beginTransaction()を呼び出すとトランザクションIDが見つからないと不平を言う可能性があります。 –

+1

あなたのカフカブローカーはどのOSにあるのですか?彼らはいくつかの問題があるよりも窓にある場合。 プロデューサのコンソールデバッグログを共有できますか。 ダンプログセグメントを共有します。 'kafka-run-class.bat kafka.tools.DumpLogSegments --files <.index、.log、および.timestamp>' dumpを実行する場合は、 '--transaction-log-decoder'のようなputフラグを使用します。トピックログ。本当の問題を教えてくれたら助かります。 –

+1

と消費者のデバッグログもあります。 –

関連する問題