2016-05-30 1 views
1

問題文のスパーク - 出力によってグループからCompactBufferを取り外します(RDD)

RDD

入力

Header1^Header2 
A^4B 
A^11A 
B^7A 
C^6DF 
C^7DS 

をグループ化した後(CompactBufferを削除)スパーク出力をフォーマットする必要が所望の出力

私は私の解決策では

val records = sc.textFIle("/user/chronicles/test.txt").map(x => { 
    val y = x.split("\\^",-1) 
    (y(0).trim(), 
    y(1).trim()) 
    }).groupBy(x => x._1) 

records.foreach(println) 

出力

(A,CompactBuffer((4B,11A))) 
(B,CompactBuffer((7A))) 
(C,CompactBuffer((6DF,7DS))) 

を試してみました、私はforeachのを使用して各要素を読み取ることによって、 "CompactBuffer" を削除してから、言葉に置き換えると、余分なことができますどのよう

置き換えコマンドを使用しているシンボル

他の方法がありますか?データ。

: 私が続いている: "how to remove compactbuffer in spark output" - この場合にはdidntの仕事をmkString

答えて

2

私が正しくあなたの質問を理解していれば、ここに行く:

val data = sc.parallelize(Seq("Header1^Header2", "A^4B", "A^11A", "B^7A", "C^6DF", "C^7DS")) 
      .map(x => { 
       val y = x.split("\\^", -1) 
      (y(0).trim(), y(1).trim()) 
      }).groupBy(x => x._1).mapValues(_.map(_._2).mkString("(",",",")")) 

data.collect.foreach(println) 
// (A,(4B,11A)) 
// (B,(7A)) 
// (C,(6DF,7DS)) 
// (Header1,(Header2)) 

ヘッダを削除するには、フィルターを使用することができます。これがここの質問であるかどうかは分かりません。もしそうなら、私はそれを訂正できるようにコメントしてください。

+1

ありがとう@eliasah - それは働いた:) 追加 - mapValues(_。map(_._ 2).mkString( "("、 "、"、 ")"))私の既存のコードに。 – Debaditya

関連する問題