私はスパークストリーミングに取り組んでいます。スパークストリーミングのクエリを動的に切り替える
10回のクエリ実行ごとに入力クエリを変更したい場合、または10分ごとにクエリを切り替える場合は、その可能性がありますか?私がストリーミング・コンテキストを作成したら、私はその計算のロジックを変更できませんでした。私は疑問に思っています、私のSQLクエリを動的に切り替える可能性はありますか? 親切にお世話になりました。
私はスパークストリーミングに取り組んでいます。スパークストリーミングのクエリを動的に切り替える
10回のクエリ実行ごとに入力クエリを変更したい場合、または10分ごとにクエリを切り替える場合は、その可能性がありますか?私がストリーミング・コンテキストを作成したら、私はその計算のロジックを変更できませんでした。私は疑問に思っています、私のSQLクエリを動的に切り替える可能性はありますか? 親切にお世話になりました。
ここで私は ヴァル・ワード、火花ページから例を取る:例えば、DSTREAM [文字列] = ...
words.foreachRDD { rdd =>
val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
import spark.implicits._
val wordsDataFrame = rdd.toDF("word")
wordsDataFrame.createOrReplaceTempView("words")
val wordCountsDataFrame =
spark.sql(dynamicQuery)
wordCountsDataFrame.show()
}
私は動的にクエリを切り替えたいです私は、クエリ実行の各トリガごとに異なるクエリに切り替えます。私は以下のアプローチを試みましたが正常に動作しますが、理由はわかりません:
if(counter % 2 == 0) val wordCountsDataFrame = spark.sql(dynamicQuery_1)
else spark.sql(dynamicQuery_2)
クエリの実行でカウンタが増分します。
私の質問は: 1)これを実装するより良い方法はありますか? 2)私が理解しているように、いったんスパークストリーミングコンテキストが作成されると、その計算ロジックを変更することはできませんでした。これは、DAGSchedulerとTaskSchedulerがワークフローを動的に管理できることを意味しますか?
あなたは何をしようとしているのかの例を挙げることができますか?現在のところ、この質問には1つの近い投票があり、将来閉鎖される可能性があります。また、回答が得られる可能性も大幅に向上します。 – Alexei
あなたのコメントをありがとう、私は以下の例を追加しました。 –