2017-01-05 21 views
7

例(Python)の例は私の質問を明確にします。のは、次のように私は、特定の日に特定の映画を見た人々のスパークデータフレームを持っているとしましょう:前の行の配列を累積する(PySparkデータフレーム)

movierecord = spark.createDataFrame([("Alice", 1, ["Avatar"]),("Bob", 2, ["Fargo", "Tron"]),("Alice", 4, ["Babe"]), ("Alice", 6, ["Avatar", "Airplane"]), ("Alice", 7, ["Pulp Fiction"]), ("Bob", 9, ["Star Wars"])],["name","unixdate","movies"]) 

スキーマと以下のような外観で定義されたデータフレーム:

root 
|-- name: string (nullable = true) 
|-- unixdate: long (nullable = true) 
|-- movies: array (nullable = true) 
| |-- element: string (containsNull = true) 

+-----+--------+------------------+ 
|name |unixdate|movies   | 
+-----+--------+------------------+ 
|Alice|1  |[Avatar]   | 
|Bob |2  |[Fargo, Tron]  | 
|Alice|4  |[Babe]   | 
|Alice|6  |[Avatar, Airplane]| 
|Alice|7  |[Pulp Fiction] | 
|Bob |9  |[Star Wars]  | 
+-----+--------+------------------+ 

私は思います上記から新しいデータフレーム列を生成することになります。これは、すべて前のムービーを各ユーザに表示し、重複しないようにします(unixdateフィールドには「previous」)。したがって、次のようになります。

+-----+--------+------------------+------------------------+ 
|name |unixdate|movies   |previous_movies   | 
+-----+--------+------------------+------------------------+ 
|Alice|1  |[Avatar]   |[]      | 
|Bob |2  |[Fargo, Tron]  |[]      | 
|Alice|4  |[Babe]   |[Avatar]    | 
|Alice|6  |[Avatar, Airplane]|[Avatar, Babe]   | 
|Alice|7  |[Pulp Fiction] |[Avatar, Babe, Airplane]| 
|Bob |9  |[Star Wars]  |[Fargo, Tron]   | 
+-----+--------+------------------+------------------------+ 

効率的な方法でこれを実装するにはどうすればよいですか?

答えて

5

SQLのみためのオブジェクトのを保存せずに:

  • 必要な輸入:

    import pyspark.sql.functions as f 
    from pyspark.sql.window import Window 
    
  • ウィンドウ定義:

    w = Window.partitionBy("name").orderBy("unixdate") 
    
  • 完全なソリューション:

    (movierecord 
        # Flatten movies 
        .withColumn("previous_movie", f.explode("movies")) 
        # Collect unique 
        .withColumn("previous_movies", f.collect_set("previous_movie").over(w)) 
        # Drop duplicates for a single unixdate 
        .groupBy("name", "unixdate") 
        .agg(f.max(f.struct(
         f.size("previous_movies"), 
         f.col("movies").alias("movies"), 
         f.col("previous_movies").alias("previous_movies") 
        )).alias("tmp")) 
        # Shift by one and extract 
        .select(
         "name", "unixdate", "tmp.movies", 
         f.lag("tmp.previous_movies", 1).over(w).alias("previous_movies"))) 
    
  • 結果:

    +-----+--------+------------------+------------------------+ 
    |name |unixdate|movies   |previous_movies   | 
    +-----+--------+------------------+------------------------+ 
    |Bob |2  |[Fargo, Tron]  |null     | 
    |Bob |9  |[Star Wars]  |[Fargo, Tron]   | 
    |Alice|1  |[Avatar]   |null     | 
    |Alice|4  |[Babe]   |[Avatar]    | 
    |Alice|6  |[Avatar, Airplane]|[Babe, Avatar]   | 
    |Alice|7  |[Pulp Fiction] |[Babe, Airplane, Avatar]| 
    +-----+--------+------------------+------------------------+ 
    

SQLのPython UDFため保存:

  • 輸入:

  • UDF:以前のように

    def flatten_distinct(col: Union[Column, str]) -> Column: 
        def flatten_distinct_(xss: Union[List[List[str]], None]) -> List[str]: 
         return compose(list, unique, concat)(xss or []) 
        return f.udf(flatten_distinct_, ArrayType(StringType()))(col) 
    
  • ウィンドウ定義。

  • 完全なソリューション:

    (movierecord 
        # Collect lists 
        .withColumn("previous_movies", f.collect_list("movies").over(w)) 
        # Flatten and drop duplicates 
        .withColumn("previous_movies", flatten_distinct("previous_movies")) 
        # Shift by one 
        .withColumn("previous_movies", f.lag("previous_movies", 1).over(w)) 
        # For presentation only 
        .orderBy("unixdate")) 
    
  • 結果:

    +-----+--------+------------------+------------------------+ 
    |name |unixdate|movies   |previous_movies   | 
    +-----+--------+------------------+------------------------+ 
    |Alice|1  |[Avatar]   |null     | 
    |Bob |2  |[Fargo, Tron]  |null     | 
    |Alice|4  |[Babe]   |[Avatar]    | 
    |Alice|6  |[Avatar, Airplane]|[Avatar, Babe]   | 
    |Alice|7  |[Pulp Fiction] |[Avatar, Babe, Airplane]| 
    |Bob |9  |[Star Wars]  |[Fargo, Tron]   | 
    +-----+--------+------------------+------------------------+ 
    

パフォーマンス:私は制約が与えられ、これを解決するために何の効果的な方法がないと信じて

。要求された出力だけでなく、タングステン・フォーマットに適合するようにバイナリ・コード化されているため、圧縮は可能ですが、オブジェクトの識別は緩やかですが、高価なグループ分けやソートを含むSparkコンピューティング・モデルでは高価です。

previous_moviesのサイズが制限されていて小さくても一般的には実現できない場合は、これは問題ありません。

データの複製は、ユーザーにとって単一の遅延履歴を保持することで簡単に対応できます。 SQLで行うことはできませんが、低レベルのRDD操作では非常に簡単です。

爆発とcollect_パターンが高価です。要件が厳しいが、パフォーマンスを向上させたい場合は、Pythonの代わりにScala UDFを使用できます。

関連する問題