2017-02-24 17 views
0

私はKafkaとSpark Structured Streamingを使用しています。私は以下の形式でカフカのメッセージを受け取ります。Spark Structured Streamingでバイナリデータを処理する

{"deviceId":"001","sNo":1,"data":"aaaaa"} 
{"deviceId":"002","sNo":1,"data":"bbbbb"} 
{"deviceId":"001","sNo":2,"data":"ccccc"} 
{"deviceId":"002","sNo":2,"data":"ddddd"} 

私は以下のようにそれを読んでいます。

Dataset<String> data = spark 
     .readStream() 
     .format("kafka") 
     .option("kafka.bootstrap.servers", bootstrapServers) 
     .option(subscribeType, topics) 
     .load() 
     .selectExpr("CAST(value AS STRING)") 
     .as(Encoders.STRING()); 
Dataset<DeviceData> ds = data.as(ExpressionEncoder.javaBean(DeviceData.class)).orderBy("deviceId","sNo"); 
ds.foreach(event -> 
     processData(event.getDeviceId(),event.getSNo(),event.getData().getBytes()) 
);} 

private void processData(String deviceId,int SNo, byte[] data) 
{ 
    //How to check previous processed Dataset??? 
} 

私のjsonメッセージでは、 "data"はbyte []の文字列形式です。私は与えられた "deviceId"のバイナリ "data"を "sNo"の順に処理する必要があるという要件があります。ですから "deviceId" = "001"の場合、 "sNo" = 1、 "sNo" = 2というようにバイナリデータを処理しなければなりません。ストラクチャードストリーミングで以前に処理されたデータセットの状態を確認するにはどうすればよいですか?サンプルやリンクは大きな助けになります。私はスパークする初心者ですので、私と一緒に裸をしてください。ありがとう。

+0

これまでに何を試しましたか? – Jan

+0

私のコードを更新しました。チェックしてください。私はorderByを実行していて、次にそれぞれのデータを処理しています。ストリーミングで受け取ったデータセットから現在のデータと前のデータを処理する方法をprocessDataメソッドで突き詰めました。 – user7615505

答えて

関連する問題