SparkStreamingアプリケーション(Python)をデベロップしている間、私はそれがどのようにうまく動作するのかよく分かりません。 jsonファイルストリーム(ディレクトリにポップする)を読んで、それぞれのjsonオブジェクトと参照に対して結合操作を実行してから、それをテキストファイルに書き戻すだけです。SparkStreamingアプリケーションが遅すぎる
config = configparser.ConfigParser()
config.read("config.conf")
def getSparkSessionInstance(sparkConf):
if ("sparkSessionSingletonInstance" not in globals()):
globals()["sparkSessionSingletonInstance"] = SparkSession \
.builder \
.config(conf=sparkConf) \
.getOrCreate()
return globals()["sparkSessionSingletonInstance"]
# Création du contexte
sc = SparkContext()
ssc = StreamingContext(sc, int(config["Variables"]["batch_period_spark"]))
sqlCtxt = getSparkSessionInstance(sc.getConf())
df_ref = sqlCtxt.read.json("file://" + config["Paths"]["path_ref"])
df_ref.createOrReplaceTempView("REF")
df_ref.cache()
output = config["Paths"]["path_DATAs_enri"]
# Fonction de traitement des DATAs
def process(rdd):
if rdd.count() > 0:
#print(rdd.toDebugString)
df_DATAs = sqlCtxt.read.json(rdd)
df_DATAs.createOrReplaceTempView("DATAs")
df_enri=sqlCtxt.sql("SELECT DATAs.*, REF.Name, REF.Mail FROM DATAs, REF WHERE DATAs.ID = REF.ID")
df_enri.createOrReplaceTempView("DATAs_enri")
df_enri.write.mode('append').json("file://" + output)
if(df_enri.count() < df_DATAs.count()):
df_fail = sqlCtxt.sql("SELECT * FROM DATAs WHERE DATAs.ID NOT IN (SELECT ID FROM DATAs_enri)")
df_fail.show()
# Configuration du stream et lancement
files = ssc.textFileStream("file://" + config["Paths"]["path_stream_DATAs"])
files.foreachRDD(process)
print("[GO]")
ssc.start()
ssc.awaitTermination()
はここに私のスパークconfigです::ここに私のコードで、それが働いている、
spark.master local[*]
spark.executor.memory 3g
spark.driver.memory 3g
spark.python.worker.memory 3g
spark.memory.fraction 0.9
spark.driver.maxResultSize 3g
spark.memory.storageFraction 0.9
spark.eventLog.enabled true
まあ、私は疑問を持っている:このプロセスは遅く、処理遅延が増加しています。私は現地で働いていますが、私はparrallelismがないことを恐れています...監視UIでは、私は一度に1人のエグゼクティブと1つの仕事しか見ることができません。それを行う簡単な方法はありますか? と同じようにDStreamの機能を変換しますか?私は行方不明の設定変数がありますか?
これらのアドバイスに感謝します。 別の質問があります。私の_if_の最初の_count_は、Sparkがrddを処理するのを防ぐためです。まだ処理するものがない場合でも、SparkStreamingがストリーム上の作品を開始するのは正常ですか?もし私がそれをしなければ、空のRDDを処理していると私に伝えます。 – Flibidi
最初の 'count()'関数 'isEmpty()'を使うことをお勧めします。https://spark.apache.org /docs/2.1.0/api/python/pyspark.html#pyspark.RDD.isEmptyこれが空であるかどうかを確認する方が早いです。これはシャッフルを生成しません。 –