2

私は(String, String, String)というデータセットを持っています。これは約6GBです。データセットを解析した後、私はgroupbyを使って(element => element._2)を使用し、RDD[(String, Iterable[String, String, String])]を得ました。次にforeach要素がgroupbytoListであり、DataFrameに変換しています。Iterable [String、String、String]をDataFrameに変換するには?

val dataFrame = groupbyElement._2.toList.toDF() 

しかし、パーケットファイル形式でデータを保存するには膨大な時間がかかります。 効率的な方法はありますか? N.B.私は5つのノードクラスタを持っています。各ノードには28 GBのRAMと4つのコアがあります。私はスタンドアロンモードを使用しており、各エグゼキュータに16 GBのRAMを与えています。

+0

なぜRDD APIを使用するのですか?最初からDataset APIを使用してデータセットを扱うのはなぜですか? –

+0

実際には、データセットの解析後に(String、String、String)のRDDを取得しました。 –

+0

データセット(すべて小文字)!= aデータセット( 'D'は大文字)。あなたはデータセットを解析した後に 'RDD [(String、String、String)]'を持っていますが、Spark SQLのDataset APIを使ってデータセットを解析しないのはなぜですか?なぜSpark CoreのRDD APIを使うのですか? –

答えて

2

RDDの代わりにdataframe/datasetメソッドを使用できます。代わりにあなたが直接データフレームを取得するためのメソッドを利用することができRDDにデータを読み込むので、さらに

val spark = SparkSession.builder.getOrCreate() 
import spark.implicits._ 

val df = Seq(
    ("ABC", "123", "a"), 
    ("ABC", "321", "b"), 
    ("BCA", "123", "c")).toDF("Col1", "Col2", "Col3") 
scala> df.show 
+----+----+----+ 
|Col1|Col2|Col3| 
+----+----+----+ 
| ABC| 123| a| 
| ABC| 321| b| 
| BCA| 123| c| 
+----+----+----+ 

val df2 = df 
    .groupBy($"Col2") 
    .agg(
    collect_list($"Col1") as "Col1_list"), 
    collect_list($"Col3") as "Col3_list")) 
scala> df2.show 
+----+----------+---------+ 
|Col2| Col1_list|Col3_list| 
+----+----------+---------+ 
| 123|[ABC, BCA]| [a, c]| 
| 321|  [ABC]|  [b]| 
+----+----------+---------+ 

:それはこのような何かを見ることができます。

関連する問題