複数の列の値を更新するが、スパークが急速に変化し、多くの回答が古くなっているように見えるデータセット全体を返すための最良の方法を特定するのに少し苦労しています。特定の列のPySpark更新値
私は次のようにデータフレームを作成する小規模なクラスタ上でスパーク2.1を実行している:
df = spark.read.options(header="true",sep = '|').csv(path = 'file:///usr//local//raw_data//somefile.txt')
print df.columns
['ID','field1','field2','field3','value'] #there are actually many more columns, this is just an example
私はフィールド1、フィールド2およびフィールド3に以下のマッピング関数を適用されますが、データセット全体を保持する必要が
def mappingFunction(val,dict):
if val in dict:
return dict(val)
else:
return val
非常に単純化し、私はパンダにそうようにこれを行うことができます:
df['field1'] = df['field1'].map(mapDict)
df['field2'] = df['field2'].map(mapDict)
df['field3'] = df['field3'].map(mapDict)
私はn pyspark、df.rdd.map()の機能がありますが、これはこれに近づく "時代遅れ"のように思えます。さらに、基底のデータセットを既にカラムで分割しているので、 RDDに戻る必要があります。
また、pyspark.sql.functions.udf(f、returnType = StringType)も参照してください。これは私が使いたいと思うようです。
私の質問は以下のとおりです。
誰かがUDFを定義すると、このインスタンスに行くための正しい方法であることを確認してもらえますか?
もしそうなら、一度に複数の列にUDFを適用するにはどうすればよいですか?私は行を繰り返し処理するので、一度に3つの列すべてにマッピング関数を適用するのが最適なクエリ設計のようですが、他のすべてのコンテキストでそれを行う方法がわかりません。やっている。
これらの値を更新して、完全なデータセットを返すにはどうすればよいですか?私がやっているすべての集計/操作は、更新された列の値を使用する必要があります。
洞察力がありがとう!
ありがとう!非常に役立ちます!最後の質問 - 私はこれらの更新を "永続的"にしたい、それで私は他の集計/計算を実行できる。今すぐ最後の出力はちょうど.show()関数です。最後の行を次のように置き換えますか: df = df.select(*(lookup_udf(col(c)))エイリアス(c)df.columnsの中でcのcの場合はcsを、cの場合はc)collect() – flyingmeatball
df1 = df.select(..)のような最後に '.show '、' .show() 'で終わって、データがどのように変換されたかを表示します。最後に' collect() 'を使わないと、データがドライバノードに持ち込まれます。 – mtoto
Duh - 私は知っていました1つありがとう! – flyingmeatball