2016-09-15 15 views
0

私はスパークをデータ処理に使用しているプロジェクトに取り組んでいます。私のデータは現在処理されており、データをNeo4jにロードする必要があります。 Neo4jにロードした後、私はそれを使って結果を紹介します。PythonでSpark RDDをNeo4jにロード

Pythonでプログラミングを実装したかったのです。しかし、私はネット上のライブラリやサンプルを見つけることができませんでした。リンクや図書館などの例を参考にしてください。

私のRDDはPairedRDDです。すべてのタプルで、私は関係を作り出さなければなりません。単純化の目的のために
PairedRDD

Key Value 
Jack [a,b,c] 

、私は

Key value 
Jack a 
Jack b 
Jack c 

にRDDを変換それから私はウィリアムの回答に基づいて

Jack->a  
Jack->b 
Jack->c 

間の関係を作成する必要があり、私ができる午前リストを直接ロードする。しかし、このデータはサイファーエラーを投げています。

私はこのように試してみました:

def writeBatch(b): 
    print("writing batch of " + str(len(b))) 
    session = driver.session() 
    session.run('UNWIND {batch} AS elt MERGE (n:user1 {user: elt[0]})', {'batch': b}) 
    session.close() 

def write2neo(v): 
    batch_d.append(v) 
    for hobby in v[1]: 
     batch_d.append([v[0],hobby]) 

    global processed 
    processed += 1 
    if len(batch) >= 500 or processed >= max: 
     writeBatch(batch) 
     batch[:] = [] 


max = userhobbies.count() 
userhobbies.foreach(write2neo) 

Bはリストのリストです。 Unwinded eltは、キーと値の2つの要素elt [0]、elt [1]のリストです。

エラー

ValueError: Structure signature must be a single byte value 

おかげアドバンス。

答えて

3

あなたのRDD、例にforeachを行うことができます:私は、しかし、バッチに書き込みを機能を改善する

from neo4j.v1 import GraphDatabase, basic_auth 
driver = GraphDatabase.driver("bolt://localhost", auth=basic_auth("",""), encrypted=False) 
from pyspark import SparkContext 

sc = SparkContext() 
dt = sc.parallelize(range(1, 5)) 

def write2neo(v): 
    session = driver.session() 
    session.run("CREATE (n:Node {value: {v} })", {'v': v}) 
    session.close() 


dt.foreach(write2neo) 

が、この単純なスニペットは、基本的な実装例と

UPDATEのために働いていますOFバッチ処理は書いて

sc = SparkContext() 
batch = [] 
max = None 
processed = 0 

def writeBatch(b): 
    print("writing batch of " + str(len(b))) 
    session = driver.session() 
    session.run('UNWIND {batch} AS elt CREATE (n:Node {v: elt})', {'batch': b}) 
    session.close() 

def write2neo(v): 
    batch.append(v) 
    global processed 
    processed += 1 
    if len(batch) >= 500 or processed >= max: 
     writeBatch(batch) 
     batch[:] = [] 

dt = sc.parallelize(range(1, 2136)) 
max = dt.count() 
dt.foreach(write2neo) 

から との結果

16/09/15 12:25:47 INFO Executor: Running task 0.0 in stage 1.0 (TID 1) 
writing batch of 500 
writing batch of 500 
writing batch of 500 
writing batch of 500 
writing batch of 135 
16/09/15 12:25:47 INFO PythonRunner: Times: total = 279, boot = -103, init = 245, finish = 137 
16/09/15 12:25:47 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1301 bytes result sent to driver 
16/09/15 12:25:47 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 294 ms on localhost (1/1) 
16/09/15 12:25:47 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
16/09/15 12:25:47 INFO DAGScheduler: ResultStage 1 (foreach at /Users/ikwattro/dev/graphaware/untitled/writeback.py:36) finished in 0.295 s 
16/09/15 12:25:47 INFO DAGScheduler: Job 1 finished: foreach at /Users/ikwattro/dev/graphaware/untitled/writeback.py:36, took 0.308263 s 
+0

回答がうまく見えますが、巨大なデータセットはどうですか。あなたが言ったようにバッチ処理で巨大なデータセットのために同じものを実装する方法。 –

+0

バッチ処理の例を使って自分の答えを更新しました –

+0

キー値のペアを解くときにエラーが発生しました。 session.run( 'UNWIND {バッチ} AS elt MERGE(n:user1 {ユーザー:elt [0]})'、{'batch':b})。 elt [0]がキーで、値のリストが値にあります。どのようにしてこのすべてをサイファーにすることができますか? –

関連する問題