コードの目的は、並列化の利点を得るためにRDDで動作する "myFunc"メソッドに基づいていくつかのロジックをロードすることです。pysparkに関数を渡す
次の行: df_rdd = ParallelBuild()()マップ(ラムダ行:行)を実行し(ParallelBuild()myFuncという).persist() をR = df_rdd.map
は私に出口を与え
r.countを()私に与えます:
TypeError: 'JavaPackage' object is not callable
顕著なものであることが確認された:0読書Googleはスパークが何らかのアクションが効果をトリガーすると、私は追加して遅延評価であることが示唆 R = df_rdd。地図(Parall elBuild()。myFunc)
"pipelinedrdd"には何が表示されているのかわかりませんが、何らかの変換がありますか?
runメソッドを削除して実装したときの面白い点は、data = [(1、 'a')、(1、 'b' (3、 'y')、(1、 'f')] df = sqlContext.createDataFrame(data(3、 'r')、(4、 'a')、 、schema = ['uid'、 'address_uid'])
私の主な機能では、物事はうまくいきました。しかし明らかに私のコードのモジュール化された部分が緩んでいます。
コード:
from pyspark import SparkContext
from pyspark.sql import SQLContext, HiveContext
import csv, io, StringIO
from pyspark.sql.functions import *
from pyspark.sql import Row
from pyspark.sql import *
from pyspark.sql import functions as F
from pyspark.sql.functions import asc, desc
sc = SparkContext("local", "Summary Report")
sqlContext = SQLContext(sc)
class ParallelBuild(object):
def myFunc(self, s):
l = s.split(',')
print l[0], l[1]
return l[0]
def list_to_csv_str(x):
output = StringIO.StringIO("")
csv.writer(output).writerow(x)
return output.getvalue().strip()
def run(self):
data = [(1,'a'), (1,'b'), (1,'c'), (2,'d'), (3,'r'), (4,'a'), (2,'t'), (3,'y'), (1,'f')]
df = sqlContext.createDataFrame(data, schema= ['uid', 'address_uid'])
return df
if __name__ == "__main__":
df_rdd = ParallelBuild().run().map(lambda line: line).persist()
r = df_rdd.map(ParallelBuild().myFunc)
r.count()