2017-11-13 16 views
0

私は、各レコードはintでRDD、持っている:私がする必要があるのは、バッチにこのRDDを分割されスパーク。バッチに分割RDD

[0,1,2,3,4,5,6,7,8] 

を。私。

  1. 使用ZipWithIndex:

    [[0,1,2], [3,4,5], [6,7,8]] 
    

    これは、しかし、私は数日の最後に困惑していますし、次のソリューションを除いて何かを見つけることができない、些細な音:各要素は、要素の固定サイズのリストである他のRDDを作りますRDDのレコードを列挙しますマップを()を使用して、このRDDオーバー

    [0,1,2,3,4,5] -> [(0, 0),(1, 1),(2, 2),(3, 3),(4, 4),(5, 5)]

  2. 反復とINDEを計算X生成されたインデックスによってindex = int(index/batchSize)

    [1,2,3,4,5,6] -> [(0, 0),(0, 1),(0, 2),(1, 3),(1, 4),(1, 5)]

  3. そして基等が挙げられます。

    [(0, [0,1,2]), (1, [3,4,5])]

これは、しかし、私はここでグループを使用したくない、私は必要なものを私に取得します。プレーン・マップ・リデュースやApache Crunchのような抽象化を使用しているときは、それは自明です。しかし、重いグループを使わずにSparkで同様の結果を出す方法はありますか?

+0

あなたがa)は複数のフィルタを適用することができます説明。 b)カスタムパーティションを使用し、各パーティションからRDDを作成します。私はあなたが固定サイズのRDDが必要な理由を想像することはできません。 – khachik

+0

@khachik「複数のフィルタを適用する」と「カスタムパーティショナーを実装する」を詳しく教えてください。固定サイズのRDDは必要ありません。 RDDの各レコードはレコードの配列(バッチ)にする必要があります。これは、単一のレコードではなくレコードのバッチを消費し、予測のバッチを返す数学モデルを持っているために必要です。 – Dmitry

答えて

0

固定サイズのRDDが必要な理由を明確に説明していませんでした。何を達成しようとしているのかによって、より良い解決策が得られるかもしれませんが、質問に答えるために、次のオプションが表示されます。
1)アイテム数とバッチサイズに基づいてフィルタを実装します。たとえば、元のRDDに1000個の項目があり、10個のバッチに分割したい場合は、10個のフィルタを適用して終了します。最初のものは、インデックスが[0,99]、2番目の[100,199]と1つ。各フィルタを適用すると、1つのRDDが作成されます。元のRDDはフィルタリングの前にキャッシュされる可能性があることに注意してください。利点:結果として得られる各RDDは別々に処理することができ、1つのノードに完全に割り当てる必要はありません。短所:このアプローチは、バッチの数によって遅くなります。
2)これと似ていますが、フィルタの代わりに、ここで説明するようにインデックス(キー)に基づいてパーティションIDを返すカスタムパーティションを実装するだけです:Custom partitioner for equally sized partitions。長所:フィルターよりも速い短所:各パーティションは1つのノードに収まる必要があります。
3元RDDでの順序は重要ではなく、ちょうどそれが大体均等にチャンクする必要がある場合)には、中/再分配を合体することができ、ここでhttps://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-rdd-partitions.html

+0

詳細な説明をお寄せいただきありがとうございます。私はあなたに多くの背景を提供しましょう。私は入力として1Kレコードを取り、魔法をかけて同じ数のレコードを返すマシン学習モデルをいくつか持っています。私がする必要があるのは、このモデルを使って自分のRDD内のすべてのレコードに「スコアを付ける」ことだけです。ですから、基本的にこれが元のRDDをチャンクに分割する必要がある理由です。各チャンクには1000個以下のレコードが含まれている必要があります。 Sparkのパラダイムに適合しないものですか?私はCrunch/MapReduceを中心に取り組んできましたが、そのような問題はありません。前もって感謝します – Dmitry