したがって、RDDはRDD[ListBuffer[(Array[String], Long)]]
です。簡単にするため、RDD[X]
と呼びます。X
はいくつかの変数のリストです。 X
は、obj
オブジェクトのリストです(X[obj]
)。RDD [ListBuffer]のListBufferからオブジェクトを取り除き、RDDに新しいエントリを作成するスパーク変換
私はRDD[X]
を入力とし、変換を新しいRDD[X]
として出力する関数を欲しいということです。この変換ではX
という新しいリストが作成され、X
からobj
を取り出して新しいリストを作成し、それをRDDに「追加」します。
これを直接サポートするSparkには何も見つかりませんでした。今私が考えることができる唯一の解決策は、collect()
を実行し、ドライバのこのほとんどを管理することですが、これは明らかに良くありません。何か案は?
基本的にはこのような何か:(ListBuffer1中)
要素1
エレメント2(中:7つの要素の1 ListBufferが含まれてい
val data = RDD[ListBuffer[(Array[String], Long)]] // some transformation that calls some function // what will happen is some (Array[String], Long) will be moved into an entirely new ListBuffer in outData while some may be completely removed val outData = RDD[ListBuffer[(Array[String], Long)]]
は、我々が開始RDDを持っている場合としましょうListBuffer1)
Element3(ListBuffer1)
(ListBuffer1で)Element4
(ListBuffer1で)エレメント5(ListBuffer1で)
Element6(ListBuffer1で)
Element7
および変換後のRDDがあります以下の内容:
Element1(ListBuffer1)
(ListBuffer1中)エレメント2(ListBuffer2中)
Element4(ListBuffer2中)
エレメント5(ListBuffer2中)
Element6
いくつかの要素は、新しいに移動されましたListBufferはRDDにあり、2つの要素は完全に削除されました。
私はSpark 1.6.0を使用しています。
Seqに含まれるリストの数がわからない場合はどうすればよいですか?私の場合、これは非常に1から数千の間です。 – osk
私はrdd.map(s => func(s))を実行すると言うことができます。flatMap(s => s)と関数は1つを複数のsに分割します。それは働くだろうか?たとえば、funcが1つのSeqを入力として3 SeqのListを返した場合(1つのSeqが3つに分割されていた) – osk
私はそれをうまく動作させるでしょう。 – nabongs