2016-05-28 14 views
2

たとえば、スパークでいくつかのクエリを実行すると、スパークのUIでいくつかのクエリにシャッフルが増えていることがわかります。このシャッフルはローカルで読み込まれたデータ量エグゼクティブ。スパークでシャッフルされるデータの量に影響するもの

しかし、私は1つのことを理解していません。たとえば、このクエリではHDFSから7GBが読み込まれましたが、読み込み+シャッフル書き込みは10GB以上です。しかし、私はHDFSから7GBもロードする他のクエリを見て、シャッフルは500kbに​​似ています。だから私はこれを理解していない、助けてもらえますか?シャッフルされたデータの量は、hdfsから読み取られたデータには関係ありませんか?

select 
    nation, o_year, sum(amount) as sum_profit 
from 
    (
select 
    n_name as nation, year(o_orderdate) as o_year, 
    l_extendedprice * (1 - l_discount) - ps_supplycost * l_quantity as amount 
    from 
     orders o join 
     (select l_extendedprice, l_discount, l_quantity, l_orderkey, n_name, ps_supplycost 
     from part p join 
     (select l_extendedprice, l_discount, l_quantity, l_partkey, l_orderkey, 
       n_name, ps_supplycost 
      from partsupp ps join 
      (select l_suppkey, l_extendedprice, l_discount, l_quantity, l_partkey, 
        l_orderkey, n_name 
      from 
       (select s_suppkey, n_name 
       from nation n join supplier s on n.n_nationkey = s.s_nationkey 
       ) s1 join lineitem l on s1.s_suppkey = l.l_suppkey 
      ) l1 on ps.ps_suppkey = l1.l_suppkey and ps.ps_partkey = l1.l_partkey 
     ) l2 on p.p_name like '%green%' and p.p_partkey = l2.l_partkey 
    ) l3 on o.o_orderkey = l3.l_orderkey 
)profit 
group by nation, o_year 
order by nation, o_year desc; 
+0

a)操作b)構成(例:パーティションの数)c)初期データの分布 – zero323

+0

OPの質問は、入力がGB単位の間にシャッフル読み込みまたは書き換えがそれほど少ない理由についてです。そのような程度のシャッフル読み取り書き込みを決定または制御できる要因は何ですか? –

答えて

2

シャッフルは、パーティション間でグループ分けされるようにデータを再配布するためのSparkのメカニズムです。これには通常、エグゼキュータおよびマシン間でデータをコピーすることが含まれます。したがって、ここではデータをシャッフルしていることは入力データの量にはまったく依存していません。ただし、入力データに対してどのような操作を実行するかによって、エグゼキュータ(したがってマシン)間でデータが移動することになります。シャッフルがコストのかかるプロセスである理由を理解して理解するには、http://spark.apache.org/docs/latest/programming-guide.html#shuffle-operationsをご覧ください。

貼り付けたクエリを見ると、たくさんの結合操作を行っているようです(あなたが行っている究極の操作を理解するために深くは見ていない)。そして、それは間違いなくパーティション間でデータを移動する必要があります。この問題は、問合せを再訪して、入力データを同じように最適化または操作したり、データの移動を少なくするように事前処理したりすることで処理できます(例:結合されたデータを同じパーティションに入れる)。繰り返しますが、これは単なる例であり、ユースケースからあなたに最適なものを判断する必要があります。

2

私はであると考えることを強くお勧めします。

基本的に、シャッフルされるデータの量を決定するのは、HDFS(またはソースが何であれ)のデータ量ではありません。

例シャッフルデータの1金額が入力されたデータよりも小さい:私は3つの例を使って説明しよう

ここ
val wordCounts = words.map((_, 1)).reduceByKey(_ + _) 

私たちはそれぞれに(各キーの)単語の数を数えますその結果をシャッフルするだけです。その後、サブカウントをシャッフルしたら、それらを追加します。したがって、シャッフルするデータの量は、カウントの量に関係します。この場合、ユニークな単語の数に関連しています。

ユニークワードが1つしかない場合は、入力よりもずっと少ないデータがシャッフルされます。実際には、スレッド数と同じくらい多くの数(非常に小さい量)です。

各単語が一意である場合は、さらにデータをシャッフルします(詳細については、記事を参照してください)。この例でシャッフルされたデータの量は、固有のキーの数(固有の単語)に関連しています。

例2。シャッフルされるデータの量は、入力データと同じである:ここでは

val wordCounts = words.map((_, 1)).groupByKey().mapValues(_.size) 

、当社グループのすべての単語一緒に、我々はいくつあるか数えます。それで、すべてのデータをシャッフルする必要があります。

例3。シャッフルされるデータの量は、入力データ以上のものです:

val silly = 
    words.map(word => 
    (word, generateReallyLongString())) 
    .groupByKey() 

ここに私たちのマップステージは言葉によってそれらをすべて一緒に、本当に長いランダムな文字列にし、我々のグループのすべての単語をマッピングします。ここでは、入力より多くのデータを生成し、入力よりも多くのデータをシャッフルします。

関連する問題