2017-05-23 11 views
0

私の開発環境はcentos7、hbase 1.2.5、happybase 1.1.0、python 2.7、PyCharm、hadoop 2.7.3、spark 2.1です。 私は大きなデータソフトウェアを開発しています。 HBaseテーブルに値を入れる必要があります。値はSpark RDDの値です。コードは次のとおりです。happybaseを通じてhbaseテーブルに値を入れる方法は?

import happybase 
from pyspark import SparkContext, SparkConf 

connection = happybase.Connection('localhost') 
table = connection.table('tablename') 
conf = SparkConf().setAppName("myFirstSparkApp").setMaster("local") 
sc = SparkContext(conf=conf) 
distFile = sc.textFile("/inputFilePath/") 
newLines = distFile.filter(lambda x: 'filter":' in x) 
newLines = newLines.map(lambda line:line.split('"')) 
# The following line is working. Insert a row into the table. 
table.put(b'row-key0', {'billCode:': '222', 'trayCode:': '222', 'pipeline:': '333'}) 
# But the following line is not working. what is wrong? Why? 
newLines.foreach(lambda x: table.put(b'row-key', {'billCode:': x[7], 'trayCode:': x[3], 'pipeline:': x[11]})) 

最終行コードが機能していません。エラーメッセージは以下のとおりです。

はImportError: pickle.PicklingError cybinという名前のモジュール:はImportError::オブジェクトをシリアル化できませんでした

cybinという名前のモジュールは、私が火花+ happybase +のpythonの新しい開発者ですありません。それを解決する方法?親切に助けが必要です。ありがとうございました。

+0

エラーメッセージを読んでください - それはあなたの質問と一緒には行きません – Drako

+0

この行のコードはデバッグでは機能していません。 newLines.foreach(lambda x:table.put(b'row-key '、{' billCode: ':x [7]、' trayCode: ':x [3]、' pipeline: ':x [11] })) –

答えて

0

ここに簡単な例があります。

import happybase 
from pyspark import SparkContext, SparkConf 
conf = SparkConf().setAppName("App").setMaster("local") 
sc = SparkContext(conf=conf) 
rdd = parallelize([("a","1"),("b","2")]) 
def func(x): 
    conn = happybase.Connection('localhost') 
    table = conn.table("table_name") 
    table.put(x[0],{"cf:c":x[1]}) 
    conn.close() 
rdd.foreach(func) 

でもない完璧な、あなたはhttp://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd 幸運を参照することができます。

関連する問題