9

スパークに関する物理的な計画を理解しようとしていますが、従来のrdbmsとは異なるように見えるため、一部の部分を理解していません。たとえば、以下のこのプランでは、ハイブテーブルに対するクエリについての計画です。私は計画に理解よ何のためスパーク物理計画を理解する

select 
     l_returnflag, 
     l_linestatus, 
     sum(l_quantity) as sum_qty, 
     sum(l_extendedprice) as sum_base_price, 
     sum(l_extendedprice * (1 - l_discount)) as sum_disc_price, 
     sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge, 
     avg(l_quantity) as avg_qty, 
     avg(l_extendedprice) as avg_price, 
     avg(l_discount) as avg_disc, 
     count(*) as count_order 
    from 
     lineitem 
    where 
     l_shipdate <= '1998-09-16' 
    group by 
     l_returnflag, 
     l_linestatus 
    order by 
     l_returnflag, 
     l_linestatus; 


== Physical Plan == 
Sort [l_returnflag#35 ASC,l_linestatus#36 ASC], true, 0 
+- ConvertToUnsafe 
    +- Exchange rangepartitioning(l_returnflag#35 ASC,l_linestatus#36 ASC,200), None 
     +- ConvertToSafe 
     +- TungstenAggregate(key=[l_returnflag#35,l_linestatus#36], functions=[(sum(l_quantity#31),mode=Final,isDistinct=false),(sum(l_extendedpr#32),mode=Final,isDistinct=false),(sum((l_extendedprice#32 * (1.0 - l_discount#33))),mode=Final,isDistinct=false),(sum(((l_extendedprice#32 * (1.0l_discount#33)) * (1.0 + l_tax#34))),mode=Final,isDistinct=false),(avg(l_quantity#31),mode=Final,isDistinct=false),(avg(l_extendedprice#32),mode=Fl,isDistinct=false),(avg(l_discount#33),mode=Final,isDistinct=false),(count(1),mode=Final,isDistinct=false)], output=[l_returnflag#35,l_linestatus,sum_qty#0,sum_base_price#1,sum_disc_price#2,sum_charge#3,avg_qty#4,avg_price#5,avg_disc#6,count_order#7L]) 
      +- TungstenExchange hashpartitioning(l_returnflag#35,l_linestatus#36,200), None 
       +- TungstenAggregate(key=[l_returnflag#35,l_linestatus#36], functions=[(sum(l_quantity#31),mode=Partial,isDistinct=false),(sum(l_exdedprice#32),mode=Partial,isDistinct=false),(sum((l_extendedprice#32 * (1.0 - l_discount#33))),mode=Partial,isDistinct=false),(sum(((l_extendedpri32 * (1.0 - l_discount#33)) * (1.0 + l_tax#34))),mode=Partial,isDistinct=false),(avg(l_quantity#31),mode=Partial,isDistinct=false),(avg(l_extendedce#32),mode=Partial,isDistinct=false),(avg(l_discount#33),mode=Partial,isDistinct=false),(count(1),mode=Partial,isDistinct=false)], output=[l_retulag#35,l_linestatus#36,sum#64,sum#65,sum#66,sum#67,sum#68,count#69L,sum#70,count#71L,sum#72,count#73L,count#74L]) 
        +- Project [l_discount#33,l_linestatus#36,l_tax#34,l_quantity#31,l_extendedprice#32,l_returnflag#35] 
        +- Filter (l_shipdate#37 <= 1998-09-16) 
         +- HiveTableScan [l_discount#33,l_linestatus#36,l_tax#34,l_quantity#31,l_extendedprice#32,l_shipdate#37,l_returnflag#35], astoreRelation default, lineitem, None 

です::クエリは、この第一は、ハイブのテーブルスキャン

  • で始まる

    1. そして、それがどこ条件

      使用してフィルタ
    2. 次に、私たちが望む列を得るためにプロジェクト

    3. 次にTungstenAggregate?

    4. 次にTungstenExchange?

    5. さらにTungstenAggregate?

    6. ConvertToSafe?

    7. は、その後、最終的な結果

    をソートしかし、私は4、5、6および7の手順を理解していませんよ。彼らが何であるか知っていますか?私はこれについての情報を探しているので、私は計画を理解することができますが、具体的なものは何も見つかりません。

  • 答えて

    14

    使用するSQLクエリの構造を見てみましょう:あなたは既に容疑者として

    SELECT 
        ... -- not aggregated columns #1 
        ... -- aggregated columns  #2 
    FROM 
        ...       -- #3 
    WHERE 
        ...       -- #4 
    GROUP BY 
        ...       -- #5 
    ORDER BY 
        ...       -- #6 
    

    • Filter (...)WHERE句(#4
    • Project ...限度の数の述語に対応(#1#2、および#4/に存在ない場合)
    • HiveTableScanFROM句(#3)以下のように帰属することができ

    残りの部分に対応する:

    • #2SELECT句から - functionsフィールドTungstenAggregates
    • GROUP BY句(#4):

      • TungstenExchange /ハッシュ分割
      • keyフィールドTungstenAggregates
    • #6で - ORDER BY句。 sun.misc.Unsafe

      • 明示的なメモリ管理:を含む - (sets)一般に

      プロジェクトタングステンは、スパークDataFramesによって使用される最適化のセットを記述する。これは、「ネイティブ」(オフヒープ)メモリ使用と、GC管理外の明示的なメモリ割り当て/解放を意味します。これらの変換は、実行計画のステップでConvertToUnsafe/ConvertToSafeに対応します。 Understanding sun.misc.Unsafe

    • コード生成 - コンパイル時に最適化されたコードを生成するように設計されたさまざまなメタプログラミングのテクニックがあります。素敵な機能コードを醜いループに書き換えるようなことをする内部のSparkコンパイラと考えることができます。

    タングステンについての詳細は、Project Tungsten: Bringing Apache Spark Closer to Bare Metalから一般的に知ることができます。 Apache Spark 2.0: Faster, Easier, and Smarterは、コード生成のいくつかの例を提供します。

    TungstenAggregateは、データがシャッフルされ、最後にマージされるよりも最初に各パーティションでローカルに集約されるため、2回発生します。 RDD APIに精通している場合、このプロセスはおおよそreduceByKeyに相当します。

    実行計画が明確でない場合は、結果としてDataFrameRDDに変換して、toDebugStringの出力を分析することもできます。

    +0

    あなたの答えをありがとう。私はちょうどこの部分 "SELECT句からの#2 - TungstenAggregatesの関数フィールド"をはっきり理解できませんでした。あなたがうまく説明することができればうれしいよ! – codin

    +0

    'Functions'フィールドは、指定されたステージで実行されるすべての集計をリストし、' Key'フィールドはグループ化を示します。 'df.groupBy(* key).agg(* functions)'です。 – zero323

    1

    タングステンは、JVM以外のデータを管理してGCオーバーヘッドを節約する、1.4以降のSparkの新しいメモリエンジンです。 JVMとの間でデータをコピーすることを含むことを想像することができます。それでおしまい。 Spark 1.5では、タングステンをspark.sql.tungsten.enabledにすることができます。そして、あなたは "古い"計画を見るでしょう.Spark 1.6では、もうこれを無効にすることはできません。

    関連する問題