2017-05-29 25 views
0

この質問には既に回答がありましたら謝罪します。私はアーカイブを見ましたが、私の質問に固有の答えは見つかりませんでした。ローカルの[*]が使用可能なすべてのコアをマシンで使用しないのはなぜですか?

私はSparkを初めて使用しています。私はMacOS Sierraマシンでspark-2.1.1を使用して、ローカルで並列に添付された簡単なサンプルを実行しようとしています。私は4つのコアを持ち、それぞれ10秒かかる4つのタスクがあるので、合計で10秒以上を費やすことを望んでいました。

各タスクに予想される時間がかかることがわかります。しかし、私には実行スレッドが2つしかないようです。私は4を期待していました。コードでわかるように、各タプルの値は、対応するタスクの実行時間です。

insight086:pyspark lquesada $より出力/パート-00000

(u'1', 10.000892877578735) 
(u'3', 10.000878095626831) 

insight086:pyspark lquesada $より出力/パート-00001

(u'2', 10.000869989395142) 
(u'4', 10.000877857208252) 

また、これは取っている合計時間はかなりあります20秒以上:

total_time 33.2253439426 

ありがとうございました!

乾杯、 ルイス

INPUTファイル:

1 
2 
3 
4 

SCRIPT:

from pyspark import SparkContext 
import time 

def mymap(word): 
    start = time.time() 
    time.sleep(10) 
    et=time.time()-start 
    return (word, et) 

def main(): 
    start = time.time() 
    sc = SparkContext(appName='SparkWordCount') 

    input_file = sc.textFile('/Users/lquesada/Dropbox/hadoop/pyspark/input.txt') 
    counts = input_file.flatMap(lambda line: line.split()) \ 
        .map(mymap) \ 
        .reduceByKey(lambda a, b: a + b) 
    counts.saveAsTextFile('/Users/lquesada/Dropbox/hadoop/pyspark/output') 

    sc.stop() 
    print 'total_time',time.time()-start 

if __name__ == '__main__': 
    main() 
+0

このデータセットは非常に小さいので、何かを証明することは不可能です。 – eliasah

+0

私の実際の質問は、使用したナンバーコアにあります。しかし、オーバーヘッドに関する私の懸念と確かに関連しているので、「コアの数をスケーリングする際のパフォーマンスの数が一貫していません」と私が指摘した事実を感謝します。 –

答えて

0

Divide and conquer algorithmsは、それがすべてでそれらを使用することは理にかなってそのしきい値を持っている理由です。 Sparkで(並列性を備えた)ミックスにディストリビューションを追加すると、そのような小さな計算を行うためのかなりの機械があります。この4要素のデータセットでSparkの強みを活用しているわけではありません。

大規模なデータセットと大規模なデータセットでは、時間が期待通りに収束すると想定しています。

また、ローカルデータセットを読み取るときのパーティション数は2以下であるため、repartitioningでは2つのコアのみを使用します。

配分(numPartitions:INT)(暗黙のORD:注文[T] = NULL):RDD [T]は正確numPartitionsパーティションを持つ新しいRDDを返し

このRDDの並列度を増減できます。内部的には、シャッフルを使用してデータを再配布します。

このRDDのパーティション数を減らす場合は、合体を使用することを検討してください。これにより、シャッフルの実行が回避されます。


local[*]SparkContextLOCAL_N_REGEXためのケースを参照)、コンピュータを持っている限り多くのコアを使用することを意味します。これは、デフォルトで使用するどのように多くのパーティションだけヒントです

def localCpuCount: Int = Runtime.getRuntime.availableProcessors() 
val threadCount = if (threads == "*") localCpuCount else threads.toInt 

が、スパークが上下するのを妨げません。主に、Sparkが適用する最適化によって、分散計算のための最適な実行計画が得られます。 Sparkはあなたにとってかなりのことです。抽象度が高いほど、より多くの最適化が行われます(Spark SQLのOptimizerのbatchesを参照)。

関連する問題