2017-08-22 4 views
0

私は構造化ストリーミングを使用してKafkaからSparkにメッセージを読み込むアプリケーションを作成しています。最新のn個のレコードをクエリ用に保存するスパーク構造のストリーミング

着信メッセージは、文字列形式の取引関連のFIXメッセージです。それらはJava POJOに変換されます。以下

サンプルコード:私はそのデータセットの上でスパークSQLをサポートすることができるように

SparkSession spark = createSparkSession(); 

    // Subscribe to 1 topic 
    Dataset<Row> df = spark 
     .readStream() 
     .format("kafka") 
     .option("kafka.bootstrap.servers", brokers) 
     .option("subscribe", topics) 
     .load(); 

    df.as(Encoders.STRING()).map(new MapFunction<String, OrderData>() { 

     @Override 
     public OrderData call(String arg0) throws Exception { 
      // TODO Auto-generated method stub 
      return OrderData(arg0); 
     } 
    }, Encoders.bean(OrderData.class)); 

私の質問は、どのように私は、DataSet内のメモリ内の最後のnレコードを保存することができますか?

答えて

0

は、このようないくつかのものが動作することを考え出した:。。

dataset.writeStream()フォーマット( "メモリ")QUERYNAME( "orderdataDS") .start(); enter code here