2016-08-09 3 views
2

私はスパークアプリケーションの設計を改善するための提案をしたいと思います。私はスパークするのが初めてで、パフォーマンスに関してオンラインで文書を読んでいます。完全なデータロードでクラスタを実行しているとき、非常に遅いです。ですから、アプリケーションを開発者の視点からどれだけうまく調整できるかをお勧めします。私のスパークアプリケーションをPythonでチューニングする提案

これは私のスパークアプリケーションを開発した方法です。 機能は、HDFSにあるファイルを読み込み、処理し、データを寄木細工のハイブテーブルに保存することです。 sparkとpythonを使用して開発されました。

各ファイルサイズは約50Mバイトで、処理するファイルは約50ファイルです。その3ノードクラスタ(2スレーブ+ 1マスタ)。現在、データの処理には約4時間かかります。 10個のドライバコア、全エグゼキュータコア60、エグゼキュータメモリ7G、コンフィグレーションに割り当てられたドライバメモリ7Gがある。

スクリプトはsc.TextFileを使用してすべてのファイルを読み取り、phytonRDDを作成します。 python RDDにはスキーマが割り当てられ、ラムダ関数を使用して行ごとに処理されます。行単位で処理するには時間がかかります。処理後、それは寄木張りテーブルに格納されます。

各RDDが作成するRDDの数とメモリの量を確認するにはどうすればよいですか。どのようにこれを改善することができます。

ありがとうございます。

コードスニペット:

# Input: list of files along with metadata file. 
# Files start with a number to identify which branch file and the branch number is also a value in the base table 
# If there are 100 files, then I can specify to process 10 files 

# checking the metadata table 
    field_lst = branch list 
    sql_stmt = 'from bd.tb select filename where field1 in ' + \ 
        ' (\"' + '\", \"'.join(["%s"] * int(field_lst.__len__())) % tuple(field_lst) + '\")' + \ 
        ' and ' + \ 
        'filename like "%_yyyy_xxxxx%"' 
     newdf = hc.sql(sql_stmt) 
# preparing the list of files that needs to be processed. This is a subset of input files. 
     files = newdf.toPandas().values.tolist() 
     newfiles = list(ny.array(files).flat) 
# processing the input file 
     pyrdd = sc.textFile(','.join(newfiles), use_unicode=False) 
    rdd1 = pyrdd.map(lambda row: tbl_schema(*row.split('\t'))) 
    rdd2 = rdd1.map(lambda row: None if (row == '0' or row == '') else row) 
# input file has around 20 columns which is processed in Row 
    rdd3 = rdd2.map(lambda row: Row(
      str(row.field1).lower.replace("'", "''").replace("\\", "\\\\").strip(), 
      row.field2, 
      row.field3, 
      datetime.datetime.now() 
      )) 
    df2 = hc.createDataFrame(rdd3, SCHEMA_rdd3) 
# reading from the base table where the rows does not have the branch list in field_lst 
    sql_stmt1 = 'select * from ' + base_table + ' where field1 not in ' + \ 
      ' (\"' + '\", \"'.join(["%s"] * int(field_lst.__len__())) % tuple(field_lst) + '\")' 
     df3 = hc.sql(sql_stmt1) 
     new_df = df2.unionAll(df3) 
     new_df.saveAsTable('tmp_tbl1, mode='overwrite') 
     df_new = hc.sql('select * from tmp_tbl1') 
     df_new.saveAsTable(base_table, mode='overwrite') 
+0

コードを投稿することはできますか?ボトルネックがどこにあるかを見つけるのに役立つかもしれません。 – jtitusj

+0

Spark 2.0では、CSVファイルをDataFrames *に直接読み込むことができます(Spark 1.xでは、構文が少し異なり、CLASSPATHの調整が可能な「Spark-CSV」プラグインを使用できます)。すべてのSparkベンチマークでは、DataFramesがScalaの生のRDDよりも高速に処理され、Pythonの方がはるかに高速です。 –

答えて

1

私はあなたがより良いあなたの仕事を理解するためにSpark History Serverを使用することをお勧めします。

各RDDで作成されるRDDの数とメモリの量を知るにはどうすればよいですか。

(あなたがRDDSをキャッシュまたはシャッフルなければ、そうでない場合は、多くのメモリを消費しない)

また、履歴サーバ等

、あなたの仕事のDAG、潜在的なGCの問題をも表示することができます

行ごとの処理に時間がかかります。

これは既に分かっているので、単体テストとプロファイリングを使用して機能を調整することに焦点を当てることができます。あなたの質問に実際のコードを貼り付けることで、人々が手助けすることが可能になります。

+0

こんにちは、ありがとうございます。以下のコードスニペットを見つけてください: – Aavik

+0

これはたくさんのコードです。あなたはそれが遅いと思っていますが、それを時間をずらすか、それを調整しようとしましたか? – ShuaiYuan

0

ありがとうございます。下記のコードスニペットをご覧ください:

# Input: list of files along with metadata file. 
# Files start with a number to identify which branch file and the branch number is also a value in the base table 
# If there are 100 files, then I can specify to process 10 files 

# checking the metadata table 
    field_lst = branch list 
    sql_stmt = 'from bd.tb select filename where field1 in ' + \ 
        ' (\"' + '\", \"'.join(["%s"] * int(field_lst.__len__())) % tuple(field_lst) + '\")' + \ 
        ' and ' + \ 
        'filename like "%_yyyy_xxxxx%"' 
     newdf = hc.sql(sql_stmt) 
# preparing the list of files that needs to be processed. This is a subset of input files. 
     files = newdf.toPandas().values.tolist() 
     newfiles = list(ny.array(files).flat) 
# processing the input file 
     pyrdd = sc.textFile(','.join(newfiles), use_unicode=False) 
    rdd1 = pyrdd.map(lambda row: tbl_schema(*row.split('\t'))) 
    rdd2 = rdd1.map(lambda row: None if (row == '0' or row == '') else row) 
# input file has around 20 columns which is processed in Row 
    rdd3 = rdd2.map(lambda row: Row(
      str(row.field1).lower.replace("'", "''").replace("\\", "\\\\").strip(), 
      row.field2, 
      row.field3, 
      datetime.datetime.now() 
      )) 
    df2 = hc.createDataFrame(rdd3, SCHEMA_rdd3) 
# reading from the base table where the rows does not have the branch list in field_lst 
    sql_stmt1 = 'select * from ' + base_table + ' where field1 not in ' + \ 
      ' (\"' + '\", \"'.join(["%s"] * int(field_lst.__len__())) % tuple(field_lst) + '\")' 
     df3 = hc.sql(sql_stmt1) 
     new_df = df2.unionAll(df3) 
     new_df.saveAsTable('tmp_tbl1, mode='overwrite') 
     df_new = hc.sql('select * from tmp_tbl1') 
     df_new.saveAsTable(base_table, mode='overwrite') 
関連する問題