2017-06-08 10 views
2

私は初心者です。私のレガシーなPythonコードをPysparkを使ってSparkに変換しています。Pyspark isin function

私は

usersofinterest = actdataall[actdataall['ORDValue'].isin(orddata['ORDER_ID'].unique())]['User ID'] 

どちらも以下のコードのPyspark相当を取得したいと思い、actdataallorddataは、Sparkデータフレームです。

toPandas()機能を使用したくない場合、その機能に関連する欠点があります。

何か助けていただければ幸いです。

+0

を私は答えの編集をしました –

答えて

0

したがって、2つのスパークデータフレームがあります。 1つはactdataallで、もう1つはorddataです。その後、次のコマンドを使用してあなたの欲望の結果を取得します。

usersofinterest = actdataall.where(actdataall['ORDValue'].isin(orddata.select('ORDER_ID').distinct().rdd.flatMap(lambda x:x).collect()[0])).select('User ID') 
0
  • 両方のデータフレームが大きい場合には、フィルタとして機能しますどの内部結合を使用して検討する必要があります。

    まず我々は維持したいためのIDを含むデータフレームを作成してみましょう:

    orderid_df = orddata.select(orddata.ORDER_ID.alias("ORDValue")).distinct() 
    

    今度は、私たちのactdataallのデータフレームとそれに参加してみましょう:

    usersofinterest = actdataall.join(orderid_df, "ORDValue", "inner").select('User ID').distinct() 
    
  • あなたのターゲットリストが小さい場合、furianpanditの投稿に記載されているようにpyspark.sql isin関数を使用できます。変数を使用する前にブロードキャストすることを忘れないでください(sparkはオブジェクトをはるかに高速自分のタスクを作成するノード):

    orderid_list = orddata.select('ORDER_ID').distinct().rdd.flatMap(lambda x:x).collect()[0] 
    sc.broadcast(orderid_list) 
    
0

あなたのコードの最も直接的な翻訳は次のようになります。

from pyspark.sql import functions as F 

# collect all the unique ORDER_IDs to the driver 
order_ids = [x.ORDER_ID for x in orddata.select('ORDER_ID').distinct().collect()] 

# filter ORDValue column by list of order_ids, then select only User ID column 
usersofinterest = actdataall.filter(F.col('ORDValue').isin(order_ids)).select('User ID') 

しかし、あなたがshoul 'ORDER_ID'の数が確実に小さい(おそらく< 100,000程度)場合にのみ、このようなフィルタリングを行います。

'ORDER_ID'の数が多い場合は、order_idsのリストを各実行者に送信するブロードキャスト変数を使用して、order_idsと比較して処理を高速化する必要があります。これは、 'ORDER_ID'が小さい場合でも機能することに注意してください。

order_ids = [x.ORDER_ID for x in orddata.select('ORDER_ID').distinct().collect()] 
order_ids_broadcast = sc.broadcast(order_ids) # send to broadcast variable 
usersofinterest = actdataall.filter(F.col('ORDValue').isin(order_ids_broadcast.value)).select('User ID') 

放送変数の詳細については、チェックアウト:https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-broadcast.html