2016-04-19 11 views
2

このエラーは発生していますが、理由はわかりません。 は基本的に私はこのコードからerroringています:pyspark: 'PipelinedRDD'オブジェクトは反復可能ではありません

データはRDDで、私のヘルパーは次のように定義され
a = data.mapPartitions(helper(locations)) 

def helper(iterator, locations): 
     for x in iterator: 
      c = locations[x] 
      yield c 

(場所は、データポイントの配列だけである)私は表示されません を何が問題なのですか?しかし、私もpysparkで最善ではないので、誰かが私に 'PipelinedRDD'オブジェクトがこのコードから反復可能でない理由を教えてください。

+0

に応じてコードを修正するのに役立ちます

result_ll = result.map(lambda elem: list(elem)) 
にマップ機能を使用して反復を置き換えます。 Plsはhttp://stackoverflow.com/questions/25914789/how-do-i-iterate-rdds-in-apache-spark-scalaを見ています – Mohan

+0

@モハン:私は今考えを得ると思うが、私はまだです同じエラーが発生します。私は今これを呼んでいます:a = data.mapPartitions(lambda iterator:helper(iterator、locations))。私は間違って何をしていますか? – deeformvp

答えて

1

マップとラムダ関数を使用してRDDを反復することができます。 'PipelinedRDD' オブジェクトは、これに代えて反復可能

ない:私は[結果にELEMためのリスト(ELEM)]

lines1 = sc.textFile("\..\file1.csv") 
lines2 = sc.textFile("\..\file2.csv") 

pairs1 = lines1.map(lambda s: (int(s), 'file1')) 
pairs2 = lines2.map(lambda s: (int(s), 'file2')) 

pair_result = pairs1.union(pairs2) 

pair_result.reduceByKey(lambda a, b: a + ','+ b) 

result = pair.map(lambda l: tuple(l[:1]) + tuple(l[1].split(','))) 
result_ll = [list(elem) for elem in result] 

---> result_ll =以下の方法を用いてパイプラインRDDを通して はTypeErrorを繰り返しました私は希望は、これはあなたがあなたのやり方でRDDに反復することはできません

関連する問題