2017-09-22 8 views
0

以下の簡単なプログラムは、kafkaストリームから読み込み、5分ごとにCSVファイルに書き込み、そのスパークストリーミングを行います。 "実行プログラム"(実行プログラムではない)のマイクロバッチの後にJava関数を呼び出せる方法はありますか?スパークストリーミングの各マイクロバッチ後にjava関数を呼び出す

ストリーム内の任意のコードを呼び出すのは良い方法ではありませんが、これはボリュームデータが少ない特殊なケースです。ご了承ください。ありがとう。

あなたはこのようなもので、これを達成することができるはず
public static void main(String[] args) throws Exception { 

    if (args.length == 0) 
     throw new Exception("Usage program configFilename"); 
    String configFilename = args[0]; 

    addShutdownHook(); 

    ConfigLoader.loadConfig(configFilename); 
    sparkSession = SparkSession 
      .builder() 
      .appName(TestKafka.class.getName()) 
      .master(ConfigLoader.getValue("master")).getOrCreate(); 
    SparkContext context = sparkSession.sparkContext(); 
    context.setLogLevel(ConfigLoader.getValue("logLevel")); 

    SQLContext sqlCtx = sparkSession.sqlContext(); 
    System.out.println("Spark context established"); 

    DataStreamReader kafkaDataStreamReader = sparkSession.readStream() 
      .format("kafka") 
      .option("kafka.bootstrap.servers", ConfigLoader.getValue("brokers")) 
      .option("group.id", ConfigLoader.getValue("groupId")) 
      .option("subscribe", ConfigLoader.getValue("topics")) 
      .option("failOnDataLoss", false); 
    Dataset<Row> rawDataSet = kafkaDataStreamReader.load(); 
    rawDataSet.printSchema(); 
    rawDataSet.createOrReplaceTempView("rawEventView1"); 

    rawDataSet = rawDataSet.withColumn("rawEventValue", rawDataSet.col("value").cast("string")); 
    rawDataSet.printSchema(); 
    rawDataSet.createOrReplaceTempView("eventView1"); 
    sqlCtx.sql("select * from eventView1") 
      .writeStream() 
      .format("csv") 
      .option("header", "true") 
      .option("delimiter", "~") 
      .option("checkpointLocation", ConfigLoader.getValue("checkpointPath")) 
      .option("path", ConfigLoader.getValue("recordsPath")) 
      .outputMode(OutputMode.Append()) 
      .trigger(ProcessingTime.create(Integer.parseInt(ConfigLoader.getValue("kafkaProcessingTime")) 
        , TimeUnit.SECONDS)) 
      .start() 
      .awaitTermination(); 
} 
+0

実行するコードは何ですか?それはどんな値も返さないので、それは副作用ですか?なぜそれは運転手で起こらなければならないのですか? – raam86

+0

これは、電子バッチが完了したことを電子メールで通知するようなものです。 – Manjesh

答えて

0

kafkaDataStreamReader.map{value -> mySideEffect(); value} 

これは私がこれを行うことはお勧めしませんどのようにこれまで、機能mySideEffectマイクロバッチがカフカから受信されるたびに呼び出されます、より良い方法は、あなたがCSVを維持するフォルダを見ることです、またはマイクロバッチがたった数秒ごとに電子メールでぶつかることを考えると、Web UIをチェックすることです。ストリーミングアプリケーションが起動していることを確認したい場合は、数秒ごとにスパークREST APIにクエリを実行して、それがまだ有効であることを確認してください。 https://spark.apache.org/docs/latest/monitoring.html

関連する問題