2012-06-01 21 views
8
にストリーミングAPIからJSONをダンプ

背景: 私はpymongoを使用してMongoDBの中でストリーミングAPIからJSONオブジェクトを取得し、それらを格納するように設定pythonモジュール(一度に25の一括挿入を)持っています。比較のため、私は同じストリームAPIのcurlへのbashコマンドを持っており、pipeそれをmongoimportにしています。どちらの方法も、データを別々のコレクションに格納します。最適化:モンゴ

定期的にコレクションのcount()を監視して料金を確認します。

これまでのところ、curl | mongoimportアプローチの背後にある約1000個のJSONオブジェクトによって、pythonモジュールの遅延が確認されています。

問題: はどのように私はcurl | mongoimportと同期して〜であることを私のpythonモジュールを最適化することができますか?

私はTwitter APIではなくサードパーティのストリーミングサービスを使用しているため、tweetstreamは使用できません。

誰かが私を助けてくれますか?

Pythonモジュール:読書のための


class StreamReader: 
    def __init__(self): 
     try: 
      self.buff = "" 
      self.tweet = "" 
      self.chunk_count = 0 
      self.tweet_list = [] 
      self.string_buffer = cStringIO.StringIO() 
      self.mongo = pymongo.Connection(DB_HOST) 
      self.db = self.mongo[DB_NAME] 
      self.raw_tweets = self.db["raw_tweets_gnip"] 
      self.conn = pycurl.Curl() 
      self.conn.setopt(pycurl.ENCODING, 'gzip') 
      self.conn.setopt(pycurl.URL, STREAM_URL) 
      self.conn.setopt(pycurl.USERPWD, AUTH) 
      self.conn.setopt(pycurl.WRITEFUNCTION, self.handle_data) 
      self.conn.perform() 
     except Exception as ex: 
      print "error ocurred : %s" % str(ex) 

    def handle_data(self, data): 
     try: 
      self.string_buffer = cStringIO.StringIO(data) 
      for line in self.string_buffer: 
       try: 
        self.tweet = json.loads(line) 
       except Exception as json_ex: 
        print "JSON Exception occurred: %s" % str(json_ex) 
        continue 

       if self.tweet: 
        try: 
         self.tweet_list.append(self.tweet) 
         self.chunk_count += 1 
         if self.chunk_count % 1000 == 0 
          self.raw_tweets.insert(self.tweet_list) 
          self.chunk_count = 0 
          self.tweet_list = [] 

        except Exception as insert_ex: 
         print "Error inserting tweet: %s" % str(insert_ex) 
         continue 
     except Exception as ex: 
      print "Exception occurred: %s" % str(ex) 
      print repr(self.buff) 

    def __del__(self): 
     self.string_buffer.close() 

感謝。

+0

挿入するドキュメントに「_id」フィールドがありますか? –

+0

@AsyaKamskyはい、そうです。 –

+0

どのバージョンのmongoとどのバージョンのpymongoを使用していますか? –

答えて

1

はStringIOをライブラリーを処分したされています。WRITEFUNCTIONコールバックhandle_dataは、この場合、すべての行で呼び出され、JSONを直接ロードします。ただし、データに2つのオブジェクトが含まれていることがあります。申し訳ありませんが、curlコマンドに私たちの資格情報が含まれているので、私が使用するコマンドを投稿できません。しかし、私が言ったように、これはストリーミングAPIに適用される一般的な問題です。


def handle_data(self, buf): 
    try: 
     self.tweet = json.loads(buf) 
    except Exception as json_ex: 
     self.data_list = buf.split('\r\n') 
     for data in self.data_list: 
      self.tweet_list.append(json.loads(data))  
3

元々コードにバグがありました。

   if self.chunk_count % 50 == 0 
        self.raw_tweets.insert(self.tweet_list) 
        self.chunk_count = 0 

あなたはchunk_countをリセットしますが、tweet_listはリセットしません。したがって、2回目には100個のアイテムを挿入しようとします(50個の新しいアイテムと50個のアイテムは既にDBに送られていました)。これを修正しましたが、引き続きパフォーマンスの違いが見られます。

全体のバッチサイズの物が赤いニシンであることが判明しました。私はjsonの大きなファイルを使って試してみましたが、mongoimport経由でPythonを読み込んで読み込んでいましたが、Pythonは常に高速でした(安全モードでも - 以下を参照)。

コードを詳しく見てみると、実際にストリーミングAPIがデータをチャンクで渡しているという問題があることがわかりました。それらのチャンクを取り出してデータベースに入れるだけです(mongoimportがやっていることです)。あなたのPythonがストリームを分割してリストに追加し、定期的にMongoにバッチを送信する余分な作業は、おそらく私が見るものとあなたが見るものの違いです。

は(あなたの引数にしてhandle_dataために、このスニペットを試してみてください)注意すべき

def handle_data(self, data): 
    try: 
     string_buffer = StringIO(data) 
     tweets = json.load(string_buffer) 
    except Exception as ex: 
     print "Exception occurred: %s" % str(ex) 
    try: 
     self.raw_tweets.insert(tweets) 
    except Exception as ex: 
     print "Exception occurred: %s" % str(ex) 

一つのことは、あなたのpython inserts are not running in "safe mode"がということです - あなたはあなたのinsert文に引数safe=Trueを追加することによって、これを変更する必要があります。この場合、失敗した挿入には例外が発生し、try/catchはエラーを表示して問題を公開します。

それはどちらかのパフォーマンスに多くの費用がかかりません - 私は現在、テストを実行しているし、約5分後に、2つのコレクションのサイズが14120 14113.

+0

btw、あなたのコードを試しました - 修正では、Pythonはmongoimportと比べてデータを約2倍速く挿入しています。デフォルトでは「安全な」挿入がオフになっているからです。安全な書き込みを有効にすることで(safe = Trueを挿入する)Pythonの挿入はmongoimport時間の約75%でした。 –

+0

それらを指摘してくれてありがとう!私は必要な変更を加えました(上記のコードも更新):self.chunk_count = 0の後に "self.tweet_list = []"を追加し、バッチサイズを1000に増やしました。 curl mongoimport comboは5718にあります(4000:5662)。どんな洞察? –

+0

+1素晴らしいコメント! –