2016-03-22 12 views
2

私は現在、Scalaでsparkを使用する方法を学び始めています。私が取り組んでいる問題は、ファイルを読み込み、特定の文字の各行を分割し、列の1つが述語と一致する行をフィルタリングし、最後に列を削除する必要があります。したがって、基本的で素朴な実装はマップであり、次にフィルターと別のマップです。ScalaはSparkで非効率的に収集しますか?

これはコレクションを3回通過することを意味し、それは私には非常に不合理なようでした。そこで私はそれらを1つの収集(引数として部分的な機能を持つ収集)に置き換えようとしました。私の驚いたことに、これはずっと遅く走った。私は通常のScalaコレクションでローカルに試しました。後者の方がはるかに高速です。

これはなぜですか?私の考えは、マップとフィルタとマップが順番に適用されるのではなく、むしろ1つの操作に混在するということです。言い換えれば、アクションが評価されると、リストの各要素がチェックされ、保留中の操作が実行されます。そうですか?しかし、そうであっても、なぜコレクターはそれほどひどくパフォーマンスするのでしょうか?

編集:私は何をしたいか示すためのコード例:

素朴な方法:

sc.textFile(...).map(l => { 
    val s = l.split(" ") 
    (s(0), s(1)) 
}).filter(_._2.contains("hello")).map(_._1) 

収集方法:

sc.textFile(...).collect { 
    case s if(s.split(" ")(0).contains("hello")) => s(0) 
} 
+0

"ノン・ナイーブ"な方法で正確に何を保存したのかを理解しようとしています。 2番目のアプローチでは、どちらのリソースがあまり使用されないと思いますか? RDDを収集しているため、データ全体を強制的に1台のマシンに移動させます。通常は、パフォーマンスペナルティまたはOOM例外が発生します。 –

+0

私はそうは思わない。部分関数を引数として収集すると、fを適用してすべての一致する値を含むRDDを返します。それで、私が理解しているところからは、マップのように動作するはずですが、部分的な機能です。たとえそれが速くなくても、それほど遅くはありません。 – Nico

+0

質問とは無関係に、あなたの例はさまざまなことをしています。 「Naive Way」は分割文字列の最初の部分を返し、「collect way」は入力文字列の最初の文字を返します。 – Aivean

答えて

4

答えはの実装でありますcollect

/** 
* Return an RDD that contains all matching values by applying `f`. 
*/ 
def collect[U: ClassTag](f: PartialFunction[T, U]): RDD[U] = withScope { 
    val cleanF = sc.clean(f) 
    filter(cleanF.isDefinedAt).map(cleanF) 
} 

ご覧のとおり、filter - >mapという同じシーケンスですが、あなたのケースでは効率が悪いです。

isDefinedAtおよびapplyの方法は、の数値をifと評価しています。

したがって、「収集」の例では、splitは入力要素ごとに2回実行されます。

+2

ここで私は収集が地図とフィルターとは違っていたと思っていました。あなたの答えは大変ありがとうございます。 – Nico

関連する問題