2016-08-03 15 views
0

私はSpark 1.5.2をPython3で使用しています。私はpysparkに2つのデータフレームを持っています。今、私はnew_dfで新しい値でold_dfにいくつかの行を更新したい部分的にスパークデータフレームを更新する(一部の行を更新する)

old_df = 

src | rank 
------ | ------ 
    a| 1 
    b| 1 
    c| 1 
    d| 1 
    e| 1 
    f| 1 
    g| 1 

new_df = 
src|    rank 
---|----------------- 
    a|    0.5 
    b|0.3333333333333333 
    c|1.6666666666666665 
    d|    1.5 

:彼らはのように見えます。私が思いついた解決策は、最初の二つのデータフレームを連結して、dropduplicatesを実行することである

src | rank 
------ | ------ 
    a|    0.5 
    b|0.3333333333333333 
    c|1.6666666666666665 
    d|    1.5 
    e| 1 
    f| 1 
    g| 1 

:私の目標は、どのように見える、新しいデータフレームを生成することです。間違ったデータフレームになった「ドロップ」アクションを実行するときに

new_df = new_df.unionAll(old_df).dropDuplicates(['src']) 

しかし、私の失望に、スパークは、最初のレコードを保持しませんでした。

解決する方法はありますか?あるいは、仕事をやり遂げるための別の方法はありますか?

+0

あなたの例は私にとって完璧に機能しますが、あなたのソリューションには何が問題なのですか? –

+0

'dropduplicates'アクションは、重複したレコードをランダムに削除するようです。この動作はSparkのバージョンに依存するようです。私の解決策はSpark 1.6.2で動作しますが、Spark 1.5.2では失敗します。 –

答えて

0

あなたは...

import org.apache.spark.sql.funtions._ 

odl_df.join(new_df, "src") 
    .withColumn("finalRank", 
     when(new_df("rank").isNull, odl_df("rank")) 
      .otherwise(new_df("rank")) 
    .drop(new_df("rank")) 
    .drop(odl_df("rank")) 
    .withColumnRenamed("finalRank", "rank") 

新しいランクは、最終的なDFで常にこのアサートをSQL関数でこれを解決して参加することができます。

+0

はい、 'when'節が機能します。ありがとうございました。 –

関連する問題