0
データフレーム列でカスタム関数を実行します。列には長い文字列があり、いくつかの電子メールが含まれています。文字列の形式は次のようなものです:PySparkは列にカスタム関数を適用します
"Don Joe<[email protected]>, Matt Scheurer <[email protected]>, Dan Lawler <[email protected]>"
私は電子メールを抽出するために正規表現を実行するためにきた、そして、私は全体の列にありますどのように多くのユニークな電子メール検索しました。
私は正規表現を書いて、Pythonで独自の電子メールリストを作成することができます。しかし、私はspark dataframeにこの関数をどのように適用するのか分かりません。私はこのようなことをやってみました:
all_names = set()
def get_distinct_users(userlist):
global all_names
for email in re.findall('\<\S*\>',userlist):
all_names.add(email)
get_distinct_users_udf = udf(get_distinct_users,StringType())
users = users.withColumn("user_count",get_distinct_users_udf(users["users"]))
しかし、gloabl変数のall_namesは更新されていません。 UDFを作成する代わりにマップ関数を適用するか、集計関数の一種であるために減らす必要がありますか?
あなたがこれを行うことができます
もちろん、これはうまくいかないでしょう。それぞれのエグゼキュータは 'all_names'の独自のコピーを取得します。他のエグゼキュータはそれにアクセスできません... – user4601931
'all_names'をアキュムレータにするとどうなりますか? – anwartheravian
数値型には[ビルトインサポート](http://spark.apache.org/docs/latest/programming-guide.html#accumulators)しかありませんが、あなた自身で作ることができます。 – user4601931