3

データベースの履歴データをSparkにロードし、新しいストリーミングデータをSparkに追加し続けるSparkのユースケースを用意しています。最新のデータセット。Sparkのバッチデータセットにストリーミングデータセットを追加する

私が知る限り、Spark SQLもSpark Streamingも、履歴データとストリーミングデータを組み合わせることはできません。それから、Spark 2.0のストラクチャードストリーミングがこの問題のために構築されているようです。しかし、いくつかの実験の後、私はまだそれを理解することはできません。ここに私のコードは次のとおりです。

SparkSession spark = SparkSession 
      .builder() 
      .config(conf) 
      .getOrCreate(); 

    JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext()); 

    // Load historical data from MongoDB 
    JavaMongoRDD<Document> mongordd = MongoSpark.load(jsc); 


    // Create typed dataset with customized schema 
    JavaRDD<JavaRecordForSingleTick> rdd = mongordd.flatMap(new FlatMapFunction<Document, JavaRecordForSingleTick>() {...}); 
    Dataset<Row> df = spark.sqlContext().createDataFrame(rdd, JavaRecordForSingleTick.class); 
    Dataset<JavaRecordForSingleTick> df1 = df.as(ExpressionEncoder.javaBean(JavaRecordForSingleTick.class)); 


    // ds listens to a streaming data source 
    Dataset<Row> ds = spark.readStream() 
      .format("socket") 
      .option("host", "127.0.0.1") 
      .option("port", 11111) 
      .load(); 

    // Create the typed dataset with customized schema 
    Dataset<JavaRecordForSingleTick> ds1 = ds 
      .as(Encoders.STRING()) 
      .flatMap(new FlatMapFunction<String, JavaRecordForSingleTick>() { 
     @Override 
     public Iterator<JavaRecordForSingleTick> call(String str) throws Exception { 
     ... 
     } 
    }, ExpressionEncoder.javaBean(JavaRecordForSingleTick.class)); 


    // ds1 and df1 have the same schema. ds1 gets data from the streaming data source, df1 is the dataset with historical data 

    ds1 = ds1.union(df1); 
    StreamingQuery query = ds1.writeStream().format("console").start(); 
    query.awaitTermination(); 

私はエラーだ「org.apache.spark.sql.AnalysisException:/データセットがサポートされていないストリーミングやバッチデータフレームの間の連合を。」私は2つのデータセットを結合します。

誰でもお手伝いできますか?間違った方向に行くのですか?

+0

Spark 2.0のストラクチャードストリーミングはAlphaにあります。多くのものはまだサポートされていません。代わりにステートフルなストリーミングを使用できないのだろうかと思います。ステートフルストリーミングでは、履歴データを使用して状態をブートストラップし、ストリーミングデータを好きなように追加できます。詳細は、この[Databrickのblogpost](https://databricks.com/blog/2016/02/01/faster-stateful-stream-processing-in-apache-spark-streaming.html)を参照してください。 –

+0

@GlennieHellesSindholtこんにちは、Glennie、ありがとうございます。 MapWithState()は、現在の状態(キー値のペア)を新しいストリーミングデータで置き換える/更新するのに最適です。私のユースケースでは、私のRDDはキー値のペアではなく、古いデータを更新する必要はありません。 mapWithState()を使用するのは使いすぎですか? –

+0

私は 'mapWithState'は明白な選択ではないことに同意します。もしあなたが集計を持っていないのであれば、履歴データを必要としない場合はなぜストリームに入れたいのですか? –

答えて

1

このタイプの機能をサポートする点でMongoDBのスパークコネクタは話せません.Googleにはそのような機能はあまりないようです。しかし、スパークデータベースエコシステムには他にもデータベースがあります。私はanother answerでスパークデータベースエコシステムにあるもののほとんどをカバーしました。私はSnappyDataMemSQLがそのリストにあることを知っていますが、探している機能の種類をどのデータベースが簡単に許可しているのかを正確に言うことはできません。しかし、両方のために、リレーショナル形式のデータが必要な場合があります。

関連する問題