私はKafkaとSpark Structured Streamingを使用しています。私は以下の形式でカフカのメッセージを受け取ります。Spark Structured Streamingでバイナリデータを処理する
{"deviceId":"001","sNo":1,"data":"aaaaa"}
{"deviceId":"002","sNo":1,"data":"bbbbb"}
{"deviceId":"001","sNo":2,"data":"ccccc"}
{"deviceId":"002","sNo":2,"data":"ddddd"}
私は以下のようにそれを読んでいます。
Dataset<String> data = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option(subscribeType, topics)
.load()
.selectExpr("CAST(value AS STRING)")
.as(Encoders.STRING());
Dataset<DeviceData> ds = data.as(ExpressionEncoder.javaBean(DeviceData.class)).orderBy("deviceId","sNo");
ds.foreach(event ->
processData(event.getDeviceId(),event.getSNo(),event.getData().getBytes())
);}
private void processData(String deviceId,int SNo, byte[] data)
{
//How to check previous processed Dataset???
}
私のjsonメッセージでは、 "data"はbyte []の文字列形式です。私は与えられた "deviceId"のバイナリ "data"を "sNo"の順に処理する必要があるという要件があります。ですから "deviceId" = "001"の場合、 "sNo" = 1、 "sNo" = 2というようにバイナリデータを処理しなければなりません。ストラクチャードストリーミングで以前に処理されたデータセットの状態を確認するにはどうすればよいですか?サンプルやリンクは大きな助けになります。私はスパークする初心者ですので、私と一緒に裸をしてください。ありがとう。
これまでに何を試しましたか? – Jan
私のコードを更新しました。チェックしてください。私はorderByを実行していて、次にそれぞれのデータを処理しています。ストリーミングで受け取ったデータセットから現在のデータと前のデータを処理する方法をprocessDataメソッドで突き詰めました。 – user7615505