Doh、なぜPythonはScalaではないのですか?そして、あなたの家の練習はAdvanced Sourcesから、その後;-)
をPythonのに以下のコードを書き換えることになるだろう。これらのソースのうち、カフカ、キネシスや水路、
スパーク2.1.1のとおりPython APIで利用できます。 KafkaUtils.createDirectStream
を使用してSpark Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher)で説明したようにspark-streaming-kafka-0-10_2.11
ライブラリを使用してカフカトピックから
読むメッセージ:
基本的には、プロセスがにあります。あなたは、シリアル化の問題に直面することはありませんので、map
演算子を使用して値へ
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092,anotherhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "use_a_separate_group_id_for_each_stream",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("topicA", "topicB")
val stream = KafkaUtils.createDirectStream[String, String](
streamingContext,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
コピーConsumerRecords。
stream.map(record => (record.key, record.value))
あなたは鍵を送信しない場合は、単にrecord.value
で十分です。
stream.map(record => record.value)
あなたが値を持ったら、あなたがfrom_json機能を使用し、JSONに文字列メッセージを変換:
from_json(E:カラム、スキーマ:StructType)は、JSON文字列を含む列を解析します指定されたスキーマを持つStructType
に変換します。解析できない文字列の場合はnull
を返します。
のようになります。コードは次のとおりです。
...foreach { rdd =>
messagesRDD.toDF.
withColumn("json", from_json('value, jsonSchema)).
select("json.*").show(false)
}
が完了します!
タイムアウトを取って答えを提供してくれてありがとう、私はそれを試み、あなたに知らせるでしょう。 –
@pratikrudra解決策はあなたのために働いていますか? –