私はkafkaコンシューマで次のようなspark SQL/Streamingクエリを持っています。バッチサイズが特定のサイズNに達したときにフェッチを条件付きにするように指定するにはどうすればいいですか?そうしないと、少なくともN個の行がある場合にのみストリーム処理を実行する方法は?
Dataset<VideoEventData> ds = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", prop.getProperty("kafka.bootstrap.servers"))
.option("subscribe", prop.getProperty("kafka.topic"))
.option("kafka.max.partition.fetch.bytes", prop.getProperty("kafka.max.partition.fetch.bytes"))
.option("kafka.max.poll.records", prop.getProperty("kafka.max.poll.records"))
.load()
.selectExpr("CAST(value AS STRING) as message")
.select(functions.from_json(functions.col("message"),schema).as("json"))
.select("json.*")
.as(Encoders.bean(VideoEventData.class));
ありがとう私は解答#2を与えるでしょう – HODEH
@HODEH答えを受け入れてくれてありがとう...私はあなたが私がベースオフだったかを見て同じ自分を考えているようにあなたの開発に私を更新してください:) –
私はまだ解決策2を実装することができませんあなたはいくつかの助けを私に提供することができます@ JacekLaskowski – HODEH