2017-11-16 8 views
1

私はcaを転送しようとしています。 10GBのjsonデータ(私の場合はつぶやき)をarangodbのコレクションに追加します。また、私はそれのためにJOBLIBを使用しようとしている:データをarangodbに転送しようとしたときにデータが混乱します

from ArangoConn import ArangoConn 
import Userdata as U 

import encodings 
from joblib import Parallel,delayed 
import json 
from glob import glob 
import time 


def progress(total, prog, start, stri = ""): 
    if(prog == 0): 
     print("") 
     prog = 1; 
    perc = prog/total 
    diff = time.time() - start 
    rem = (diff/prog) * (total - prog) 
    bar = "" 
    for i in range(0,int(perc*20)): 
     bar = bar + "|" 
    for i in range(int(perc*20),20): 
     bar = bar + " " 
    print("\r"+"progress: " + "[" + bar + "] " + str(prog) + " of " + 
    str(total) + ": {0:.1f}% ".format(perc * 100) + "- " + 
    time.strftime("%H:%M:%S", time.gmtime(rem)) + " " + stri, end="") 

def processfile(filepath): 
    file = open(filepath,encoding='utf-8') 
    s = file.read() 
    file.close() 
    data = json.loads(s) 
    Parallel(n_jobs=12, verbose=0, backend="threading" 
    (map(delayed(ArangoConn.createDocFromObject), data)) 

files = glob(U.path+'/*.json') 
i = 1 
j = len(files) 
starttime = time.time() 
for f in files: 
    progress(j,i,starttime,f) 
    i = i+1 
    processfile(f) 

from pyArango.connection import Connection 
import Userdata as U 
import time 


class ArangoConn: 
    def __init__(self,server,user,pw,db,collectionname): 
     self.server = server 
     self.user = user 
     self.pw = pw 
     self.db = db 
     self.collectionname = collectionname 
     self.connection = None 
     self.dbHandle = self.connect() 
     if not self.dbHandle.hasCollection(name=self.collectionname): 
      coll = self.dbHandle.createCollection(name=collectionname) 
     else: 
      coll = self.dbHandle.collections[collectionname] 
     self.collection = coll 

    def db_createDocFromObject(self, obj): 
     data = obj.__dict__() 
     doc = self.collection.createDocument() 
     for key,value in data.items(): 
      doc[key] = value 

     doc._key= str(int(round(time.time() * 1000))) 
     doc.save() 

    def connect(self): 
     self.connection = Connection(arangoURL=self.server + ":8529",   
     username=self.user, password=self.pw) 

     if not self.connection.hasDatabase(self.db): 
      db = self.connection.createDatabase(name=self.db) 
     else: 
      db = self.connection.databases.get(self.db) 
     return db 

    def disconnect(self): 
     self.connection.disconnectSession() 


    def getAllData(self): 

     docs = [] 
     for doc in self.collection.fetchAll(): 
      docs.append(self.doc_to_result(doc)) 
     return docs 


    def addData(self,obj): 
      self.db_createDocFromObject(obj) 

    def search(self,collection,search,prop): 
     docs = [] 
     aql = """FOR q IN """+collection+""" FILTER q."""+prop+""" LIKE 
      "%"""+search+"""%" RETURN q""" 
     results = self.dbHandle.AQLQuery(aql, rawResults=False, batchSize=1) 
     for doc in results: 
      docs.append(self.doc_to_result(doc)) 
     return docs 


    def doc_to_result(self,arangodoc): 
     modstore = arangodoc.getStore() 
     modstore["_key"] = arangodoc._key 
     return modstore 

    def db_createDocFromJson(self,json): 

     for d in json: 
      doc = self.collection.createDocument() 
      for key,value in d.items(): 
       doc[key] = value 
      doc._key = str(int(round(time.time() * 1000))) 
      doc.save() 



    @staticmethod 
    def createDocFromObject(obj): 
     c = ArangoConn(U.url, U.user, U.pw, U.db, U.collection) 
     data = obj 
     doc = c.collection.createDocument() 
     for key, value in data.items(): 
      doc[key] = value 
     doc._key = doc["id"] 
     doc.save() 
     c.connection.disconnectSession() 

それはちょっとそのように動作します。私の問題は、データベースに格納されているデータが何らかの形で混ざり合っていることです。

Screenshot

あなたはスクリーンショット「ID」と「id_str」で見ることができるのと同じではありません - 彼らはする必要がありますよう。

  • 私はので、私はつぶやきのIDへのキーを設定しているため、スレッドのいくつかの点で現在データベースのデフォルトのキーは を「衝突する」ことを考えた:私はこれまでの調査何

  • 私はそれを複数のスレッドなしでしようとしました。スレッドは私がデータベースに送信するデータを見 問題

  • ではないようです...すべてが

しかし、すぐに、私は、DBとの通信として罰金のようですデータが混ざります。

私の教授は、pyarangoの何かがスレッドセーフではなく、データを混乱させると思っていましたが、スレッディングが問題にはならないとは思いません。

この現象が発生する可能性のあるアイデアはありません。 アイデアはありますか?

答えて

2

スクリーンショットは、以下の値を示しています

id  : 892886691937214500 
id_str : 892886691937214465 

それはどこかに値が安全後者の値を表すことができない二重IEEE754、に変換される途中のように見えます。そのため、変換のために潜在的に精度の低下があります。

(JavaScriptが任意の数が0xFFFFFFFFを超える値を入力するIEEE754が2倍使用している)Node.jsの中に簡単な例が、これはおそらく問題の原因であることを示しています。変換が起こるんどこ

$ node 
> 892886691937214500 
892886691937214500 
> 892886691937214465 
892886691937214500 

そこで質問です。 Pythonクライアントプログラムが期待値をArangoDBに正しく送信しているかどうか、または変換済み/切り捨てられた値をすでに送信しているかどうかを確認できますか?

一般に、0x7fffffffffffffffを超える整数は、ArangoDBに格納されるときに切り捨てられるか、IEEE754倍に変換されます。これは文字列の中に数値を格納することで回避できますが、もちろん2つの数値文字列を比較すると2つの数値を比較する場合とは異なる結果になります("10" < "9"10 > 9)。

+0

変換はarangodbで行われます。私はそれを送信する直前にデータをチェックしたので、私はそこにはっきりと確信しています。だから私ができるのは文字列として保存することだけです。 –

+0

あなたはデータを保存してArangoDBからそれらを取り戻すために使用しているArangoDB APIを知っていますか? これは、変換/切り捨てがあるかどうか、そしてどこでどこにあるのかを調べるのに役立ちます。ありがとう! – stj

+0

私は現在Arangodb 3.1.28を使用していますが、アップグレードが問題ない場合は、それが役に立ちます。私は[pyarango](http://bioinfo.iric.ca/~daoudat/pyArango/)を使っています。私はこれの魔法のバージョンを知りません:D –

関連する問題