0
各行がJsonストリングであるDataSetを持っていますJsonストリームをプリントするか、バッチごとにJsonストリームをカウントします。Spark Structured Streamingを使用してJsonでエンコードされたメッセージを印刷するには
ここに私のコードは、これまで
val ds = sparkSession.readStream()
.format("kafka")
.option("kafka.bootstrap.servers",bootstrapServers"))
.option("subscribe", topicName)
.option("checkpointLocation", hdfsCheckPointDir)
.load();
val ds1 = ds.select(from_json(col("value").cast("string"), schema) as 'payload)
val ds2 = ds1.select($"payload.info")
val query = ds2.writeStream.outputMode("append").queryName("table").format("memory").start()
query.awaitTermination()
select * from table; //I don't see anything and there are no errors. however when I run my kafka consumer separately(independent of spark I can see the data)
私の質問は本当に私はちょうど私が構造化されたストリーミングを使用してカフカから受け付けておりますデータを印刷実行するには何が必要ですされているのですか? KafkaのメッセージはJSONでエンコードされた文字列なので、jsonでエンコードされた文字列を構造体に変換し、最終的にはデータセットに変換しています。私はSpark 2.1.0を使用しています
Sparkのユーザーメーリングリスト(TD付き)についても議論していますか? 2つのユースケースがどのように異なっているかを調べようとしています。 –
こんにちは!はい、私たちはその変換を完了していません。私はちょうど構造化されたストリーミングを使用してデータを印刷しようとしています、そして私はそれで苦労しています:( – user1870400
私は 'val query = ds.writeStream.outputMode(" append ")。 ) 'しかし、それはどちらもうまくいかなかった – user1870400