2017-05-02 16 views
0

SparkStreamingアプリケーション(Python)をデベロップしている間、私はそれがどのようにうまく動作するのかよく分かりません。 jsonファイルストリーム(ディレクトリにポップする)を読んで、それぞれのjsonオブジェクトと参照に対して結合操作を実行してから、それをテキストファイルに書き戻すだけです。SparkStreamingアプリケーションが遅すぎる

config = configparser.ConfigParser() 
config.read("config.conf") 

def getSparkSessionInstance(sparkConf): 
if ("sparkSessionSingletonInstance" not in globals()): 
    globals()["sparkSessionSingletonInstance"] = SparkSession \ 
     .builder \ 
     .config(conf=sparkConf) \ 
     .getOrCreate() 
return globals()["sparkSessionSingletonInstance"] 

# Création du contexte 
sc = SparkContext() 
ssc = StreamingContext(sc, int(config["Variables"]["batch_period_spark"])) 
sqlCtxt = getSparkSessionInstance(sc.getConf()) 
df_ref = sqlCtxt.read.json("file://" + config["Paths"]["path_ref"]) 
df_ref.createOrReplaceTempView("REF") 
df_ref.cache() 
output = config["Paths"]["path_DATAs_enri"] 


# Fonction de traitement des DATAs 
def process(rdd): 
     if rdd.count() > 0: 
       #print(rdd.toDebugString) 
       df_DATAs = sqlCtxt.read.json(rdd) 
       df_DATAs.createOrReplaceTempView("DATAs") 
       df_enri=sqlCtxt.sql("SELECT DATAs.*, REF.Name, REF.Mail FROM DATAs, REF WHERE DATAs.ID = REF.ID") 
       df_enri.createOrReplaceTempView("DATAs_enri") 
       df_enri.write.mode('append').json("file://" + output) 
       if(df_enri.count() < df_DATAs.count()): 
         df_fail = sqlCtxt.sql("SELECT * FROM DATAs WHERE DATAs.ID NOT IN (SELECT ID FROM DATAs_enri)") 
         df_fail.show() 


# Configuration du stream et lancement 
files = ssc.textFileStream("file://" + config["Paths"]["path_stream_DATAs"]) 
files.foreachRDD(process) 
print("[GO]") 
ssc.start() 
ssc.awaitTermination() 

はここに私のスパークconfigです::ここに私のコードで、それが働いている、

spark.master     local[*] 
spark.executor.memory   3g 
spark.driver.memory    3g 
spark.python.worker.memory  3g 
spark.memory.fraction   0.9 
spark.driver.maxResultSize  3g 
spark.memory.storageFraction 0.9 
spark.eventLog.enabled   true 

まあ、私は疑問を持っている:このプロセスは遅く、処理遅延が増加しています。私は現地で働いていますが、私はparrallelismがないことを恐れています...監視UIでは、私は一度に1人のエグゼクティブと1つの仕事しか見ることができません。それを行う簡単な方法はありますか? と同じようにDStreamの機能を変換しますか?私は行方不明の設定変数がありますか?

答えて

0

あなたのコードが遅い理由はいくつかあります。

私が見たように、労働者については、あなたが労働者の数を設定する場所は見当たりませんでした。だから、おそらく1を意味する労働者のデフォルト数で始まるでしょう。反対側では、あなたはそれほど大きくないかもしれない1つのファイルから読み込み、スパークは並列性を行っていません。あなたのコードのいくつかの手順をundesrtandする必要がある一方で

  1. あなたは数がたくさんあります:if rdd.count() > 0:; if(df_enri.count() < df_DATAs.count()):を、カウントは高価で、インクルードはあなたのストリーミングデータに削減相であり、あなたがやっていますカウントの3倍。
  2. ジョインも高価ですが、ストリーミングプロセスでの参加はそれほど良くありません。あなたはdf_ref.cache()を正しく実行しましたが、参加はシャッフルして高価です。

私があなたにお勧めするものは、失敗しないでください。コードから削除してください。それは動作しませんでした、ちょうどデータを保存しないでください。他のものは、hereが表示されているように、実行のためのより多くの労働者またはより多くのコアを設定してください:spark.executor.cores=2

+0

これらのアドバイスに感謝します。 別の質問があります。私の_if_の最初の_count_は、Sparkがrddを処理するのを防ぐためです。まだ処理するものがない場合でも、SparkStreamingがストリーム上の作品を開始するのは正常ですか?もし私がそれをしなければ、空のRDDを処理していると私に伝えます。 – Flibidi

+0

最初の 'count()'関数 'isEmpty()'を使うことをお勧めします。https://spark.apache.org /docs/2.1.0/api/python/pyspark.html#pyspark.RDD.isEmptyこれが空であるかどうかを確認する方が早いです。これはシャッフルを生成しません。 –