UDFで実行できない複雑な機能を使用して、pysparkのhiveContextデータフレーム列を更新できますか?複雑な機能を持つpysparkデータフレーム列を更新する
私は、多くの列を含むデータフレームを持っていますが、そのうちの2つの列はタイムスタンプとデータと呼ばれます。データのタイムスタンプを特定の条件を満たす場合は、データのJSON文字列からタイムスタンプを取得し、タイムスタンプ列を更新する必要があります。私はそのデータフレームが不変であることを知っていますが、古いデータフレームのすべての列を保持しながら、タイムスタンプ列を更新する新しいデータフレームをどうにか構築することは可能ですか?
私がやりたいものを示すコード:
def updateTime(row):
import json
THRESHOLD_TIME = 60 * 30
client_timestamp = json.loads(row['data'])
client_timestamp = float(client_timestamp['timestamp'])
server_timestamp = float(row['timestamp'])
if server_timestamp - client_timestamp <= THRESHOLD_TIME:
new_row = ..... # copy contents of row
new_row['timestamp'] = client_timestamp
return new_row
else:
return row
df = df.map(updateTime)
私はタプルに行の内容をマッピングし、その後.toDF(バックデータフレームに変換することを考えた)が、私は見つけることができません行の内容をタプルにコピーして列名を戻す方法です。
「UDF」を使用するとどうなりますか? –
多分この記事は助けることができます:http://www.sparktutorials.net/using-sparksql-udfs-to-create-date-times-in-spark-1.5 –
私はHDFの代わりにUDFを意味して申し訳ありません...タイプミス.. 。 – SK2