2016-05-10 7 views
0

UDFで実行できない複雑な機能を使用して、pysparkのhiveContextデータフレーム列を更新できますか?複雑な機能を持つpysparkデータフレーム列を更新する

私は、多くの列を含むデータフレームを持っていますが、そのうちの2つの列はタイムスタンプとデータと呼ばれます。データのタイムスタンプを特定の条件を満たす場合は、データのJSON文字列からタイムスタンプを取得し、タイムスタンプ列を更新する必要があります。私はそのデータフレームが不変であることを知っていますが、古いデータフレームのすべての列を保持しながら、タイムスタンプ列を更新する新しいデータフレームをどうにか構築することは可能ですか?

私がやりたいものを示すコード:

def updateTime(row): 
    import json 

    THRESHOLD_TIME = 60 * 30 
    client_timestamp = json.loads(row['data']) 
    client_timestamp = float(client_timestamp['timestamp']) 
    server_timestamp = float(row['timestamp']) 
    if server_timestamp - client_timestamp <= THRESHOLD_TIME: 
     new_row = ..... # copy contents of row 
     new_row['timestamp'] = client_timestamp 
     return new_row 
    else: 
     return row 

df = df.map(updateTime) 

私はタプルに行の内容をマッピングし、その後.toDF(バックデータフレームに変換することを考えた)が、私は見つけることができません行の内容をタプルにコピーして列名を戻す方法です。

+0

「UDF」を使用するとどうなりますか? –

+0

多分この記事は助けることができます:http://www.sparktutorials.net/using-sparksql-udfs-to-create-date-times-in-spark-1.5 –

+0

私はHDFの代わりにUDFを意味して申し訳ありません...タイプミス.. 。 – SK2

答えて

0

あなたがパラメータとしてタイムスタンプを受信して​​、新しい処理済みのタイムスタンプを返すために、あなたのupdateTime機能を適応させる場合は、UDFを作成して、データフレームの列に直接それを使用することができます。

from pyspark.sql.functions import * 
from pyspark.sql.types import TimestampType 

myUDF = udf(updateTime, TimestampType()) 
df = df.withColumn("timestamp", myUDF(col("timestamp")) 

しかし、あなたのケースでは、私はそれはもう少し複雑だと思う:

from pyspark.sql.functions import * 
from pyspark.sql.types import TimestampType 

myUDF = udf(getClientTime, TimestampType()) 
client_timestamp = myUDF(col("data")) 
server_timestamp = col("timestamp") 
condition = server_timestamp.cast("float") - client_timestamp.cast("float") <= THRESHOLD_TIME  

newCol = when(condition, client_timestamp).otherwise(server_timestamp) 
newDF = df.withColumn("new_timestamp", newCol) 

この第二のアプローチでは、機能getClientTimedata列から値を受け取り、この値のために、クライアントのタイムスタンプを返します。次に、この情報を含む新しい列(client_timestamp)を作成するために使用することができます。最後にwhenを使用して、server_timestamp列と新しく作成されたclient_timestamp列の値に基づいて条件付きで新しい列を作成できます。

参考:スパークでのUDFを使用する方法の詳細については

+1

ありがとう!このメソッドは、若干の変更を加えて機能します。 UDFが以前にどのように動作していたのか分かりませんでした。代わりにStringType()を返すようにmyUDFを編集し、col( 'column')の代わりにdf ['column']を使用しました – SK2

関連する問題