2016-08-02 3 views
4

私は多くの画像を読んでいます。私は開発のためにそれらの小さなサブセットで作業したいと思います。その結果、私はは、それが起こる作ることができる方法を理解しようとしています:大きなRDDの制限に向けて

In [1]: d = sqlContext.read.parquet('foo') 
In [2]: d.map(lambda x: x.photo_id).first() 
Out[2]: u'28605' 

In [3]: d.limit(1).map(lambda x: x.photo_id) 
Out[3]: PythonRDD[31] at RDD at PythonRDD.scala:43 

In [4]: d.limit(1).map(lambda x: x.photo_id).first() 
// still running... 

何が起こっているか..so? limit()は、私たちが持っていたものよりもはるかに速く動くと期待していますが、それはそうではありません*です。明らかに、私は何か欠けておりますので、私は、私の理解を説明し、私を修正してください下記

  1. dはペアのRDD(私はそのスキーマから知っている)であり、私はと を言っています

    i)すべてのペア(xという名前が付けられ、photo_id属性を返します)。我々はそれが$をどのように動作するか私はわからないfirst()方法を、適用される新しい(匿名)RDD、になりますが、私はその匿名RDDの最初の要素を与える必要があります

    II)。 [3]

  2. 、我々は、dもかかわらず 多くの要素を有することを意味する、1にd RDDを制限のみ1を使用してのみ1つの 要素にマッピング関数を適用します。 Out [3]は、マッピングによって作成されたRDDである必要があります。モニターを見た後、予想通り[4]

  3. 、私は[3]のロジックに従うと、一つだけと限られたRDDの要素のみを印刷することを期待する...

、[ 4]他はありませんが、データセット全体を処理するように見えるので、私が正しくlimit()を使用していないようだ、またはそれがないことを私は何を探しています:

enter image description here


編集:

tiny_d = d.limit(1).map(lambda x: x.photo_id) 
tiny_d.map(lambda x: x.photo_id).first() 

説明hereとして、それが実際にどのアクション、単に変換を行うことはありませんPipelinedRDDを与える最初。

しかし、2行目でもデータセット全体が処理されます(実際には、タスクの数は以前と同じ数になります)。ために、名前の、

$ 私はドキュメントでそれを見つけることができませんでした.. [4]がまだ実行されている間


* [2]は、即座に実行され、> 3時間が経過しました。コードに基づい

+0

私は実行時間についてはわかりませんが、 'sample()'は複数のデータポイントを提供します。 'first()'は、わかるように、最初のレコードを与えるだけです。 –

+0

@ cricket_007 Sparkの「第1の記録」は何を意味していますか?おそらく、それを決定するためにデータセット全体を処理する必要があるかもしれません...しかし、それは '[3]'が即座に実行された理由を説明しません。 'sample()'については、[this](http://stackoverflow.com/questions/24806084/sampling-a-large-distributed-data-set-using-pyspark-spark)のようなものを覚えていますか? – gsamaras

+0

'first()'は主に 'take(1)'のショートカットで、 '' take() ']を読むことができます(https://spark.apache.org/docs/latest/api/python/pyspark .html#pyspark.RDD.take)は意味します。そして確かに、そのリンクはあなたが「小さなサブセット」と言っていたからです。はい1つの要素がサブセットですが、それ以上のものが必要な場合もあります:) –

答えて

3

は、ここでスパーク2.0

​​

に単純なテストケースは実際に、Dataset.firstはDataset.limit(1).collectと同等であるので、2つの物理的な計画を確認していますケース:

scala> df1.map { r => r.getAs[Int](0) }.limit(1).explain 
== Physical Plan == 
CollectLimit 1 
+- *SerializeFromObject [input[0, int, true] AS value#124] 
    +- *MapElements <function1>, obj#123: int 
     +- *DeserializeToObject createexternalrow(x#74, StructField(x,IntegerType,false)), obj#122: org.apache.spark.sql.Row 
     +- Scan ExistingRDD[x#74] 

scala> df2.map { r => r.getAs[Int](0) }.limit(1).explain 
== Physical Plan == 
CollectLimit 1 
+- *SerializeFromObject [input[0, int, true] AS value#131] 
    +- *MapElements <function1>, obj#130: int 
     +- *DeserializeToObject createexternalrow(x#74, StructField(x,IntegerType,false)), obj#129: org.apache.spark.sql.Row 
     +- *GlobalLimit 1 
      +- Exchange SinglePartition 
       +- *LocalLimit 1 
        +- Scan ExistingRDD[x#74] 

最初のケースでは、CollectLimitExec物理演算子の最適化に関連しています。つまり、最初に最初のパーティションをフェッチして、行の制限数を取得します(この場合は1)。満たされていない場合は、目的の制限に達するまでさらにパーティションをフェッチします。したがって、最初のパーティションが空でない場合は、最初のパーティションのみが計算され、フェッチされます。他のパーティションも計算されません。

ただし、2番目のケースでは、以前の制限操作にシャッフル操作が含まれるため、CollectLimitExecの最適化は役に立ちません。すべてのパーティションが計算され、各パーティションでLocalLimit(1)を実行して1行を取得し、すべてのパーティションを1つのパーティションにシャッフルします。 CollectLimitExecは結果の単一パーティションから1行をフェッチします。

+0

ありがとうSun Rui!しかし、何かが私にはまだ分かりません。あなたは 'first'は' limit(1).collect'と同じですが、説明段階では 'limit(1)'だけを使用します、なぜですか? – gsamaras

関連する問題