2016-12-28 1 views
0

1日1回更新される小さなハイブテーブル(約50000レコード)があります。キャッシュデータフレームを更新しますか?

このテーブルにはキャッシュされたデータフレームがあり、スパークストリーミングデータと結合されています。ベースハイブに新しいデータが読み込まれたら、データフレームをどのようにリフレッシュするのですか?

DataFrame tempApp = hiveContext.table("emp_data"); 

//Get Max Load-Date 
Date max_date = max_date = tempApp.select(max("load_date")).collect()[0].getDate(0); 

//Get data for latest date and cache. This will be used to join with stream data. 
DataFrame emp= hiveContext.table("emp_data").where("load_date='" + max_date + "'").cache(); 

// Get message from Kafka Stream 
JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(....); 

JavaDStream<MobileFlowRecord> rddMobileFlorRecs = messages.map(Record::parseFromMessage); 

kafkaRecs.foreachRDD(rdd->{DataFrame recordDataFrame = hiveContext.createDataFrame(rdd, Record.class); 

DataFrame joinedDataSet = recordDataFrame.join(emp, 
recordDataFrame.col("application").equalTo(app.col("emp_id")); 
joinedDataSet. <Do furthur processing> 
}); 

答えて

0

スパーク自動的に RDDまたはデータフレームが使用されなくなった場合。 RDDまたはDataframeがキャッシュされているかどうかを知るために、Spark UI→Storageタブレットに入り、Memoryの詳細を参照してください。 df.unpersist()またはsqlContext.uncacheTable("sparktable")uncacheTable APiを使用すると、メモリからdfまたはテーブルを削除できます。このオプションは新しいSparksessionAPiでは使用できませんが、下位互換性は常に存在します。 Lazy Evaluationのために作られたSparkは、あなたが何らかの行動を言うまで、RDDやDataFrameにデータをロードしたり処理したりしない限り、

あなたのためにjoinを実行した後、unpersist()をデータフレーム用に実行します。パフォーマンスを向上させ、問題を解決します。

Databricks

+0

私があなたの解決策を理解しているかどうかは疑問です。データセットのキャッシングとキャッシュ解除は問題を解決するかもしれませんが、キャッシュは1回の反復でのみ有効であるため、キャッシングの目的を破ります。サンプルコードを追加してより明確にしました。第二に、私はテストしました、各反復でキャッシングとアンキャッシングは約3秒の遅延を追加しています。これを達成するための他の方法があるのではないかと思いますか? – Akhil

0

手動で行うことができます。このような何か:

DataFrame refresh(DataFrame orig) { 
    if (orig != null) { 
     orig.unpersist(); 
    } 
    DataFrame res = get the dataframe as you normally would 
    res.cache() 
    return res 

今、一日一回これを呼び出すか、あなたは、このようにリフレッシュしたい時はいつでも:

DataFrame join_df = refresh(join_df) 

これは基本的にありませんが、以前のバージョンのunpersistsを(キャッシュを削除します)、読み込みされます新しいものをキャッシュしてからキャッシュします。実際には、データフレームがリフレッシュされます。

データフレームは、キャッシュが遅延しているためリフレッシュ後に最初に使用された後にのみメモリに保持されることに注意してください。

関連する問題