2017-05-24 6 views
1

私はJSON形式のレコードでカフカのトピックを作成しました。Spark StreamingとPythonを使用してKafkaからJSONレコードを消費する方法は?

私はkafka-console-consumer.shを使用して、これらのJSON文字列を消費することができる午前:

./kafka-console-consumer.sh --new-consumer \ 
    --topic test \ 
    --from-beginning \ 
    --bootstrap-server host:9092 \ 
    --consumer.config /root/client.properties 

は、どのように私はPythonでスパークストリーミングを使用してこれを行うことができますか?

答えて

2

Doh、なぜPythonはScalaではないのですか?そして、あなたの家の練習はAdvanced Sourcesから、その後;-)

をPythonのに以下のコードを書き換えることになるだろう。これらのソースのうち、カフカ、キネシスや水路、

スパーク2.1.1のとおりPython APIで利用できます。 KafkaUtils.createDirectStreamを使用してSpark Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher)で説明したようにspark-streaming-kafka-0-10_2.11ライブラリを使用してカフカトピックから

読むメッセージ:

基本的には、プロセスがにあります。あなたは、シリアル化の問題に直面することはありませんので、map演算子を使用して値

import org.apache.kafka.clients.consumer.ConsumerRecord 
import org.apache.kafka.common.serialization.StringDeserializer 
import org.apache.spark.streaming.kafka010._ 
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent 
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe 

val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "localhost:9092,anotherhost:9092", 
    "key.deserializer" -> classOf[StringDeserializer], 
    "value.deserializer" -> classOf[StringDeserializer], 
    "group.id" -> "use_a_separate_group_id_for_each_stream", 
    "auto.offset.reset" -> "latest", 
    "enable.auto.commit" -> (false: java.lang.Boolean) 
) 

val topics = Array("topicA", "topicB") 
val stream = KafkaUtils.createDirectStream[String, String](
    streamingContext, 
    PreferConsistent, 
    Subscribe[String, String](topics, kafkaParams) 
) 

コピーConsumerRecords。

stream.map(record => (record.key, record.value)) 

あなたは鍵を送信しない場合は、単にrecord.valueで十分です。

stream.map(record => record.value) 

あなたが値を持ったら、あなたがfrom_json機能を使用し、JSONに文字列メッセージを変換:

from_json(E:カラム、スキーマ:StructType)は、JSON文字列を含む列を解析します指定されたスキーマを持つStructTypeに変換します。解析できない文字列の場合はnullを返します。

のようになります。コードは次のとおりです。

...foreach { rdd => 
    messagesRDD.toDF. 
    withColumn("json", from_json('value, jsonSchema)). 
    select("json.*").show(false) 
} 

が完了します!

+1

タイムアウトを取って答えを提供してくれてありがとう、私はそれを試み、あなたに知らせるでしょう。 –

+0

@pratikrudra解決策はあなたのために働いていますか? –

関連する問題