2016-11-30 6 views
0

データフレーム列でカスタム関数を実行します。列には長い文字列があり、いくつかの電子メールが含まれています。文字列の形式は次のようなものです:PySparkは列にカスタム関数を適用します

"Don Joe<[email protected]>, Matt Scheurer <[email protected]>, Dan Lawler <[email protected]>" 

私は電子メールを抽出するために正規表現を実行するためにきた、そして、私は全体の列にありますどのように多くのユニークな電子メール検索しました。

私は正規表現を書いて、Pythonで独自の電子メールリストを作成することができます。しかし、私はspark dataframeにこの関数をどのように適用するのか分かりません。私はこのようなことをやってみました:

all_names = set() 

def get_distinct_users(userlist): 
    global all_names 
    for email in re.findall('\<\S*\>',userlist): 
     all_names.add(email) 

get_distinct_users_udf = udf(get_distinct_users,StringType()) 
users = users.withColumn("user_count",get_distinct_users_udf(users["users"])) 

しかし、gloabl変数のall_namesは更新されていません。 UDFを作成する代わりにマップ関数を適用するか、集計関数の一種であるために減らす必要がありますか?

あなたがこれを行うことができます

+0

もちろん、これはうまくいかないでしょう。それぞれのエグゼキュータは 'all_names'の独自のコピーを取得します。他のエグゼキュータはそれにアクセスできません... – user4601931

+0

'all_names'をアキュムレータにするとどうなりますか? – anwartheravian

+0

数値型には[ビルトインサポート](http://spark.apache.org/docs/latest/programming-guide.html#accumulators)しかありませんが、あなた自身で作ることができます。 – user4601931

答えて

2

一つの方法は、例えば、

import re 

def get_email(x): 
    return re.findall("\<\S*\>", x) 

uniqueEmails = users.select("users").rdd\ 
    .flatMap(lambda x: get_email(x[0]))\ 
    .distinct() 

が別個の電子メールアドレスのRDDとなり、カラム上の電子メールアドレスのリストを抽出するためにflatMap関数にあります。

関連する問題