ストリーミングアプリケーションでパフォーマンスの問題が発生しています。私は、DirectStreamを使用してデータをデータフレームに変換することで、カフカのトピックから自分のデータを読み込んでいます。データフレームで何らかの集約操作を行った後、結果をregisterTempTableに保存しています。 registerTempTableは、次の分のデータフレーム圧縮に使用されます。比較結果はHDFSに保存され、データは既存のregisterTempTableに上書きされます。スパークストリーミングパフォーマンスの問題。毎分処理時間が増えています
これは私がパフォーマンス上の問題に直面しています。私のストリーミングジョブは、最初の15秒、2番目の分18秒、3番目の分20秒で台無しになるので、処理時間が長くなります。期間中、私のストリーミングジョブがキューに入れられます。
アプリケーションについて
ストリーミングは60秒ごとに実行されます。 スパークバージョン2.1.1(私はpysparkを使用しています) カフカトピックには4つのパーティションがあります。
問題を解決するため、以下の手順を試しました。 ステップ1:私の仕事を提出している間、私は "spark.sql.shuffle.partitions = 4"を与えています。 ステップ2:私のデータフレームをテキストファイルとして保存していますが、私はcoalesce(4)を使用しています。
毎分スパンURLが表示されたら、ファイルステージとして保存すると、最初の14分、2分28分、3分42分の2倍になります。
こんにちは、
返事のおかげで、
申し訳ありませんが、私はスパークに新しいです。正確に何が変わる必要があるのか分かりません。私を助けてください。私は "df_data"をunpersistする必要がありますか? 私の "df_data"データフレームもキャッシュして固定解除します。しかし、まだ、私は同じ問題に直面しています。
チェックポイントを有効にする必要がありますか? "ssc.checkpoint("/user/test/checkpoint ")を追加するのと同様に、このcode.createDirectStreamはチェックポイントをサポートしますか?または、オフセット値を有効にする必要がありますか?ここに何が必要なのか教えてください。
if __name__ == "__main__":
sc = SparkContext(appName="PythonSqlNetworkWordCount")
sc.setLogLevel("ERROR")
sparkSql = SQLContext(sc)
ssc = StreamingContext(sc, 60)
zkQuorum = {"metadata.broker.list" : "m2.tatahdp.com:9092"}
topic = ["kpitmumbai"]
kvs = KafkaUtils.createDirectStream(ssc,topic,zkQuorum)
schema = StructType([StructField('A', StringType(), True), StructField('B', LongType(), True), StructField('C', DoubleType(), True), StructField('D', LongType(), True)])
first_empty_df = sqlCtx.createDataFrame(sc.emptyRDD(), schema)
first_empty_df.registerTempTable("streaming_tbl")
lines = kvs.map(lambda x :x[1])
lines.foreachRDD(lambda rdd: empty_rdd() if rdd.count() == 0 else
CSV(rdd))
ssc.start()
ssc.awaitTermination()
def CSV(rdd1):
spark = getSparkSessionInstance(rdd1.context.getConf())
psv_data = rdd1.map(lambda l: l.strip("\s").split("|"))
data_1 = psv_data.map(lambda l : Row(
A=l[0],
B=l[1],
C=l[2],
D=l[3])
hasattr(data_1 ,"toDF")
df_2= data_1.toDF()
df_last_min_data = sqlCtx.sql("select A,B,C,D,sample from streaming_tbl")#first time will be empty and next min onwards have values
df_data = df_2.groupby(['A','B']).agg(func.sum('C').alias('C'),func.sum('D').alias('D'))
df_data.registerTempTable("streaming_tbl")
con=(df_data.A==df_last_min_data.A) & (df_data.B==df_last_min_data.B)
path1=path="/user/test/str" + str(Starttime).replace(" ","").replace("-","").replace(":","")
df_last_min_data.join(df_data,con,"inner").select(df_last_min_data.A,df_last_min_data.b,df_data.C,df_data.D).write.csv(path=path1,mode="append")
再度お返事ありがとうございます。
ご返信ありがとうございます。上記のコードを参考に追加しました。私は変更が必要なところでお手伝いできますか? –