私はcaを転送しようとしています。 10GBのjsonデータ(私の場合はつぶやき)をarangodbのコレクションに追加します。また、私はそれのためにJOBLIBを使用しようとしている:データをarangodbに転送しようとしたときにデータが混乱します
from ArangoConn import ArangoConn
import Userdata as U
import encodings
from joblib import Parallel,delayed
import json
from glob import glob
import time
def progress(total, prog, start, stri = ""):
if(prog == 0):
print("")
prog = 1;
perc = prog/total
diff = time.time() - start
rem = (diff/prog) * (total - prog)
bar = ""
for i in range(0,int(perc*20)):
bar = bar + "|"
for i in range(int(perc*20),20):
bar = bar + " "
print("\r"+"progress: " + "[" + bar + "] " + str(prog) + " of " +
str(total) + ": {0:.1f}% ".format(perc * 100) + "- " +
time.strftime("%H:%M:%S", time.gmtime(rem)) + " " + stri, end="")
def processfile(filepath):
file = open(filepath,encoding='utf-8')
s = file.read()
file.close()
data = json.loads(s)
Parallel(n_jobs=12, verbose=0, backend="threading"
(map(delayed(ArangoConn.createDocFromObject), data))
files = glob(U.path+'/*.json')
i = 1
j = len(files)
starttime = time.time()
for f in files:
progress(j,i,starttime,f)
i = i+1
processfile(f)
と
from pyArango.connection import Connection
import Userdata as U
import time
class ArangoConn:
def __init__(self,server,user,pw,db,collectionname):
self.server = server
self.user = user
self.pw = pw
self.db = db
self.collectionname = collectionname
self.connection = None
self.dbHandle = self.connect()
if not self.dbHandle.hasCollection(name=self.collectionname):
coll = self.dbHandle.createCollection(name=collectionname)
else:
coll = self.dbHandle.collections[collectionname]
self.collection = coll
def db_createDocFromObject(self, obj):
data = obj.__dict__()
doc = self.collection.createDocument()
for key,value in data.items():
doc[key] = value
doc._key= str(int(round(time.time() * 1000)))
doc.save()
def connect(self):
self.connection = Connection(arangoURL=self.server + ":8529",
username=self.user, password=self.pw)
if not self.connection.hasDatabase(self.db):
db = self.connection.createDatabase(name=self.db)
else:
db = self.connection.databases.get(self.db)
return db
def disconnect(self):
self.connection.disconnectSession()
def getAllData(self):
docs = []
for doc in self.collection.fetchAll():
docs.append(self.doc_to_result(doc))
return docs
def addData(self,obj):
self.db_createDocFromObject(obj)
def search(self,collection,search,prop):
docs = []
aql = """FOR q IN """+collection+""" FILTER q."""+prop+""" LIKE
"%"""+search+"""%" RETURN q"""
results = self.dbHandle.AQLQuery(aql, rawResults=False, batchSize=1)
for doc in results:
docs.append(self.doc_to_result(doc))
return docs
def doc_to_result(self,arangodoc):
modstore = arangodoc.getStore()
modstore["_key"] = arangodoc._key
return modstore
def db_createDocFromJson(self,json):
for d in json:
doc = self.collection.createDocument()
for key,value in d.items():
doc[key] = value
doc._key = str(int(round(time.time() * 1000)))
doc.save()
@staticmethod
def createDocFromObject(obj):
c = ArangoConn(U.url, U.user, U.pw, U.db, U.collection)
data = obj
doc = c.collection.createDocument()
for key, value in data.items():
doc[key] = value
doc._key = doc["id"]
doc.save()
c.connection.disconnectSession()
それはちょっとそのように動作します。私の問題は、データベースに格納されているデータが何らかの形で混ざり合っていることです。
あなたはスクリーンショット「ID」と「id_str」で見ることができるのと同じではありません - 彼らはする必要がありますよう。
私はので、私はつぶやきのIDへのキーを設定しているため、スレッドのいくつかの点で現在データベースのデフォルトのキーは を「衝突する」ことを考えた:私はこれまでの調査何
。
私はそれを複数のスレッドなしでしようとしました。スレッドは私がデータベースに送信するデータを見 問題
ではないようです...すべてが
しかし、すぐに、私は、DBとの通信として罰金のようですデータが混ざります。
私の教授は、pyarangoの何かがスレッドセーフではなく、データを混乱させると思っていましたが、スレッディングが問題にはならないとは思いません。
この現象が発生する可能性のあるアイデアはありません。 アイデアはありますか?
変換はarangodbで行われます。私はそれを送信する直前にデータをチェックしたので、私はそこにはっきりと確信しています。だから私ができるのは文字列として保存することだけです。 –
あなたはデータを保存してArangoDBからそれらを取り戻すために使用しているArangoDB APIを知っていますか? これは、変換/切り捨てがあるかどうか、そしてどこでどこにあるのかを調べるのに役立ちます。ありがとう! – stj
私は現在Arangodb 3.1.28を使用していますが、アップグレードが問題ない場合は、それが役に立ちます。私は[pyarango](http://bioinfo.iric.ca/~daoudat/pyArango/)を使っています。私はこれの魔法のバージョンを知りません:D –