1日1回更新される小さなハイブテーブル(約50000レコード)があります。キャッシュデータフレームを更新しますか?
このテーブルにはキャッシュされたデータフレームがあり、スパークストリーミングデータと結合されています。ベースハイブに新しいデータが読み込まれたら、データフレームをどのようにリフレッシュするのですか?
DataFrame tempApp = hiveContext.table("emp_data");
//Get Max Load-Date
Date max_date = max_date = tempApp.select(max("load_date")).collect()[0].getDate(0);
//Get data for latest date and cache. This will be used to join with stream data.
DataFrame emp= hiveContext.table("emp_data").where("load_date='" + max_date + "'").cache();
// Get message from Kafka Stream
JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(....);
JavaDStream<MobileFlowRecord> rddMobileFlorRecs = messages.map(Record::parseFromMessage);
kafkaRecs.foreachRDD(rdd->{DataFrame recordDataFrame = hiveContext.createDataFrame(rdd, Record.class);
DataFrame joinedDataSet = recordDataFrame.join(emp,
recordDataFrame.col("application").equalTo(app.col("emp_id"));
joinedDataSet. <Do furthur processing>
});
私があなたの解決策を理解しているかどうかは疑問です。データセットのキャッシングとキャッシュ解除は問題を解決するかもしれませんが、キャッシュは1回の反復でのみ有効であるため、キャッシングの目的を破ります。サンプルコードを追加してより明確にしました。第二に、私はテストしました、各反復でキャッシングとアンキャッシングは約3秒の遅延を追加しています。これを達成するための他の方法があるのではないかと思いますか? – Akhil