私はスパークアプリケーションの設計を改善するための提案をしたいと思います。私はスパークするのが初めてで、パフォーマンスに関してオンラインで文書を読んでいます。完全なデータロードでクラスタを実行しているとき、非常に遅いです。ですから、アプリケーションを開発者の視点からどれだけうまく調整できるかをお勧めします。私のスパークアプリケーションをPythonでチューニングする提案
これは私のスパークアプリケーションを開発した方法です。 機能は、HDFSにあるファイルを読み込み、処理し、データを寄木細工のハイブテーブルに保存することです。 sparkとpythonを使用して開発されました。
各ファイルサイズは約50Mバイトで、処理するファイルは約50ファイルです。その3ノードクラスタ(2スレーブ+ 1マスタ)。現在、データの処理には約4時間かかります。 10個のドライバコア、全エグゼキュータコア60、エグゼキュータメモリ7G、コンフィグレーションに割り当てられたドライバメモリ7Gがある。
スクリプトはsc.TextFileを使用してすべてのファイルを読み取り、phytonRDDを作成します。 python RDDにはスキーマが割り当てられ、ラムダ関数を使用して行ごとに処理されます。行単位で処理するには時間がかかります。処理後、それは寄木張りテーブルに格納されます。
各RDDが作成するRDDの数とメモリの量を確認するにはどうすればよいですか。どのようにこれを改善することができます。
ありがとうございます。
コードスニペット:
# Input: list of files along with metadata file.
# Files start with a number to identify which branch file and the branch number is also a value in the base table
# If there are 100 files, then I can specify to process 10 files
# checking the metadata table
field_lst = branch list
sql_stmt = 'from bd.tb select filename where field1 in ' + \
' (\"' + '\", \"'.join(["%s"] * int(field_lst.__len__())) % tuple(field_lst) + '\")' + \
' and ' + \
'filename like "%_yyyy_xxxxx%"'
newdf = hc.sql(sql_stmt)
# preparing the list of files that needs to be processed. This is a subset of input files.
files = newdf.toPandas().values.tolist()
newfiles = list(ny.array(files).flat)
# processing the input file
pyrdd = sc.textFile(','.join(newfiles), use_unicode=False)
rdd1 = pyrdd.map(lambda row: tbl_schema(*row.split('\t')))
rdd2 = rdd1.map(lambda row: None if (row == '0' or row == '') else row)
# input file has around 20 columns which is processed in Row
rdd3 = rdd2.map(lambda row: Row(
str(row.field1).lower.replace("'", "''").replace("\\", "\\\\").strip(),
row.field2,
row.field3,
datetime.datetime.now()
))
df2 = hc.createDataFrame(rdd3, SCHEMA_rdd3)
# reading from the base table where the rows does not have the branch list in field_lst
sql_stmt1 = 'select * from ' + base_table + ' where field1 not in ' + \
' (\"' + '\", \"'.join(["%s"] * int(field_lst.__len__())) % tuple(field_lst) + '\")'
df3 = hc.sql(sql_stmt1)
new_df = df2.unionAll(df3)
new_df.saveAsTable('tmp_tbl1, mode='overwrite')
df_new = hc.sql('select * from tmp_tbl1')
df_new.saveAsTable(base_table, mode='overwrite')
コードを投稿することはできますか?ボトルネックがどこにあるかを見つけるのに役立つかもしれません。 – jtitusj
Spark 2.0では、CSVファイルをDataFrames *に直接読み込むことができます(Spark 1.xでは、構文が少し異なり、CLASSPATHの調整が可能な「Spark-CSV」プラグインを使用できます)。すべてのSparkベンチマークでは、DataFramesがScalaの生のRDDよりも高速に処理され、Pythonの方がはるかに高速です。 –