2016-03-31 18 views
0

私は、最小のパーティションが64MB未満で、最大のパーティションが1GB以上になるような、歪んだデータの問題に取り組んでいます。私は、いくつかの小さなパーティションを同じパーティションキーにマッピングし、パーティションで構成されるパーティションを作成する戦略を検討してきました。これは、タスクサイズのばらつきやディスクに保存されているファイルの数を減らすためのものです。パーティション分割されたデータを再パーティション化する

私のSparkアプリケーションでは、(グループ化されていない)元のパーティションを操作する必要があります。元のキーで再パーティション分割する必要があります。これは私の質問に私をもたらします:

私は以下のように2つのデータセットがあるとします。各行は(partition_key、(original_key、data))という形式のタプルです。 data0では、original_key = 0はそれ自身のノードにあるのに対して、original_key = 4とoriginal_key = 5はpartition_key = 3を含むノード上で一緒になっています。data1では、物事は整然としていません。

data0がpartition_keyでパーティション分割され、original_keyでパーティション化されている場合、シャッフルが発生しますか?言い換えれば、2番目のパーティションで重要なのですか?data0data1より整理されていますか?

data0 = [ 
    (0, (0, 'a')), 
    (0, (0, 'b')), 
    (0, (0, 'c')), 
    (1, (1, 'd')), 
    (1, (1, 'e')), 
    (1, (2, 'f')), 
    (1, (2, 'g')), 
    (2, (3, 'h')), 
    (2, (3, 'i')), 
    (2, (3, 'j')), 
    (3, (4, 'k')), 
    (3, (4, 'l')), 
    (3, (5, 'm')), 
    (3, (5, 'n')), 
    (3, (5, 'o')), 
] 

data1 = [ 
    (0, (0, 'a')), 
    (1, (0, 'b')), 
    (0, (0, 'c')), 
    (1, (1, 'd')), 
    (2, (1, 'e')), 
    (1, (2, 'f')), 
    (3, (2, 'g')), 
    (2, (3, 'h')), 
    (0, (3, 'i')), 
    (3, (3, 'j')), 
    (3, (4, 'k')), 
    (3, (4, 'l')), 
    (1, (5, 'm')), 
    (2, (5, 'n')), 
    (3, (5, 'o')), 
] 

rdd0 = sc.parallelize(data0, 3).cache() 
partitioned0 = rdd0.partitionBy(4) 
partitioned0.map(lambda row: (row[1][0], row[1])).partitionBy(6).collect() 

rdd1 = sc.parallelize(data1, 3).cache() 
partitioned1 = rdd1.partitionBy(4) 
partitioned1.map(lambda row: (row[1][0], row[1])).partitionBy(6).collect() 
+0

私は考えが分かればわかりません。だから、あなたは実際にパーティションを大きくしようとしているのですか?おそらく実際には大きなパーティションの再配布に焦点を当てるほうが意味があります。手元にある問題について、新しいキー、パーティション、パーティションの数が同じであれば、データを移動しないでください。したがって、「より組織化された」という意味では、シャッフルするデータの量が減ります。 – zero323

答えて

0

元RDDに基づいてどのくらいのデータのシャッフルのget。 で再パーティションシャッフルキックを呼び出します。

注:sc.parallelize(data0,3)を実行すると、3は単なるガイドラインです。デフォルトのパーティションが< = 3の場合、rdd0には3つのパーティションがあります。あなたのdata0がより多くのHDFS上にある場合、パーティション番号を提供するブロックは効果がありません。