私は多くの画像を読んでいます。私は開発のためにそれらの小さなサブセットで作業したいと思います。その結果、私はsparkとpythonは、それが起こる作ることができる方法を理解しようとしています:大きなRDDの制限に向けて
In [1]: d = sqlContext.read.parquet('foo')
In [2]: d.map(lambda x: x.photo_id).first()
Out[2]: u'28605'
In [3]: d.limit(1).map(lambda x: x.photo_id)
Out[3]: PythonRDD[31] at RDD at PythonRDD.scala:43
In [4]: d.limit(1).map(lambda x: x.photo_id).first()
// still running...
何が起こっているか..so? limit()は、私たちが持っていたものよりもはるかに速く動くと期待していますが、それはそうではありません*です。明らかに、私は何か欠けておりますので、私は、私の理解を説明し、私を修正してください下記
:
d
はペアのRDD(私はそのスキーマから知っている)であり、私はと を言っていますi)すべてのペア(
x
という名前が付けられ、photo_id
属性を返します)。我々はそれが$をどのように動作するか私はわからないfirst()
方法を、適用される新しい(匿名)RDD、になりますが、私はその匿名RDDの最初の要素を与える必要がありますII)。
[3]
で、我々は、
d
もかかわらず 多くの要素を有することを意味する、1にd
RDDを制限のみ1を使用してのみ1つの 要素にマッピング関数を適用します。Out [3]
は、マッピングによって作成されたRDDである必要があります。モニターを見た後、予想通り[4]
で- 、私は
[3]
のロジックに従うと、一つだけと限られたRDDの要素のみを印刷することを期待する...
、[ 4]他はありませんが、データセット全体を処理するように見えるので、私が正しくlimit()
を使用していないようだ、またはそれがないことを私は何を探しています:
編集:
tiny_d = d.limit(1).map(lambda x: x.photo_id)
tiny_d.map(lambda x: x.photo_id).first()
説明hereとして、それが実際にどのアクション、単に変換を行うことはありませんPipelinedRDD
を与える最初。
しかし、2行目でもデータセット全体が処理されます(実際には、タスクの数は以前と同じ数になります)。ために、名前の、
$ 私はドキュメントでそれを見つけることができませんでした.. [4]がまだ実行されている間
* [2]は、即座に実行され、> 3時間が経過しました。コードに基づい
私は実行時間についてはわかりませんが、 'sample()'は複数のデータポイントを提供します。 'first()'は、わかるように、最初のレコードを与えるだけです。 –
@ cricket_007 Sparkの「第1の記録」は何を意味していますか?おそらく、それを決定するためにデータセット全体を処理する必要があるかもしれません...しかし、それは '[3]'が即座に実行された理由を説明しません。 'sample()'については、[this](http://stackoverflow.com/questions/24806084/sampling-a-large-distributed-data-set-using-pyspark-spark)のようなものを覚えていますか? – gsamaras
'first()'は主に 'take(1)'のショートカットで、 '' take() ']を読むことができます(https://spark.apache.org/docs/latest/api/python/pyspark .html#pyspark.RDD.take)は意味します。そして確かに、そのリンクはあなたが「小さなサブセット」と言っていたからです。はい1つの要素がサブセットですが、それ以上のものが必要な場合もあります:) –