2017-09-01 7 views
1

kafkaコンシューマの出力をExcelファイルに定期的にダンプする必要があります。私は次のコードを使用します:kafka(kafka-python)をtxtファイルにダンプする

from kafka import KafkaConsumer 
from kafka import KafkaProducer 
import json,time 
from xlutils.copy import copy  
from xlrd import open_workbook 
import pandas 

consumer = KafkaConsumer(bootstrap_servers='localhost:9092') 
KafkaConsumer() 
consumer.subscribe("test") 

rowx=0 
colx=0 

for msg in consumer: 
     book_ro = open_workbook("twitter.xls") 
     book = copy(book_ro) # creates a writeable copy 
     sheet1 = book.get_sheet(0) # get a first sheet 
     sheet1.write(rowx,colx, msg[6]) 
     book.save("twitter.xls") 

私の問題は、コードが効率的でないということです。各メッセージに対して、私はExcelファイルを開いて書いて保存する必要があります。一度Excelを開いて書いてから閉じる(メッセージのバッチとforループではない)方法はありますか? tnx

+0

なぜファイルを閉じますか? –

答えて

0

はい、各メッセージのオープン、書き込み、保存、クローズは非効率的です。バッチで行うことができます。しかしそれでも消費するループでそれを行う必要があります。

msg_buffer = [] 
buffer_size = 100 
for msg in consumer: 
     msg_buffer.append(msg[6]) 
     if len(msg_buffer) >= buffer_size: 
      book_ro = open_workbook("twitter.xls") 
      book = copy(book_ro) # creates a writeable copy 
      for _msg in msg_buffer: 
       sheet1 = book.get_sheet(0) # get a first sheet 
       sheet1.write(rowx,colx, _msg) 
      book.save("twitter.xls") 
      msg_buffer = [] 

あなたは、それがnobatchよりも100倍速いと考えることができます。コメントを

UPDATE:

はい、通常、私たちは永遠にこのループにとどまるだろう、それは内部的に、新しいメッセージをフェッチハートビートを送信し、オフセットをコミットするためにポーリングを使用しています。そしてあなたの目的がこのトピックのメッセージを消費してメッセージを保存している場合、それは長い実行ループでなければなりません。

これはkafka-pythonデザインなので、メッセージを消費する場合やconsumer.poll()を使用する場合はこのように使用してください。

は、消費者がイテレータオブジェクトであるので、あなたが、for msg in consumer:を使用しなかった理由については、そのクラスが、それは根本的なレコードを取得するためにフェッチャを使用して、__iter____next__実装しています。参照可能な実装の詳細https://github.com/dpkp/kafka-python/blob/master/kafka/consumer/group.py

+0

ノートをありがとう。私たちは永遠に "for msg in consume loop"にとどまっているのだろうか?この場合、あなたのコードはOKです。または任意のコールバック関数があります(クライアントが本当にメッセージを受け取ったとき、sthを実行する関数を呼び出します)。コンシューマオブジェクトのタイプは何ですか?それはリストか何ですか? – user2867237

+0

答えはudpatedです。 – GuangshengZuo

関連する問題