私はスパーク2.1を使用します。Javaで構造化ストリーミングを使用してKafkaからレコードを逆シリアル化する方法は?
私はSpark Structured Streamingを使用してKafkaからレコードを読み込み、逆シリアル化してから集計を適用しようとしています。
私は次のコードを持っている:私が欲しいもの
SparkSession spark = SparkSession
.builder()
.appName("Statistics")
.getOrCreate();
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", kafkaUri)
.option("subscribe", "Statistics")
.option("startingOffsets", "earliest")
.load();
df.selectExpr("CAST(value AS STRING)")
ではなくString
としてキャストする私のオブジェクトにvalue
フィールドをデシリアライズすることです。
私はこれのためのカスタムデシリアライザを持っています。
public StatisticsRecord deserialize(String s, byte[] bytes)
Javaでこれを行うにはどうすればよいですか?
私が見つけた唯一の関連リンクはhttps://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.htmlですが、これはScala向けです。
メッセージはJSON形式ですか? – abaghel
データをJSONで保存するか、カスタムシリアライザ経由でシリアル化することができます。 – dchar