2016-08-15 5 views
2

pysparkでApache Spark 2.0を使用すると、1000行のデータが含まれているDataFrameがあり、そのDataFrameを2つの別々のDataFramesに分割/スライスしたいと思います。私はこの分割を繰り返していきようなランダムシードが、十分ではありません:最初のデータフレームが第二のデータフレームは、残りの250行 Apache Sparkでのデータフレームの分割

注意を含まなければならない最初の750行

  • を含むべきである

    • メソッドを複数回実行し、最初と2番目のDataFrameでどのデータが使用されているかを制御したいと考えています。

      最初の結果を生成するのに便利なtake(n)メソッドが見つかりました。
      しかし、私は2番目のDataFrameを取得するための正しい方法(または何らかの方法で)を見つけることができません。

      正しい方向へのポインタがあれば幸いです。

      ありがとうございます。

      更新:take(n)をもう一度ソートして適用して解決策を見つけることができました。これはまだかかわらず、準最適解のように感じている:あなたはそれがドライバーにデータを描画し、その後createDataFrameは、クラスタ全体でそれを再配布するためにテイクを使用して質問するのは正しいです

      # First DataFrame, simply take the first 750 rows 
      part1 = spark.createDataFrame(df.take(750)) 
      # Second DataFrame, sort by key descending, then take 250 rows 
      part2 = spark.createDataFrame(df.rdd.sortByKey(False).toDF().take(250)) 
      # Then reverse the order again, to maintain the original order 
      part2 = part2.rdd.sortByKey(True).toDF() 
      # Then rename the columns as they have been reset to "_1" and "_2" by the sorting process 
      part2 = part2.withColumnRenamed("_1", "label").withColumnRenamed("_2", "features") 
      
  • 答えて

    3

    。これは非効率的で、ドライバがデータを格納するのに十分なメモリを持っていないと失敗する可能性があります。

    は、ここでその上の行のインデックス列とスライスを作成し、ソリューションです:

    from pyspark.sql.functions import monotonicallyIncreasingId 
    
    idxDf = df.withColumn("idx", monotonicallyIncreasingId()) 
    part1 = idxDf.filter('idx < 750') 
    part2 = idxDf.filter('idx >= 750') 
    
    関連する問題