背景: 私はpymongoを使用してMongoDBの中でストリーミングAPIからJSONオブジェクトを取得し、それらを格納するように設定python
モジュール(一度に25の一括挿入を)持っています。比較のため、私は同じストリームAPIのcurl
へのbashコマンドを持っており、pipe
それをmongoimport
にしています。どちらの方法も、データを別々のコレクションに格納します。最適化:モンゴ
定期的にコレクションのcount()
を監視して料金を確認します。
これまでのところ、curl | mongoimport
アプローチの背後にある約1000個のJSONオブジェクトによって、python
モジュールの遅延が確認されています。
問題: はどのように私はcurl | mongoimport
と同期して〜であることを私のpython
モジュールを最適化することができますか?
私はTwitter APIではなくサードパーティのストリーミングサービスを使用しているため、tweetstream
は使用できません。
誰かが私を助けてくれますか?
Python
モジュール:読書のための
class StreamReader:
def __init__(self):
try:
self.buff = ""
self.tweet = ""
self.chunk_count = 0
self.tweet_list = []
self.string_buffer = cStringIO.StringIO()
self.mongo = pymongo.Connection(DB_HOST)
self.db = self.mongo[DB_NAME]
self.raw_tweets = self.db["raw_tweets_gnip"]
self.conn = pycurl.Curl()
self.conn.setopt(pycurl.ENCODING, 'gzip')
self.conn.setopt(pycurl.URL, STREAM_URL)
self.conn.setopt(pycurl.USERPWD, AUTH)
self.conn.setopt(pycurl.WRITEFUNCTION, self.handle_data)
self.conn.perform()
except Exception as ex:
print "error ocurred : %s" % str(ex)
def handle_data(self, data):
try:
self.string_buffer = cStringIO.StringIO(data)
for line in self.string_buffer:
try:
self.tweet = json.loads(line)
except Exception as json_ex:
print "JSON Exception occurred: %s" % str(json_ex)
continue
if self.tweet:
try:
self.tweet_list.append(self.tweet)
self.chunk_count += 1
if self.chunk_count % 1000 == 0
self.raw_tweets.insert(self.tweet_list)
self.chunk_count = 0
self.tweet_list = []
except Exception as insert_ex:
print "Error inserting tweet: %s" % str(insert_ex)
continue
except Exception as ex:
print "Exception occurred: %s" % str(ex)
print repr(self.buff)
def __del__(self):
self.string_buffer.close()
感謝。
挿入するドキュメントに「_id」フィールドがありますか? –
@AsyaKamskyはい、そうです。 –
どのバージョンのmongoとどのバージョンのpymongoを使用していますか? –