2017-09-13 10 views
1

は、私は次のようなデータを持っていると言う:Sparkで分解された構造体に列を追加するには?

{"id":1, "payload":[{"foo":1, "lol":2},{"foo":2, "lol":2}]} 

私はペイロードを爆発し、このように、それに列を追加したいと思います:

df = df.select('id', F.explode('payload').alias('data')) 
df = df.withColumn('data.bar', F.col('data.foo') * 2) 

しかし、これは3列のデータフレームのような結果になります。

  • id
  • data
  • data.bar

私が代わりにトップレベルの列を追加するので、爆発構造体に列を追加するにはどうすればよい

... data.bardata構造体の一部となることが期待?

+1

スキーマを再構築し、 'select'を使うか、' udf'を使ってデータを変更する必要があります。これらのオプションについては、https://stackoverflow.com/questions/31615657/で詳しく説明しています。新しい構造体の列をデータフレームに追加する –

+0

[データフレームに新しい構造体の列を追加する方法](https://stackoverflow.com/questions/31615657/how- Add-a-new-struct-column-to-a-dataframe) –

答えて

1
df = df.withColumn('data', f.struct(
    df['data']['foo'].alias('foo'), 
    (df['data']['foo'] * 2).alias('bar') 
)) 

これはになります:

root 
|-- id: long (nullable = true) 
|-- data: struct (nullable = false) 
| |-- col1: long (nullable = true) 
| |-- bar: long (nullable = true) 

UPDATE

def func(x): 
    tmp = x.asDict() 
    tmp['foo'] = tmp.get('foo', 0) * 100 
    res = zip(*tmp.items()) 
    return Row(*res[0])(*res[1]) 

df = df.withColumn('data', f.UserDefinedFunction(func, StructType(
    [StructField('foo', StringType()), StructField('lol', StringType())]))(df['data'])) 

P.S.

スパークはほとんどサポートしていませんインプレース opreation。

入れ墨を実行するたびに、を実行すると、実際にはが置き換えられます。

+0

これは間違いなく正しい方向に進んでいます! 'data'の内容について知ることなくこれを行う方法はありますか(もちろん' data.foo'を除く)?これをもっと明確にするために、追加の 'data.lol'カラムを追加するように質問を編集しました。 – surjikal

関連する問題