2017-12-07 4 views
0

したがって、RDDはRDD[ListBuffer[(Array[String], Long)]]です。簡単にするため、RDD[X]と呼びます。Xはいくつかの変数のリストです。 Xは、objオブジェクトのリストです(X[obj])。RDD [ListBuffer]のListBufferからオブジェクトを取り除き、RDDに新しいエントリを作成するスパーク変換

私はRDD[X]を入力とし、変換を新しいRDD[X]として出力する関数を欲しいということです。この変換ではXという新しいリストが作成され、Xからobjを取り出して新しいリストを作成し、それをRDDに「追加」します。

これを直接サポートするSparkには何も見つかりませんでした。今私が考えることができる唯一の解決策は、collect()を実行し、ドライバのこのほとんどを管理することですが、これは明らかに良くありません。何か案は?

基本的にはこのような何か:(ListBuffer1中)

要素1

エレメント2(中:7つの要素の1 ListBufferが含まれてい

val data = RDD[ListBuffer[(Array[String], Long)]] 
// some transformation that calls some function 
// what will happen is some (Array[String], Long) will be moved into an entirely new ListBuffer in outData while some may be completely removed 
val outData = RDD[ListBuffer[(Array[String], Long)]] 

は、我々が開始RDDを持っている場合としましょうListBuffer1)

Element3(ListBuffer1)

(ListBuffer1で)Element4

(ListBuffer1で)エレメント5(ListBuffer1で)

Element6(ListBuffer1で)

Element7

および変換後のRDDがあります以下の内容:

Element1(ListBuffer1)

(ListBuffer1中)エレメント2(ListBuffer2中)

Element4(ListBuffer2中)

エレメント5(ListBuffer2中)

Element6

いくつかの要素は、新しいに移動されましたListBufferはRDDにあり、2つの要素は完全に削除されました。

私はSpark 1.6.0を使用しています。

答えて

1

あなたはListBufferListが、その後RDDにflatMapを行うとしましょうListBufferのコレクションに各ListBufferの変換を行うことができます。

以下はダミーPOCです。

val rdd = spark.sparkContext.parallelize(Seq(List(1,2,3,4), List(11,22,76,44))) 
val flattenRdd = rdd.map(s => List(s.filter(_%2 == 1), s.filter(_%2 == 0))) 
    .flatMap(s => s) 
flattenRdd.collect().foreach(s => println(s.mkString(","))) 

1,3 
2,4 
11 
22,76,44 
+0

Seqに含まれるリストの数がわからない場合はどうすればよいですか?私の場合、これは非常に1から数千の間です。 – osk

+0

私はrdd.map(s => func(s))を実行すると言うことができます。flatMap(s => s)と関数は1つを複数のsに分割します。それは働くだろうか?たとえば、funcが1つのSeqを入力として3 SeqのListを返した場合(1つのSeqが3つに分割されていた) – osk

+1

私はそれをうまく動作させるでしょう。 – nabongs

関連する問題