私はスパークをデータ処理に使用しているプロジェクトに取り組んでいます。私のデータは現在処理されており、データをNeo4jにロードする必要があります。 Neo4jにロードした後、私はそれを使って結果を紹介します。PythonでSpark RDDをNeo4jにロード
Pythonでプログラミングを実装したかったのです。しかし、私はネット上のライブラリやサンプルを見つけることができませんでした。リンクや図書館などの例を参考にしてください。
私のRDDはPairedRDDです。すべてのタプルで、私は関係を作り出さなければなりません。単純化の目的のために
PairedRDD
Key Value
Jack [a,b,c]
、私は
Key value
Jack a
Jack b
Jack c
にRDDを変換それから私はウィリアムの回答に基づいて
Jack->a
Jack->b
Jack->c
間の関係を作成する必要があり、私ができる午前リストを直接ロードする。しかし、このデータはサイファーエラーを投げています。
私はこのように試してみました:
def writeBatch(b):
print("writing batch of " + str(len(b)))
session = driver.session()
session.run('UNWIND {batch} AS elt MERGE (n:user1 {user: elt[0]})', {'batch': b})
session.close()
def write2neo(v):
batch_d.append(v)
for hobby in v[1]:
batch_d.append([v[0],hobby])
global processed
processed += 1
if len(batch) >= 500 or processed >= max:
writeBatch(batch)
batch[:] = []
max = userhobbies.count()
userhobbies.foreach(write2neo)
Bはリストのリストです。 Unwinded eltは、キーと値の2つの要素elt [0]、elt [1]のリストです。
エラー
ValueError: Structure signature must be a single byte value
おかげアドバンス。
回答がうまく見えますが、巨大なデータセットはどうですか。あなたが言ったようにバッチ処理で巨大なデータセットのために同じものを実装する方法。 –
バッチ処理の例を使って自分の答えを更新しました –
キー値のペアを解くときにエラーが発生しました。 session.run( 'UNWIND {バッチ} AS elt MERGE(n:user1 {ユーザー:elt [0]})'、{'batch':b})。 elt [0]がキーで、値のリストが値にあります。どのようにしてこのすべてをサイファーにすることができますか? –