私は以下の構造のスパークデータフレームを持っています。 bodyText_tokenには、トークン(処理された/単語のセット)があります。そして私は、私は各キーワードのリストに該当どのように多くのトークンをチェックし、既存のデータフレームの新しい列として結果を追加するために必要な定義されたキーワード データフレームの列と外部リストをwithColumnの下のudfに渡す
root
|-- id: string (nullable = true)
|-- body: string (nullable = true)
|-- bodyText_token: array (nullable = true)
keyword_list=['union','workers','strike','pay','rally','free','immigration',],
['farmer','plants','fruits','workers'],['outside','field','party','clothes','fashions']]
のネストされたリストを持っています。 例:
tokens =["become", "farmer","rally","workers","student"]
結果は - > [1,2,0]
次の機能は期待通りに機能しました。
def label_maker_topic(tokens,topic_words):
twt_list = []
for i in range(0, len(topic_words)):
count = 0
#print(topic_words[i])
for tkn in tokens:
if tkn in topic_words[i]:
count += 1
twt_list.append(count)
return twt_list
機能にアクセスするためにudfをwithColumnで使用しましたが、エラーが発生します。私はそれが外部リストをudfに渡すことだと思います。外部リストとdatafram列をudfに渡してデータフレームに新しい列を追加する方法はありますか?
topicWord = udf(label_maker_topic,StringType())
myDF=myDF.withColumn("topic_word_count",topicWord(myDF.bodyText_token,keyword_list))
に渡すことができる場所これは動作しますが、UDFは、 'topic_wordsを持っていますので、私は、これは慎重になり正常に動作します'udfが定義された瞬間の値。 'topic_words'を変更してudfを再利用することはできません。udfが定義された時点で' topic_words'の値を使用します。 – CHP