2017-09-08 7 views
0

3 EC2インスタンスのc4.2xlarge(15GB RAM/8コア)タイプを使用してスパークのクラスタを起動しました。次にA、B、Cとします。設定クリークアキュムレータステップ中にのみマスタサーバ上でスパークジョブが実行される理由

:私はmaster-server.sh start-master.sh

として、私は唯一の3人の執行を開始したこのクラスタ上でそれを開始している 。 Iは、各インスタンスで、次のコマンドを実行して、インスタンスの両方に8つのエグゼキュータを作成した

:BおよびCの設定は、次のコマンド start-slave.sh <master-uri> -c 3

有します。

start-slave.sh <master-uri> -c 8

今、私のコードは以下の通りです:

# Loading wiki dumps files. 
lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0].encode("utf-8")) 

# Running word count algorithm. and selecting with count as 1. 
counts = lines.flatMap(lambda x: x.lower().split(' ')) \ 
       .map(lambda x: (x, 1)) \ 
       .reduceByKey(add) \ 
       .filter(lambda x: x[1] == 1) \ 
       .map(lambda (x,y): x) 

# Making Dataframe from RDD. 
df = lines.map(lambda x: (x,)).toDF(['raw_sentence']) 

# Tokenizing using spark ml API. 
fl = Tokenizer(inputCol="raw_sentence", outputCol="words") 
df = fl.transform(df).select("words") 

# Removing Stopwords. Pay attention I am converting counts to list iterator. 
fl = StopWordsRemover(inputCol="words", outputCol="filtered") 
fl.setStopWords(fl.getStopWords() + list(counts.toLocalIterator())) 
df = fl.transform(df).select("filtered") 

を最初は私が仕事を始めたとき。私のサーバーA、B、Cはすべてのコアを利用していました。しかし、その後しばらくして、私のBとCコアは、任意のメモリ またはコアを使用することはありませんし、この段階で、次のログました:

17/09/08午後08時31分54秒INFO BlockManagerInfo:削除しましbroadcast_0_piece0に 172.31.35.55:45288 172.31.35.55:45288 in memory(サイズ:25.0 KB、無料:6.2 GB)17/09/08 20:31:54 INFO BlockManagerInfo:ブロードキャスト_0_piece0を削除しました 172.31.44.209:39094 in memory(サイズ:25.0 KB、無料6.2 GB)17/09/08午後08時31分54秒INFO ContextCleaner:クリーンアキュムレータ51

17/09/08午後9時13分51秒はHeartbeatReceiverに警告:なし 最近のハートビートで実行部2の取り外し:232069ミリ秒のタイムアウトを超え120000 ms 17/09/08 午前21時26分15秒ERROR TaskSchedulerImpl:172.31.44.209で失わエグゼキュータ2:エラー送信結果 RpcResponse {requestId = 8270848140270032673、 : キュータハートビートは、232069ミリ秒 17/09/08夜9時27分09秒エラーTransportRequestHandler後にタイムアウトbody = NioManagedBuffer {buf = java.nio.HeapByteBuffer [pos = 0 lim = 47 cap = 64]}} /172.31.44.209:33418に;クローズ接続 にjava.io.IOException:私のコードの壊れたパイプ

enter image description here

ライン47は以下の通りである上記のコードの第二の最後の行です:

fl.setStopWords(fl.getStopWords() + list(counts.toLocalIterator())) 

カスタム構成は以下のとおりです。 SPARK_EXECUTOR_MEMORY = 12G

その他はデフォルトです。

なぜ、47行目のタスクが分散して実行されていないのですか? 余分なリソース、特にRAMがあるのに、クラッシュしたのはなぜですか?

答えて

1

RDD.toLocalIteratorは、その時点で1つのパーティションをフェッチします。したがって、実行パターンは次のようになります。

  1. 単一パーティションが計算されます。これには、単一のエグゼキュータ(広範な依存関係なし)または複数のエグゼキュータからのアクティビティが必要な場合があります。
  2. データがドライバにフェッチされ、ローカルスレッドが反復を開始します。ドライバはアクティブで、残りのコードはアイドルです。
  3. チャンクの終わりに達すると、次に続くパーティションがあると、ドライバは次のパーティションを要求します(1に進みます)。

イテレータをlistに変換すると、collectにもなります。メモリ消費量は同じになります(また、障害につながる可能性があります)が、すべてのノードがポーズなしで部品を計算します。

関連する問題