データベースの履歴データをSparkにロードし、新しいストリーミングデータをSparkに追加し続けるSparkのユースケースを用意しています。最新のデータセット。Sparkのバッチデータセットにストリーミングデータセットを追加する
私が知る限り、Spark SQLもSpark Streamingも、履歴データとストリーミングデータを組み合わせることはできません。それから、Spark 2.0のストラクチャードストリーミングがこの問題のために構築されているようです。しかし、いくつかの実験の後、私はまだそれを理解することはできません。ここに私のコードは次のとおりです。
SparkSession spark = SparkSession
.builder()
.config(conf)
.getOrCreate();
JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
// Load historical data from MongoDB
JavaMongoRDD<Document> mongordd = MongoSpark.load(jsc);
// Create typed dataset with customized schema
JavaRDD<JavaRecordForSingleTick> rdd = mongordd.flatMap(new FlatMapFunction<Document, JavaRecordForSingleTick>() {...});
Dataset<Row> df = spark.sqlContext().createDataFrame(rdd, JavaRecordForSingleTick.class);
Dataset<JavaRecordForSingleTick> df1 = df.as(ExpressionEncoder.javaBean(JavaRecordForSingleTick.class));
// ds listens to a streaming data source
Dataset<Row> ds = spark.readStream()
.format("socket")
.option("host", "127.0.0.1")
.option("port", 11111)
.load();
// Create the typed dataset with customized schema
Dataset<JavaRecordForSingleTick> ds1 = ds
.as(Encoders.STRING())
.flatMap(new FlatMapFunction<String, JavaRecordForSingleTick>() {
@Override
public Iterator<JavaRecordForSingleTick> call(String str) throws Exception {
...
}
}, ExpressionEncoder.javaBean(JavaRecordForSingleTick.class));
// ds1 and df1 have the same schema. ds1 gets data from the streaming data source, df1 is the dataset with historical data
ds1 = ds1.union(df1);
StreamingQuery query = ds1.writeStream().format("console").start();
query.awaitTermination();
私はエラーだ「org.apache.spark.sql.AnalysisException:/データセットがサポートされていないストリーミングやバッチデータフレームの間の連合を。」私は2つのデータセットを結合します。
誰でもお手伝いできますか?間違った方向に行くのですか?
Spark 2.0のストラクチャードストリーミングはAlphaにあります。多くのものはまだサポートされていません。代わりにステートフルなストリーミングを使用できないのだろうかと思います。ステートフルストリーミングでは、履歴データを使用して状態をブートストラップし、ストリーミングデータを好きなように追加できます。詳細は、この[Databrickのblogpost](https://databricks.com/blog/2016/02/01/faster-stateful-stream-processing-in-apache-spark-streaming.html)を参照してください。 –
@GlennieHellesSindholtこんにちは、Glennie、ありがとうございます。 MapWithState()は、現在の状態(キー値のペア)を新しいストリーミングデータで置き換える/更新するのに最適です。私のユースケースでは、私のRDDはキー値のペアではなく、古いデータを更新する必要はありません。 mapWithState()を使用するのは使いすぎですか? –
私は 'mapWithState'は明白な選択ではないことに同意します。もしあなたが集計を持っていないのであれば、履歴データを必要としない場合はなぜストリームに入れたいのですか? –