2017-05-02 3 views
1

これは私が持っているデータは、次のとおりです。pysparkでは、データフレームの列を通してフィルタ関数をループする方法は?

**name** **movie** 
jason  a 
jason  b 
jason  c 
mike   a 
mike   b 
bruce  a 
bruce  c 
ryan   b 

私の目標は、私はpysparkを使用しています。この

**name** **# of moive** 
jason  a,b,c 
mike   a,b 
bruce  a,c 
ryan   b 

を作成し、このスタッフを行うためにUDFを使用しようとすることです。私はこの関数を定義し、スパークは基本関数 'filter'を呼び出すためエラーとなりました。これは新しいワーカーを開始する際に問題になります。

私のロジックは、まずサブセットを作るためにフィルタを使い、次に行数が映画の数になります。その後、私はこのUDFで新しい列を作成します。

def udf(user_name): 
    return df.filter(df['name'] == user_name).select('movie').dropDuplictes()\ 
            .toPandas['movie'].tolist() 

df.withColumn('movie_number', udf(df['name'])) 

しかし、機能しません。基本的なスパーク機能を持つUDFを作成する方法はありますか?

リストに名前の列を作成してリストをループしますが、それは非常に遅いです。私は分散コンピューティングをやりませんでした。

1)私の優先事項は、spark_df.filterのような基本機能を持つpysparkデータフレームの1つの列の情報をループする方法を理解することです。

2)私たちは最初にRDDに名前列を作成し、そのRDDをループするために私のUDFを使うことができるので、分散コンピューティングを利用できますか?

3)私は同じ構造(名前/映画)との2つのテーブルを持っていますが、別の年のために、2005年と2007年のように、我々は構造第三のテーブルを作成するための効率的な方法を持つことができる場合である。

**name** **movie** **in_2005** **in_2007** 
jason  a   1   0 
jason  b   0   1 
jason  c   1   1 
mike   a   0   1 
mike   b   1   0 
bruce  a   0   0 
bruce  c   1   1 
ryan   b   1   0 

1と0は、この男が2005/2007年に映画にコメントしたかどうかを意味します。この場合、元の表は次のようになります。

2005:

**name** **movie** 
jason  a 
jason  c 
mike   b 
bruce  c 
ryan   b 

**name** **movie** 
jason  b 
jason  c 
mike   a 
bruce  c 

2007と私の考えは、「今年の列と一緒に2つのテーブルをCONCATすることであり、使用所望の構造を得るためのピボットテーブル。

答えて

0

groupbyを使用して、データフレーム全体をRDDにするのではなく、collect_listに従うことをお勧めします。 UDFは後に適用できます。

import pyspark.sql.functions as func 

# toy example dataframe 
ls = [ 
    ['jason', 'movie_1'], 
    ['jason', 'movie_2'], 
    ['jason', 'movie_3'], 
    ['mike', 'movie_1'], 
    ['mike', 'movie_2'], 
    ['bruce', 'movie_1'], 
    ['bruce', 'movie_3'], 
    ['ryan', 'movie_2'] 
] 
df = spark.createDataFrame(pd.DataFrame(ls, columns=['name', 'movie'])) 

df_movie = df.groupby('name').agg(func.collect_list(func.col('movie'))) 

さて、これは新しい列moviesに対処するためにudfを作成するための一例です。各行の長さを計算する方法の例を示します。

def movie_len(movies): 
    return len(movies) 
udf_movie_len = func.udf(movie_len, returnType=StringType()) 

df_movie.select('name', 'movies', udf_movie_len(func.col('movies')).alias('n_movies')).show() 

これは与える:

+-----+--------------------+--------+ 
| name|    movies|n_movies| 
+-----+--------------------+--------+ 
|jason|[movie_1, movie_2...|  3| 
| ryan|   [movie_2]|  1| 
|bruce| [movie_1, movie_3]|  2| 
| mike| [movie_1, movie_2]|  2| 
+-----+--------------------+--------+ 
+0

おかげで、私の質問1は、基本的なpyspark関数でUDFを使用する方法ですが、私は私の質問を編集しました。また、私が学びたいのは、列内の値によってデータフレームをスライスし、それらの部分集合の変換を行うことです。 – Olap

+0

@Olap、私はあなたの質問に従って私の解決策を変更しました。それでも 'groupby'を使うことをお勧めします。その後、udfを適用することができます。 – titipata

+0

ありがとう、私は複数のテーブルで遊ぶことに関する別の質問があります、あなたは助けてくださいできますか?私は質問を更新しました – Olap

関連する問題