2016-04-23 4 views
1

行が約30のメンバー(int、double、およびstring)を含むDataFrameを作成する必要があります。私がやったことはデータフレームの1行を作成することでしたし、それが動作します:私は、配列の内部タプルに要素を追加しようとしたときSparkで多くのフィールドの行のデータフレームを作成する

var res_df = sc.parallelize(Seq((
    results_combine(0), 
    results_combine(1), 
    results_combine(2), 
    results_combine(3), 
    results_combine(4), 
    results_combine(5), 
    results_combine(6), 
    results_combine(7), 
    results_combine(8), 
    results_combine(9), 
    results_combine(10) 
))).toDF("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k") 

は、しかし、私はので、22要素の制限のエラーを得ました。これどうやってするの?

答えて

0

おそらく最も簡単な方法は、ちょうどuse case classes to define the contents of your rowsです。 SparkContext scとHIveContext hiveContextを推定することは、すでに確立されており、いくつかの醜いログメッセージを省略...

scala> case class Alphabet (
    | a : Int = 1, 
    | b : Int = 2, 
    | c : Int = 3, 
    | d : Int = 4, 
    | e : Int = 5, 
    | f : Int = 6, 
    | g : Int = 7, 
    | h : Int = 8, 
    | i : Int = 9, 
    | j : Int = 10, 
    | k : Int = 11, 
    | l : Int = 12, 
    | m : Int = 13, 
    | n : Int = 14, 
    | o : Int = 15, 
    | p : Int = 16, 
    | q : Int = 17, 
    | r : Int = 18, 
    | s : Int = 19, 
    | t : Int = 20, 
    | u : Int = 21, 
    | v : Int = 22, 
    | w : Int = 23, 
    | x : Int = 24, 
    | y : Int = 25, 
    | z : Int = 26 
    |) 
defined class Alphabet 

scala> val rdd = sc.parallelize(Seq(new Alphabet())) 
rdd: org.apache.spark.rdd.RDD[Alphabet] = ParallelCollectionRDD[1] at parallelize at <console>:16 

scala> import hiveContext.implicits._ 
import hiveContext.implicits._ 

scala> val df = rdd.toDF() 
df: org.apache.spark.sql.DataFrame = [a: int, b: int, c: int, d: int, e: int, f: int, g: int, h: int, i: int, j: int, k: int, l: int, m: int, n: int, o: int, p: int, q: int, r: int, s: int, t: int, u: int, v: int, w: int, x: int, y: int, z: int] 

scala> df.show() 
+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+ 
| a| b| c| d| e| f| g| h| i| j| k| l| m| n| o| p| q| r| s| t| u| v| w| x| y| z| 
+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+ 
| 1| 2| 3| 4| 5| 6| 7| 8| 9| 10| 11| 12| 13| 14| 15| 16| 17| 18| 19| 20| 21| 22| 23| 24| 25| 26| 
+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+ 

別のアプローチは、Sparkのexplicit Row and schema definition APIsを使用することです。

+0

答えをありがとう。しかし、あなたのアプローチの問題は、Scala 2.11では22フィールド以上のケースクラスを定義できますが、Apache Sparkが使用するバージョンであるバージョン2.10では定義できないということです。したがって、スパークシェルでは、そのケースクラスを定義しようとしている: "エラー:実装の制限:ケースクラスは22以上のパラメータを持つことはできません。 –

+0

22個以上の要素を持つケースクラスはscala 2.10ではサポートされていません。OPがそのバージョンのscalaを使用していると思います – eliasah

+0

SparkライブラリはScala 2.11で利用可能でサポートされていますので、 2.10そうではありません! –

4

explicit Row and schema definition APIsを使用した例です。

(mildy)厄介な部分がスキーマオブジェクトを設定しています。 StructFieldおよびStructTypeを参照してください。

これはScala 2.10.xでうまく動作します。

scala> import org.apache.spark.sql.{DataFrame,Row} 
import org.apache.spark.sql.{DataFrame, Row} 

scala> import org.apache.spark.sql.types._ 
import org.apache.spark.sql.types._ 

scala> val alphabet = ('a' to 'z').map(_ + "") // for column labels 
alphabet: scala.collection.immutable.IndexedSeq[String] = Vector(a, b, c, d, e, f, g, h, i, j, k, l, m, n, o, p, q, r, s, t, u, v, w, x, y, z) 

scala> val row1 = Row(1 to 26 : _*) 
row1: org.apache.spark.sql.Row = [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26] 

scala> val row2 = Row(26 to 1 by -1 : _*) 
row2: org.apache.spark.sql.Row = [26,25,24,23,22,21,20,19,18,17,16,15,14,13,12,11,10,9,8,7,6,5,4,3,2,1] 

scala> val schema = StructType(alphabet.map(label => StructField(label, IntegerType, false))) 
schema: org.apache.spark.sql.types.StructType = StructType(StructField(a,IntegerType,false), StructField(b,IntegerType,false), StructField(c,IntegerType,false), StructField(d,IntegerType,false), StructField(e,IntegerType,false), StructField(f,IntegerType,false), StructField(g,IntegerType,false), StructField(h,IntegerType,false), StructField(i,IntegerType,false), StructField(j,IntegerType,false), StructField(k,IntegerType,false), StructField(l,IntegerType,false), StructField(m,IntegerType,false), StructField(n,IntegerType,false), StructField(o,IntegerType,false), StructField(p,IntegerType,false), StructField(q,IntegerType,false), StructField(r,IntegerType,false), StructField(s,IntegerType,false), StructField(t,IntegerType,false), StructField(u,IntegerType,false), StructField(v,IntegerTyp... 

scala> val rdd = hiveContext.sparkContext.parallelize(Seq(row1, row2)) 
rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = ParallelCollectionRDD[5] at parallelize at <console>:23 

scala> val df = hiveContext.createDataFrame(rdd, schema) 
df: org.apache.spark.sql.DataFrame = [a: int, b: int, c: int, d: int, e: int, f: int, g: int, h: int, i: int, j: int, k: int, l: int, m: int, n: int, o: int, p: int, q: int, r: int, s: int, t: int, u: int, v: int, w: int, x: int, y: int, z: int] 

scala> df.show() 
+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+ 
| a| b| c| d| e| f| g| h| i| j| k| l| m| n| o| p| q| r| s| t| u| v| w| x| y| z| 
+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+ 
| 1| 2| 3| 4| 5| 6| 7| 8| 9| 10| 11| 12| 13| 14| 15| 16| 17| 18| 19| 20| 21| 22| 23| 24| 25| 26| 
| 26| 25| 24| 23| 22| 21| 20| 19| 18| 17| 16| 15| 14| 13| 12| 11| 10| 9| 8| 7| 6| 5| 4| 3| 2| 1| 
+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+ 
1

ここでタプルのSeqを取り、それに基づいてスキーマを構築し、迅速かつ汚い機能です。最初の行のデータでフィールド名とzipを使用するという考え方です。この関数はデータの型を使用して正しいStructFieldを構築します。

def toStructType(schema: Seq[(String,Any)]) : StructType = { 
    StructType(schema.map(v => { 
    StructField(v._1, v._2 match { 
     case i: Int => IntegerType 
     case l: Long => LongType 
     case s: String => StringType 
     case d: Double => DoubleType 
     case f: Float => FloatType 
     case x => StringType 
    }) 
    })) 
} 

var pseudoSchema = Seq[(String,Any)](("test", 123)) 

toStructType(pseudoSchema) 
// res17: org.apache.spark.sql.types.StructType = StructType(StructField(test,IntegerType,true)) 

私はおそらくいくつかの種類が不足しているんだけど、あなたのアイデアを得ます。以下はあなたにa-zと名付けられた26列を得て、

toStructType(('a' to 'z').map(_.toString).map((_,1))) 
関連する問題