2016-05-31 15 views
1

コードの目的は、並列化の利点を得るためにRDDで動作する "myFunc"メソッドに基づいていくつかのロジックをロードすることです。pysparkに関数を渡す

次の行: df_rdd = ParallelBuild()()マップ(ラムダ行:行)を実行し(ParallelBuild()myFuncという).persist() をR = df_rdd.map

は私に出口を与え

r.countを()私に与えます:

TypeError: 'JavaPackage' object is not callable 

顕著なものであることが確認された:0読書Googleはスパークが何らかのアクションが効果をトリガーすると、私は追加して遅延評価であることが示唆 R = df_rdd。地図(Parall elBuild()。myFunc)

"pipelinedrdd"には何が表示されているのかわかりませんが、何らかの変換がありますか?

runメソッドを削除して実装したときの面白い点は、data = [(1、 'a')、(1、 'b' (3、 'y')、(1、 'f')] df = sqlContext.createDataFrame(data(3、 'r')、(4、 'a')、 、schema = ['uid'、 'address_uid'])

私の主な機能では、物事はうまくいきました。しかし明らかに私のコードのモジュール化された部分が緩んでいます。

コード:

from pyspark import SparkContext 
from pyspark.sql import SQLContext, HiveContext 
import csv, io, StringIO 
from pyspark.sql.functions import * 
from pyspark.sql import Row 
from pyspark.sql import * 
from pyspark.sql import functions as F 
from pyspark.sql.functions import asc, desc 
sc = SparkContext("local", "Summary Report") 
sqlContext = SQLContext(sc) 


class ParallelBuild(object): 

def myFunc(self, s): 
    l = s.split(',') 
    print l[0], l[1] 
    return l[0] 


def list_to_csv_str(x): 
    output = StringIO.StringIO("") 
    csv.writer(output).writerow(x) 
    return output.getvalue().strip() 

def run(self): 
    data = [(1,'a'), (1,'b'), (1,'c'), (2,'d'), (3,'r'), (4,'a'), (2,'t'), (3,'y'), (1,'f')] 
    df = sqlContext.createDataFrame(data, schema= ['uid', 'address_uid']) 
    return df 


if __name__ == "__main__": 

    df_rdd = ParallelBuild().run().map(lambda line: line).persist() 
    r = df_rdd.map(ParallelBuild().myFunc) 
    r.count() 

答えて

0

オクラホマので、あなたの主な質問は、 "なぜ、印刷は何もないですか?" され答えには2つの部分があります。

  1. 分散コンピューティングでは実際にはprintできません。したがって、あなたの関数myFuncはドライバに何も印刷しません。この理由はかなり複雑なので、なぜSparkで印刷が実際には機能しないのかについては、this pageにお問い合わせください。しかし

r.count()9をプリントアウトする必要があり呼び出します。なぜそれは働かないのですか?

  1. あなたの機能myFuncはあまり意味がありません。 r = df_rdd.map(ParallelBuild().myFunc)に電話すると、df_rddになります。しかし、これはすでにDataFrameです。このDataFrameの各行はタイプであり、df_rdd.first()を呼び出すとRow(uid=1, address_uid=u'a')が得られます。 myFuncで行っていることは、splitを使用しようとしていますが、splitは文字列オブジェクト用で、のオブジェクトがあります。なぜこれがエラーを投げているのかわかりませんが、オブジェクトではsplitを呼び出すことはできません。 r = df_rdd.map(lambda x: x[0])の行に沿ってさらに何かを考えてみてください。

myFuncに電話すると何かが混乱するため、r.count()は機能しません。


サイドノート:

df_rdd = ParallelBuild().run().map(lambda line: line).persist()。実行中の.map(lambda line: line)は何もしません。 lineを変更していないので、mapジョブを実行しないでください。コードをdf_rdd = ParallelBuild().run().persist()

にする