3 EC2インスタンスのc4.2xlarge(15GB RAM/8コア)タイプを使用してスパークのクラスタを起動しました。次にA、B、Cとします。設定クリークアキュムレータステップ中にのみマスタサーバ上でスパークジョブが実行される理由
:私はmaster-server.sh start-master.sh
として、私は唯一の3人の執行を開始したこのクラスタ上でそれを開始している 。 Iは、各インスタンスで、次のコマンドを実行して、インスタンスの両方に8つのエグゼキュータを作成した
:BおよびCの設定は、次のコマンド start-slave.sh <master-uri> -c 3
有します。
start-slave.sh <master-uri> -c 8
今、私のコードは以下の通りです:
# Loading wiki dumps files.
lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0].encode("utf-8"))
# Running word count algorithm. and selecting with count as 1.
counts = lines.flatMap(lambda x: x.lower().split(' ')) \
.map(lambda x: (x, 1)) \
.reduceByKey(add) \
.filter(lambda x: x[1] == 1) \
.map(lambda (x,y): x)
# Making Dataframe from RDD.
df = lines.map(lambda x: (x,)).toDF(['raw_sentence'])
# Tokenizing using spark ml API.
fl = Tokenizer(inputCol="raw_sentence", outputCol="words")
df = fl.transform(df).select("words")
# Removing Stopwords. Pay attention I am converting counts to list iterator.
fl = StopWordsRemover(inputCol="words", outputCol="filtered")
fl.setStopWords(fl.getStopWords() + list(counts.toLocalIterator()))
df = fl.transform(df).select("filtered")
を最初は私が仕事を始めたとき。私のサーバーA、B、Cはすべてのコアを利用していました。しかし、その後しばらくして、私のBとCコアは、任意のメモリ またはコアを使用することはありませんし、この段階で、次のログました:
17/09/08午後08時31分54秒INFO BlockManagerInfo:削除しましbroadcast_0_piece0に 172.31.35.55:45288 172.31.35.55:45288 in memory(サイズ:25.0 KB、無料:6.2 GB)17/09/08 20:31:54 INFO BlockManagerInfo:ブロードキャスト_0_piece0を削除しました 172.31.44.209:39094 in memory(サイズ:25.0 KB、無料6.2 GB)17/09/08午後08時31分54秒INFO ContextCleaner:クリーンアキュムレータ51
17/09/08午後9時13分51秒はHeartbeatReceiverに警告:なし 最近のハートビートで実行部2の取り外し:232069ミリ秒のタイムアウトを超え120000 ms 17/09/08 午前21時26分15秒ERROR TaskSchedulerImpl:172.31.44.209で失わエグゼキュータ2:エラー送信結果 RpcResponse {requestId = 8270848140270032673、 : キュータハートビートは、232069ミリ秒 17/09/08夜9時27分09秒エラーTransportRequestHandler後にタイムアウトbody = NioManagedBuffer {buf = java.nio.HeapByteBuffer [pos = 0 lim = 47 cap = 64]}} /172.31.44.209:33418に;クローズ接続 にjava.io.IOException:私のコードの壊れたパイプ
ライン47は以下の通りである上記のコードの第二の最後の行です:
fl.setStopWords(fl.getStopWords() + list(counts.toLocalIterator()))
カスタム構成は以下のとおりです。 SPARK_EXECUTOR_MEMORY = 12G
その他はデフォルトです。
なぜ、47行目のタスクが分散して実行されていないのですか? 余分なリソース、特にRAMがあるのに、クラッシュしたのはなぜですか?