2017-08-30 8 views
1

常に聞くDatabaseのデータをプッシュするKafka topicsのコンシューマを作成する必要があります。Javaを使用して単一のサイクルでKafkaから複数のレコードを読み取る方法

ここでの要件は次のとおりです。 - カフカから複数のレコードを1つのサイクルで読み取る場合は、1回の呼び出しで複数回ではなくdbにプッシュしようとします。

public static void kafkaConsumer(String topicName, String groupId, String autoOffsetReset, 
     String enableAutoCommit, String kafkaServers, String acks, String retries, String lingerMS, 
     String bufferMemory) throws Exception { 

    ObjectMapper mapper = new ObjectMapper(); 

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(
      getKafkaParams(groupId, kafkaServers, autoOffsetReset, enableAutoCommit)); 

    consumer.subscribe(Arrays.asList(topicName)); 
    logger.info("subscibed to the topic {}", topicName); 
    cluster = Cluster.builder().addContactPoints(CASSANDRA_IPS.split(",")).build(); 
    session = cluster.connect(KEYSPACE); 

    try { 

     while (true) { 
      try { 
       ConsumerRecords<String, String> records = consumer.poll(1000); 
       for (ConsumerRecord<String, String> record : records) { 

        Model model= mapper.readValue(record.value(), Model.class); 

       try { 
         boolean flag = insertIntoDB(session, model); 
         if (flag) { 
          logger.info("************ Data Persisted Successfully ***************"); 
         } else { 
          logger.info("******* Data Persition Failed *************"); 
         } 
        } catch (Exception ex) { 
         logger.error("Exception while persisting data into DB", ex); 
        } 
       } 
      } catch (Exception ex) { 
       logger.error("Exception while reading data from kafka", ex); 
      } 
     } 
    } finally { 
     consumer.close(); 
    } 
} 
+1

だからあなたの本当の問題は、DBに複数のレコードを挿入する方法ですか? – GuangshengZuo

+0

私は1レコードを読んでいて、その1レコードをDBに挿入できますが、 'Kafka'から複数のレコードを読み込む方法と、それらを一度の呼び出しでDBに挿入する方法はありますか? – Sat

+0

kafkaからの読み方を示すコードを追加 – Natalia

答えて

1

Mysql INSERTサポートは、一度に多くの行を挿入します。このように:

INSERT INTO tbl_name (a,b,c) VALUES(1,2,3),(4,5,6),(7,8,9); 

だから、最初の配列にレコードを保存することができ、かつ、配列のサイズはBATCH_SIZEに等しいとき、あなたはあなたのinsertIntoDbメソッドに渡すことができます。そして配列をクリアして、ループを進めてください。

また、1つのポーリングからすべてのメッセージを配列に取り込み、それをinsertIntoDbに渡すこともできます。

しかし、メッセージ数が大きすぎると、Mysqlはパケットが大きすぎると不平を言います。その場合は、指定したBATCH_SIZEを使用する方が良いです。

また、1つのポーリングでメッセージ数を制限するために、消費者の "max.poll.records"設定を特定することもできます。カサンドラでこのよう

何か:

PreparedStatement ps = session.prepare("INSERT INTO messages (user_id,msg_id, title, body) VALUES (?, ?, ?, ?)"); 
BatchStatement batch = new BatchStatement(); 
batch.add(ps.bind(uid, mid1, title1, body1)); 
batch.add(ps.bind(uid, mid2, title2, body2)); 
batch.add(ps.bind(uid, mid3, title3, body3)); 
session.execute(batch); 
+0

私のDBはCassandraです。どのようにしてCassandraに複数のレコードを挿入することができますか? – Sat

+0

https://www.datastax.com/dev/blog/client-side増強イン・カッサンドラ-2-0 – GuangshengZuo

関連する問題