2017-07-12 10 views
0

私はDataprocでSparkクラスタを使用していますが、処理の最後にジョブが失敗します。Google Cloudでスパーク最後の段階でDataprocジョブが失敗する

私のデータソースは、Google Cloud Storage(全容量は3.5TB、5000ファイル)のcsv形式のテキストログファイルです。

処理ロジックは以下の通りである:

  • はデータフレーム(スキーマ[ "タイムスタンプ"、 "メッセージ"])にファイルを読み取ります。
  • すべてのメッセージを1秒のウィンドウにグループ化する。
  • グループ化されたすべてのメッセージにパイプライン[Tokenizer - > HashingTF]を適用して、単語とその頻度を抽出して特徴ベクトルを構築します。
  • GCSにタイムラインを含む特徴ベクトルを保存します。

データの小さなサブセット(10個のファイルのような)での処理はうまく動作しますが、すべてのファイルで実行しているときには、最後に「Container YARNによってメモリ制限を超えて殺されました。25 GBの24 GBの物理メモリが使用されました。spark.yarn.executor.memoryOverheadを増強することを検討してください。

私のクラスタには、n1-highmem-8マシンを持つ25人のワーカーがいます。だから私はこのエラーのためにグーグルで、文字通り "spark.yarn.executor.memoryOverhead"パラメータを6500MBに増やしました。

は今私の火花ジョブがまだ失敗しますが、エラーで「ジョブによるステージ障害のために中止さ:4293のタスク(1920.0メガバイト)のシリアル化された結果の合計サイズがspark.driver.maxResultSize(1920.0メガバイト)よりも大きい」

私はスパークするのが新しく、私は何か間違っているか、設定レベルで、または私のコードでやっていると信じています。あなたがこれらのものをきれいにするのを助けることができるなら、それは素晴らしいでしょう!ここで

は、スパークタスクのための私のコードです:

import logging 
import string 
from datetime import datetime 

import pyspark 
import re 
from pyspark.sql import SparkSession 

from pyspark.ml.feature import HashingTF, IDF, Tokenizer 
from pyspark.ml import Pipeline 

from pyspark.sql.types import StructType, StructField 
from pyspark.sql.types import DoubleType, IntegerType, StringType, TimestampType, ArrayType 
from pyspark.sql import functions as F 

logging.basicConfig(level=logging.INFO) 
logger = logging.getLogger(__name__) 

# Constants 
NOW = datetime.now().strftime("%Y%m%d%H%M%S") 
START_DATE = '2016-01-01' 
END_DATE = '2016-03-01' 

sc = pyspark.SparkContext() 
spark = SparkSession\ 
     .builder\ 
     .appName("LogsVectorizer")\ 
     .getOrCreate() 
spark.conf.set('spark.sql.shuffle.partitions', 10000) 

logger.info("Start log processing at {}...".format(NOW)) 

# Filenames to read/write locations 
logs_fn = 'gs://databucket/csv/*' 
vectors_fn = 'gs://databucket/vectors_out_{}'.format(NOW) 
pipeline_fn = 'gs://databucket/pipeline_vectors_out_{}'.format(NOW) 
model_fn = 'gs://databucket/model_vectors_out_{}'.format(NOW) 


# CSV data schema to build DataFrame 
schema = StructType([ 
    StructField("timestamp", StringType()), 
    StructField("message", StringType())]) 

# Helpers to clean strings in log fields 
def cleaning_string(s): 
    try: 
     # Remove ids (like: app[2352] -> app) 
     s = re.sub('\[.*\]', 'IDTAG', s) 
     if s == '': 
      s = 'EMPTY' 
    except Exception as e: 
     print("Skip string with exception {}".format(e)) 
    return s 

def normalize_string(s): 
    try: 
     # Remove punctuation 
     s = re.sub('[{}]'.format(re.escape(string.punctuation)), ' ', s) 
     # Remove digits 
     s = re.sub('\d*', '', s) 
     # Remove extra spaces 
     s = ' '.join(s.split()) 
    except Exception as e: 
     print("Skip string with exception {}".format(e)) 
    return s 

def line_splitter(line): 
    line = line.split(',') 
    timestamp = line[0] 
    full_message = ' '.join(line[1:]) 
    full_message = normalize_string(cleaning_string(full_message)) 
    return [timestamp, full_message] 

# Read line from csv, split to date|message 
# Read CSV to DataFrame and clean its fields 
logger.info("Read CSV to DF...") 
logs_csv = sc.textFile(logs_fn) 
logs_csv = logs_csv.map(lambda line: line_splitter(line)).toDF(schema) 

# Keep only lines for our date interval 
logger.info("Filter by dates...") 
logs_csv = logs_csv.filter((logs_csv.timestamp>START_DATE) & (logs_csv.timestamp<END_DATE)) 
logs_csv = logs_csv.withColumn("timestamp", logs_csv.timestamp.cast("timestamp")) 

# Helpers to join messages into window and convert sparse to dense 
join_ = F.udf(lambda x: "| ".join(x), StringType()) 
asDense = F.udf(lambda v: v.toArray().tolist()) 

# Agg by time window 
logger.info("Group log messages by time window...") 
logs_csv = logs_csv.groupBy(F.window("timestamp", "1 second"))\ 
         .agg(join_(F.collect_list("message")).alias("messages")) 

# Turn message to hashTF 
tokenizer = Tokenizer(inputCol="messages", outputCol="message_tokens") 
hashingTF = HashingTF(inputCol="message_tokens", outputCol="tokens_counts", numFeatures=1000) 

pipeline_tf = Pipeline(stages=[tokenizer, hashingTF]) 

logger.info("Fit-Transform ML Pipeline...") 
model_tf = pipeline_tf.fit(logs_csv) 
logs_csv = model_tf.transform(logs_csv) 

logger.info("Spase vectors to Dense list...") 
logs_csv = logs_csv.sort("window.start").select(["window.start", "tokens_counts"])\ 
        .withColumn("tokens_counts", asDense(logs_csv.tokens_counts)) 

# Save to disk 
# Save Pipeline and Model 
logger.info("Save models...") 
pipeline_tf.save(pipeline_fn) 
model_tf.save(model_fn) 

# Save to GCS 
logger.info("Save results to GCS...") 
logs_csv.write.parquet(vectors_fn) 

答えて

1

spark.driver.maxResultSize Dataprocでマスターノード上で実行され、あなたのドライバーの大きさの問題、です。 (最大RDDスパークはあなた.collect()をできるようになるマスターのメモリのデフォルト1/4

は、ドライバーに与えられ、それの1/2はspark.driver.maxResultSizeにセットを与えられている。

私はTokenizerかを推測していますHashingTFは、あなたのキースペースのサイズであるドライバを通して "メタデータ"を動かしています。許容サイズを大きくするには、spark.driver.maxResultSizeを増やすことができますが、spark.driver.memoryを増やすか、またはより大きなマスターを使用することもできます。

+0

ありがとうございます! 'spark.driver.maxResultSize'をマスターのマシンタイプが' n1-high 'の '10g'に設定しましたmem-4'が助けになりました。 –

関連する問題