0

各行がJsonストリングであるDataSetを持っていますJsonストリームをプリントするか、バッチごとにJsonストリームをカウントします。Spark Structured Streamingを使用してJsonでエンコードされたメッセージを印刷するには

ここに私のコードは、これまで

val ds = sparkSession.readStream() 
       .format("kafka") 
       .option("kafka.bootstrap.servers",bootstrapServers")) 
       .option("subscribe", topicName) 
       .option("checkpointLocation", hdfsCheckPointDir) 
       .load(); 

val ds1 = ds.select(from_json(col("value").cast("string"), schema) as 'payload) 
val ds2 = ds1.select($"payload.info") 
val query = ds2.writeStream.outputMode("append").queryName("table").format("memory").start() 
query.awaitTermination() 

select * from table; //I don't see anything and there are no errors. however when I run my kafka consumer separately(independent of spark I can see the data) 

私の質問は本当に私はちょうど私が構造化されたストリーミングを使用してカフカから受け付けておりますデータを印刷実行するには何が必要ですされているのですか? KafkaのメッセージはJSONでエンコードされた文字列なので、jsonでエンコードされた文字列を構造体に変換し、最終的にはデータセットに変換しています。私はSpark 2.1.0を使用しています

+0

Sparkのユーザーメーリングリスト(TD付き)についても議論していますか? 2つのユースケースがどのように異なっているかを調べようとしています。 –

+0

こんにちは!はい、私たちはその変換を完了していません。私はちょうど構造化されたストリーミングを使用してデータを印刷しようとしています、そして私はそれで苦労しています:( – user1870400

+0

私は 'val query = ds.writeStream.outputMode(" append ")。 ) 'しかし、それはどちらもうまくいかなかった – user1870400

答えて

0
val ds1 = ds.select(from_json(col("value").cast("string"), schema) as payload).select($"payload.*") 

あなたのデータはコンソールに表示されます。

ds1.writeStream.format("console").option("truncate", "false").start().awaitTermination() 

は、常に状況のこれらの種類にawaitTermination()またはthread.Sleep(time in seconds)のようなものを使用します。

関連する問題