2016-11-30 11 views
0

私は多くの質問をオンラインで見てきましたが、達成しようとしていることをしていないようです。スパーク:データフレームを集約せずに転記

私はScalaでApache Spark 2.0.2を使用しています。

は、私がデータフレームを持っている:私はpivot()を使用してみましたが、私は正しい答えを得ることができなかった

+----+-----+----+----+----+ 
|vals| 1| 2| 3| 4| 
+----+-----+----+----+----+ 
|val1| 100| 0| 0| 0| 
|val2| 0| 50| 0| 0| 
|val3| 0| 0| 0| 0| 
|val4| 0| 0| 0| 0| 
|val5| 0| 20| 0| 0| 
|val6| 0| 0| 0| 0| 
+----+-----+----+----+----+ 

に移調したい

+----------+-----+----+----+----+----+----+ 
|segment_id| val1|val2|val3|val4|val5|val6| 
+----------+-----+----+----+----+----+----+ 
|   1| 100| 0| 0| 0| 0| 0| 
|   2| 0| 50| 0| 0| 20| 0| 
|   3| 0| 0| 0| 0| 0| 0| 
|   4| 0| 0| 0| 0| 0| 0| 
+----------+-----+----+----+----+----+----+ 

。私は私のval{x}列をループして終了し、以下のようにそれぞれをピボットしますが、これは非常に遅いことがわかります。

val d = df.select('segment_id, 'val1) 

+----------+-----+ 
|segment_id| val1| 
+----------+-----+ 
|   1| 100| 
|   2| 0| 
|   3| 0| 
|   4| 0| 
+----------+-----+ 

d.groupBy('val1).sum().withColumnRenamed('val1', 'vals') 

+----+-----+----+----+----+ 
|vals| 1| 2| 3| 4| 
+----+-----+----+----+----+ 
|val1| 100| 0| 0| 0| 
+----+-----+----+----+----+ 

は、その後、私の最初のデータフレームに val{x}の各反復に union()を使用します。

+----+-----+----+----+----+ 
|vals| 1| 2| 3| 4| 
+----+-----+----+----+----+ 
|val2| 0| 50| 0| 0| 
+----+-----+----+----+----+ 

私はデータを集約したくない転置のより効率的な方法がありますか?

感謝:)

+0

データフレームでどうすればいいですか? –

+0

あなたは別の答えを期待していますか、既存のものに満足していますか? –

答えて

1

残念ながらケースはありません。

  • スパークDataFrameは、データの量を考慮し正当化されます。
  • データの転置が可能です。

あなたはDataFrameは、スパークに実装され、行の分散コレクションであり、各列が単一ノード上に格納され、処理されていることを覚えています。

あなたはpivotとしてDataFrameに転置を表現できる:

val kv = explode(array(df.columns.tail.map { 
    c => struct(lit(c).alias("k"), col(c).alias("v")) 
}: _*)) 

df 
    .withColumn("kv", kv) 
    .select($"segment_id", $"kv.k", $"kv.v") 
    .groupBy($"k") 
    .pivot("segment_id") 
    .agg(first($"v")) 
    .orderBy($"k") 
    .withColumnRenamed("k", "vals") 

が、それは単に無実用的なアプリケーションとのおもちゃのコードです。実際には、データの収集よりも良いではありません。

として定義 DataFrameについては
val (header, data) = df.collect.map(_.toSeq.toArray).transpose match { 
    case Array(h, t @ _*) => { 
    (h.map(_.toString), t.map(_.collect { case x: Int => x })) 
    } 
} 

val rows = df.columns.tail.zip(data).map { case (x, ys) => Row.fromSeq(x +: ys) } 
val schema = StructType(
    StructField("vals", StringType) +: header.map(StructField(_, IntegerType)) 
) 

spark.createDataFrame(sc.parallelize(rows), schema) 

:あなたの場合は言われていること

+----+---+---+---+---+ 
|vals| 1| 2| 3| 4| 
+----+---+---+---+---+ 
|val1|100| 0| 0| 0| 
|val2| 0| 50| 0| 0| 
|val3| 0| 0| 0| 0| 
|val4| 0| 0| 0| 0| 
|val5| 0| 20| 0| 0| 
|val6| 0| 0| 0| 0| 
+----+---+---+---+---+ 

val df = Seq(
    (1, 100, 0, 0, 0, 0, 0), 
    (2, 0, 50, 0, 0, 20, 0), 
    (3, 0, 0, 0, 0, 0, 0), 
    (4, 0, 0, 0, 0, 0, 0) 
).toDF("segment_id", "val1", "val2", "val3", "val4", "val5", "val6") 

の両方が、あなたはあなたの望ましい結果を与えるだろう分散されたデータ構造上で効率的な転置が必要です。あなたは別の場所を見なければなりません。コアCoordinateMatrixBlockMatrixを含む多くの構造体があり、両方の次元にまたがってデータを分散することができ、転置することができます。

関連する問題