0

ストリーミングアプリケーションでパフォーマンスの問題が発生しています。私は、DirectStreamを使用してデータをデータフレームに変換することで、カフカのトピックから自分のデータを読み込んでいます。データフレームで何らかの集約操作を行った後、結果をregisterTempTableに保存しています。 registerTempTableは、次の分のデータフレーム圧縮に使用されます。比較結果はHDFSに保存され、データは既存のregisterTempTableに上書きされます。スパークストリーミングパフォーマンスの問題。毎分処理時間が増えています

これは私がパフォーマンス上の問題に直面しています。私のストリーミングジョブは、最初の15秒、2番目の分18秒、3番目の分20秒で台無しになるので、処理時間が長くなります。期間中、私のストリーミングジョブがキューに入れられます。

アプリケーションについて

ストリーミングは60秒ごとに実行されます。 スパークバージョン2.1.1(私はpysparkを使用しています) カフカトピックには4つのパーティションがあります。

問題を解決するため、以下の手順を試しました。 ステップ1:私の仕事を提出している間、私は "spark.sql.shuffle.partitions = 4"を与えています。 ステップ2:私のデータフレームをテキストファイルとして保存していますが、私はcoalesce(4)を使用しています。

毎分スパンURLが表示されたら、ファイルステージとして保存すると、最初の14分、2分28分、3分42分の2倍になります。

スパークUIの結果。 enter image description here

こんにちは、

返事のおかげで、

申し訳ありませんが、私はスパークに新しいです。正確に何が変わる必要があるのか​​分かりません。私を助けてください。私は "df_data"をunpersistする必要がありますか? 私の "df_data"データフレームもキャッシュして固定解除します。しかし、まだ、私は同じ問題に直面しています。

チェックポイントを有効にする必要がありますか? "ssc.checkpoint("/user/test/checkpoint ")を追加するのと同様に、このcode.createDirectStreamはチェックポイントをサポートしますか?または、オフセット値を有効にする必要がありますか?ここに何が必要なのか教えてください。

if __name__ == "__main__": 
    sc = SparkContext(appName="PythonSqlNetworkWordCount") 
    sc.setLogLevel("ERROR") 
    sparkSql = SQLContext(sc) 
    ssc = StreamingContext(sc, 60) 
    zkQuorum = {"metadata.broker.list" : "m2.tatahdp.com:9092"} 
    topic = ["kpitmumbai"] 
    kvs = KafkaUtils.createDirectStream(ssc,topic,zkQuorum) 
    schema = StructType([StructField('A', StringType(), True), StructField('B', LongType(), True), StructField('C', DoubleType(), True), StructField('D', LongType(), True)]) 
    first_empty_df = sqlCtx.createDataFrame(sc.emptyRDD(), schema) 
    first_empty_df.registerTempTable("streaming_tbl") 
    lines = kvs.map(lambda x :x[1]) 
    lines.foreachRDD(lambda rdd: empty_rdd() if rdd.count() == 0 else 
    CSV(rdd)) 
    ssc.start() 
    ssc.awaitTermination() 

def CSV(rdd1): 

    spark = getSparkSessionInstance(rdd1.context.getConf()) 
    psv_data = rdd1.map(lambda l: l.strip("\s").split("|")) 
    data_1 = psv_data.map(lambda l : Row(
      A=l[0], 
      B=l[1], 
      C=l[2], 
      D=l[3]) 
    hasattr(data_1 ,"toDF") 
    df_2= data_1.toDF() 
    df_last_min_data = sqlCtx.sql("select A,B,C,D,sample from streaming_tbl")#first time will be empty and next min onwards have values 
    df_data = df_2.groupby(['A','B']).agg(func.sum('C').alias('C'),func.sum('D').alias('D')) 
    df_data.registerTempTable("streaming_tbl") 
    con=(df_data.A==df_last_min_data.A) & (df_data.B==df_last_min_data.B) 
    path1=path="/user/test/str" + str(Starttime).replace(" ","").replace("-","").replace(":","") 
    df_last_min_data.join(df_data,con,"inner").select(df_last_min_data.A,df_last_min_data.b,df_data.C,df_data.D).write.csv(path=path1,mode="append") 

再度お返事ありがとうございます。

答えて

0

データフレームで何らかの集約操作を行った後、結果をregisterTempTableに保存しています。 registerTempTableは、次の分のデータフレーム圧縮に使用されます。比較結果はHDFSに保存され、データは既存のregisterTempTableに上書きされます。

最も可能性の高い問題は、テーブルをチェックポイントしないで、各繰り返しで成長し続けることです。これにより、特にデータがキャッシュされていない場合は、各反復がますます高価になります。

全体的にステートフルな操作が必要な場合は、既存のステートフル変換を優先する必要があります。 「古い」ストリーミングと構造化されたストリーミングの両方には、さまざまなシナリオで使用できる独自のバリアントが付属しています。

+0

ご返信ありがとうございます。上記のコードを参考に追加しました。私は変更が必要なところでお手伝いできますか? –

関連する問題