2016-05-14 9 views
0

私はPySparkでJupyter Notebookを使用しています。その中には、列の名前と型(整数、...)を持つスキーマを持つデータフレームがあります。今私はflatMapのようなメソッドを使用しますが、これは固定型をもたないタプルのリストを返します。それを達成する方法はありますか?その後PySparkフラットマップは型付き値のタプルを返します

df.printSchema() 
root 
|-- name: string (nullable = true) 
|-- ... 
|-- ... 
|-- ratings: integer (nullable = true) 

私は評価値を持ついくつかの計算(ここでは難読化)を行うにflatMapを使用します。

df.flatMap(lambda row: (row.id, 5 if (row.ratings > 5) else row.ratings)) 
y_rate.toDF().printSchema() 

そして今、私はエラーを取得:

TypeError: Can not infer schema for type:

をどのような方法にはありますスキーマを維持してmap/flatMap/reduceを使用しますか?少なくとも特定の型の値を持つタプルを返すか?

答えて

1

まず、間違った機能を使用しています。 flatMapmapflattenので、あなたのデータは次のようになりますと仮定します。したがって

sc.parallelize(['foo', 0, 'bar', 5]) 

ご覧エラー:flatMap

df = sc.parallelize([("foo", 0), ("bar", 10)]).toDF(["id", "ratings"]) 

出力は同じになります。あなたは本当にそれはあなたがmapを使用する必要があります動作するようにしたい場合:

df.rdd.map(lambda row: (row.id, 5 if (row.ratings > 5) else row.ratings)).toDF() 
## DataFrame[_1: string, _2: bigint] 

次に、DataFrame以上のマッピングはもはや2.0でサポートされていません。最初にrddを抽出する必要があります(上記のdf.rdd.mapを参照)。

最後に、PythonとJVM間でデータを渡すことは非常に効率が悪いです。対応するシリアライゼーション/デシリアライゼーションとスキーマの推論(スキーマが明示的に提供されていない場合)を使用して、PythonとJVMの間でデータを渡す必要があるだけでなく、怠惰も解消されます。

from pyspark.sql.functions import when 

df.select(df.id, when(df.ratings > 5, 5).otherwise(df.ratings)) 

なんらかの理由でプレーンなPythonコードが必要な場合は、UDFを使用する方がよいでしょう。

+0

非常に役に立ちます。あなたのサンプルコードをありがとう。私はflatMap vs Mapで部品を手に入れませんでした。 – Matthias

+1

'flatMap'は関数' RDD [T] =>(T => Iterable [U])=> RDD [U] 'です。言い換えれば、関数は 'Itereble'(Pythonタプル)を返す関数を想定し、これらの結果を連結(平坦化)します。 – zero323

+0

when/otherwiseカラムにその文の名前を付ける方法はありますか? 'df.rating(df.ratings、df.ratings 5​​,5).otherwise(df.ratings)' @ zero323 – Matthias

関連する問題