2017-05-12 10 views
4

私はスパーク2.1を使用します。Javaで構造化ストリーミングを使用してKafkaからレコードを逆シリアル化する方法は?

私はSpark Structured Streamingを使用してKafkaからレコードを読み込み、逆シリアル化してから集計を適用しようとしています。

私は次のコードを持っている:私が欲しいもの

SparkSession spark = SparkSession 
      .builder() 
      .appName("Statistics") 
      .getOrCreate(); 

    Dataset<Row> df = spark 
      .readStream() 
      .format("kafka") 
      .option("kafka.bootstrap.servers", kafkaUri) 
      .option("subscribe", "Statistics") 
      .option("startingOffsets", "earliest") 
      .load(); 

    df.selectExpr("CAST(value AS STRING)") 

ではなくStringとしてキャストする私のオブジェクトにvalueフィールドをデシリアライズすることです。

私はこれのためのカスタムデシリアライザを持っています。

public StatisticsRecord deserialize(String s, byte[] bytes) 

Javaでこれを行うにはどうすればよいですか?


私が見つけた唯一の関連リンクはhttps://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.htmlですが、これはScala向けです。

+0

メッセージはJSON形式ですか? – abaghel

+0

データをJSONで保存するか、カスタムシリアライザ経由でシリアル化することができます。 – dchar

答えて

2

JSONメッセージのスキーマを定義します。

StructType schema = DataTypes.createStructType(new StructField[] { 
       DataTypes.createStructField("Id", DataTypes.IntegerType, false), 
       DataTypes.createStructField("Name", DataTypes.StringType, false), 
       DataTypes.createStructField("DOB", DataTypes.DateType, false) }); 

今読むメッセージは次のとおりです。 MessageDataはJSONメッセージのJavaBeanです。あなたのデータのためのJavaのカスタムデシリアライザを持っている場合は

Dataset<MessageData> df = spark 
      .readStream() 
      .format("kafka") 
      .option("kafka.bootstrap.servers", kafkaUri) 
      .option("subscribe", "Statistics") 
      .option("startingOffsets", "earliest") 
      .load() 
      .selectExpr("CAST(value AS STRING) as message") 
      .select(functions.from_json(functions.col("message"),schema).as("json")) 
      .select("json.*") 
      .as(Encoders.bean(MessageData.class)); 
+1

スキーマが正しく適用されていますが、すべての列に対してNULL値が返されます。私はdf.createOrReplaceTempView( "data")として列を読み込もうとしています。 StreamingQuery query = spark.sql( "SELECT * FROM data")。writeStream()。フォーマット( "コンソール")。私は何か間違っているのですか? – dchar

+0

データセット dfは以下のように直接読み取ることができます。 df.writeStream()。format( "console")。start(); – abaghel

+1

これは全く同じ結果をもたらしました。私はすべての列に "null"を含む上位20行を表示します。 – dchar

2

は、あなたがload後カフカから入手バイトにそれを使用しています。

df.select("value") 

そのラインはあなただけの単一の列valueDataset<Row>を与えます。


私はスカラ座のSpark APIを排他的に私は、「直列化復元」の場合処理するために、Scalaで次の操作を実行したいです:

import org.apache.spark.sql.Encoders 
implicit val statisticsRecordEncoder = Encoders.product[StatisticsRecord] 
val myDeserializerUDF = udf { bytes => deserialize("hello", bytes) } 
df.select(myDeserializerUDF($"value") as "value_des") 

何をしたいあなたを与える必要があります... Scalaで

カスタムオブジェクトにエンコーダを使用可能にする必要があること、またはSpark SQLはそのオブジェクトをデータセット内に置くことを拒否します。

関連する問題