私はDataprocでSparkクラスタを使用していますが、処理の最後にジョブが失敗します。Google Cloudでスパーク最後の段階でDataprocジョブが失敗する
私のデータソースは、Google Cloud Storage(全容量は3.5TB、5000ファイル)のcsv形式のテキストログファイルです。
処理ロジックは以下の通りである:
- はデータフレーム(スキーマ[ "タイムスタンプ"、 "メッセージ"])にファイルを読み取ります。
- すべてのメッセージを1秒のウィンドウにグループ化する。
- グループ化されたすべてのメッセージにパイプライン[Tokenizer - > HashingTF]を適用して、単語とその頻度を抽出して特徴ベクトルを構築します。
- GCSにタイムラインを含む特徴ベクトルを保存します。
データの小さなサブセット(10個のファイルのような)での処理はうまく動作しますが、すべてのファイルで実行しているときには、最後に「Container YARNによってメモリ制限を超えて殺されました。25 GBの24 GBの物理メモリが使用されました。spark.yarn.executor.memoryOverheadを増強することを検討してください。
私のクラスタには、n1-highmem-8マシンを持つ25人のワーカーがいます。だから私はこのエラーのためにグーグルで、文字通り "spark.yarn.executor.memoryOverhead"パラメータを6500MBに増やしました。
は今私の火花ジョブがまだ失敗しますが、エラーで「ジョブによるステージ障害のために中止さ:4293のタスク(1920.0メガバイト)のシリアル化された結果の合計サイズがspark.driver.maxResultSize(1920.0メガバイト)よりも大きい」私はスパークするのが新しく、私は何か間違っているか、設定レベルで、または私のコードでやっていると信じています。あなたがこれらのものをきれいにするのを助けることができるなら、それは素晴らしいでしょう!ここで
は、スパークタスクのための私のコードです:
import logging
import string
from datetime import datetime
import pyspark
import re
from pyspark.sql import SparkSession
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml import Pipeline
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType, TimestampType, ArrayType
from pyspark.sql import functions as F
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Constants
NOW = datetime.now().strftime("%Y%m%d%H%M%S")
START_DATE = '2016-01-01'
END_DATE = '2016-03-01'
sc = pyspark.SparkContext()
spark = SparkSession\
.builder\
.appName("LogsVectorizer")\
.getOrCreate()
spark.conf.set('spark.sql.shuffle.partitions', 10000)
logger.info("Start log processing at {}...".format(NOW))
# Filenames to read/write locations
logs_fn = 'gs://databucket/csv/*'
vectors_fn = 'gs://databucket/vectors_out_{}'.format(NOW)
pipeline_fn = 'gs://databucket/pipeline_vectors_out_{}'.format(NOW)
model_fn = 'gs://databucket/model_vectors_out_{}'.format(NOW)
# CSV data schema to build DataFrame
schema = StructType([
StructField("timestamp", StringType()),
StructField("message", StringType())])
# Helpers to clean strings in log fields
def cleaning_string(s):
try:
# Remove ids (like: app[2352] -> app)
s = re.sub('\[.*\]', 'IDTAG', s)
if s == '':
s = 'EMPTY'
except Exception as e:
print("Skip string with exception {}".format(e))
return s
def normalize_string(s):
try:
# Remove punctuation
s = re.sub('[{}]'.format(re.escape(string.punctuation)), ' ', s)
# Remove digits
s = re.sub('\d*', '', s)
# Remove extra spaces
s = ' '.join(s.split())
except Exception as e:
print("Skip string with exception {}".format(e))
return s
def line_splitter(line):
line = line.split(',')
timestamp = line[0]
full_message = ' '.join(line[1:])
full_message = normalize_string(cleaning_string(full_message))
return [timestamp, full_message]
# Read line from csv, split to date|message
# Read CSV to DataFrame and clean its fields
logger.info("Read CSV to DF...")
logs_csv = sc.textFile(logs_fn)
logs_csv = logs_csv.map(lambda line: line_splitter(line)).toDF(schema)
# Keep only lines for our date interval
logger.info("Filter by dates...")
logs_csv = logs_csv.filter((logs_csv.timestamp>START_DATE) & (logs_csv.timestamp<END_DATE))
logs_csv = logs_csv.withColumn("timestamp", logs_csv.timestamp.cast("timestamp"))
# Helpers to join messages into window and convert sparse to dense
join_ = F.udf(lambda x: "| ".join(x), StringType())
asDense = F.udf(lambda v: v.toArray().tolist())
# Agg by time window
logger.info("Group log messages by time window...")
logs_csv = logs_csv.groupBy(F.window("timestamp", "1 second"))\
.agg(join_(F.collect_list("message")).alias("messages"))
# Turn message to hashTF
tokenizer = Tokenizer(inputCol="messages", outputCol="message_tokens")
hashingTF = HashingTF(inputCol="message_tokens", outputCol="tokens_counts", numFeatures=1000)
pipeline_tf = Pipeline(stages=[tokenizer, hashingTF])
logger.info("Fit-Transform ML Pipeline...")
model_tf = pipeline_tf.fit(logs_csv)
logs_csv = model_tf.transform(logs_csv)
logger.info("Spase vectors to Dense list...")
logs_csv = logs_csv.sort("window.start").select(["window.start", "tokens_counts"])\
.withColumn("tokens_counts", asDense(logs_csv.tokens_counts))
# Save to disk
# Save Pipeline and Model
logger.info("Save models...")
pipeline_tf.save(pipeline_fn)
model_tf.save(model_fn)
# Save to GCS
logger.info("Save results to GCS...")
logs_csv.write.parquet(vectors_fn)
ありがとうございます! 'spark.driver.maxResultSize'をマスターのマシンタイプが' n1-high 'の '10g'に設定しましたmem-4'が助けになりました。 –