0
私は構造化ストリーミングを使用してKafkaからSparkにメッセージを読み込むアプリケーションを作成しています。最新のn個のレコードをクエリ用に保存するスパーク構造のストリーミング
着信メッセージは、文字列形式の取引関連のFIXメッセージです。それらはJava POJOに変換されます。以下
サンプルコード:私はそのデータセットの上でスパークSQLをサポートすることができるように
SparkSession spark = createSparkSession();
// Subscribe to 1 topic
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", brokers)
.option("subscribe", topics)
.load();
df.as(Encoders.STRING()).map(new MapFunction<String, OrderData>() {
@Override
public OrderData call(String arg0) throws Exception {
// TODO Auto-generated method stub
return OrderData(arg0);
}
}, Encoders.bean(OrderData.class));
私の質問は、どのように私は、DataSet内のメモリ内の最後のnレコードを保存することができますか?