1

私はkafkaコンシューマで次のようなspark SQL/Streamingクエリを持っています。バッチサイズが特定のサイズNに達したときにフェッチを条件付きにするように指定するにはどうすればいいですか?そうしないと、少なくともN個の行がある場合にのみストリーム処理を実行する方法は?

Dataset<VideoEventData> ds = spark 
     .readStream() 
     .format("kafka") 
     .option("kafka.bootstrap.servers", prop.getProperty("kafka.bootstrap.servers")) 
     .option("subscribe", prop.getProperty("kafka.topic")) 
     .option("kafka.max.partition.fetch.bytes", prop.getProperty("kafka.max.partition.fetch.bytes")) 
     .option("kafka.max.poll.records", prop.getProperty("kafka.max.poll.records")) 
     .load() 
     .selectExpr("CAST(value AS STRING) as message") 
     .select(functions.from_json(functions.col("message"),schema).as("json")) 
     .select("json.*") 
     .as(Encoders.bean(VideoEventData.class)); 

答えて

1

は、私は私が箱から出してスパーク構造化ストリーミング(と一般的にはスパーク)でのことはできませんサイズのN

の正確なデータセットを持っていることをgurateedされた私のロジックを実行します。

次のオプションがあります。

  1. はカフカの消費者のプロパティを使用してカフカソースの後ろに座っているカフカの消費者を設定します。

  2. ステートフルアグリゲーションの一部として、行をバッファリングします。

  3. バッファリング自体を処理するカスタムソースを作成します。 2.については

私は希望Sourceストリーミングカスタムステートフルを実装3.あなたのサイズN

を与え、最終的だろう「塊」を超える蓄積する状態でKeyValueGroupedDataset.flatMapGroupsWithStateを使用することができますgetOffsetgetBatchを、getOffsetが少なくともNの行がある場合にのみオフセットを与えるように実装します。

免責事項:これまで私自身はどちらの解決方法も行っていませんでしたが、実用的に見えます。

+0

ありがとう私は解答#2を与えるでしょう – HODEH

+0

@HODEH答えを受け入れてくれてありがとう...私はあなたが私がベースオフだったかを見て同じ自分を考えているようにあなたの開発に私を更新してください:) –

+0

私はまだ解決策2を実装することができませんあなたはいくつかの助けを私に提供することができます@ JacekLaskowski – HODEH

0

あなたはカフカの消費者自体を構成することによってそれを行うことができます:私のロジックを実行するために、私がサイズNの現在のコードの正確なDataset<VideoEventData>を持っていることをgurateedされます。 fetch.min.bytesを最小値に設定してください。それはカフカに十分なデータがあるまで待つように指示します。

カフカの待ち時間を制御する関連設定fetch.max.wait.msがあります。この値はデフォルトで500ミリ秒です。あなたはhereをもっと読むことができます。

関連する問題