以下の簡単なプログラムは、kafkaストリームから読み込み、5分ごとにCSVファイルに書き込み、そのスパークストリーミングを行います。 "実行プログラム"(実行プログラムではない)のマイクロバッチの後にJava関数を呼び出せる方法はありますか?スパークストリーミングの各マイクロバッチ後にjava関数を呼び出す
ストリーム内の任意のコードを呼び出すのは良い方法ではありませんが、これはボリュームデータが少ない特殊なケースです。ご了承ください。ありがとう。
あなたはこのようなもので、これを達成することができるはずpublic static void main(String[] args) throws Exception {
if (args.length == 0)
throw new Exception("Usage program configFilename");
String configFilename = args[0];
addShutdownHook();
ConfigLoader.loadConfig(configFilename);
sparkSession = SparkSession
.builder()
.appName(TestKafka.class.getName())
.master(ConfigLoader.getValue("master")).getOrCreate();
SparkContext context = sparkSession.sparkContext();
context.setLogLevel(ConfigLoader.getValue("logLevel"));
SQLContext sqlCtx = sparkSession.sqlContext();
System.out.println("Spark context established");
DataStreamReader kafkaDataStreamReader = sparkSession.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", ConfigLoader.getValue("brokers"))
.option("group.id", ConfigLoader.getValue("groupId"))
.option("subscribe", ConfigLoader.getValue("topics"))
.option("failOnDataLoss", false);
Dataset<Row> rawDataSet = kafkaDataStreamReader.load();
rawDataSet.printSchema();
rawDataSet.createOrReplaceTempView("rawEventView1");
rawDataSet = rawDataSet.withColumn("rawEventValue", rawDataSet.col("value").cast("string"));
rawDataSet.printSchema();
rawDataSet.createOrReplaceTempView("eventView1");
sqlCtx.sql("select * from eventView1")
.writeStream()
.format("csv")
.option("header", "true")
.option("delimiter", "~")
.option("checkpointLocation", ConfigLoader.getValue("checkpointPath"))
.option("path", ConfigLoader.getValue("recordsPath"))
.outputMode(OutputMode.Append())
.trigger(ProcessingTime.create(Integer.parseInt(ConfigLoader.getValue("kafkaProcessingTime"))
, TimeUnit.SECONDS))
.start()
.awaitTermination();
}
実行するコードは何ですか?それはどんな値も返さないので、それは副作用ですか?なぜそれは運転手で起こらなければならないのですか? – raam86
これは、電子バッチが完了したことを電子メールで通知するようなものです。 – Manjesh