0
2つのDstreamを実行しようとしています。最初の1つでdfをtmpビューとして登録し、別のDstreamで次のように使用してください。spark streaming - あるストリームでtmpビューを作成し、別のストリームで使用する
dstream1.foreachRDD { rdd =>
import org.apache.spark.sql._
val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate
import spark.implicits._
import spark.sql
val records = rdd.toDF("record")
records.createOrReplaceTempView("records")
}
dstream2.foreachRDD { rdd =>
import org.apache.spark.sql._
val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate
import spark.implicits._
import spark.sql
val records2 = rdd.toDF("record2")
val oldRecord = spark.table("records")
records2.join(oldRecod).write.json(...)
}
streamingContext.remember(Seconds(60))
streamingContext.start()
streamingContext.awaitTermination()
私はつねに何か正しいことをしていないので、明らかに私はを受け取ります。
これを行う方法はありますか?
ありがとうございます!