Spark 2.0.2、Kafka 0.11.0、および を使用しています。スパークストリーミングでkafkaからのメッセージを消費しようとしています。スキーマを使用して、spark-kafkaのConsumerRecord値をDataframeに変換します
val topics = "notes"
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:7092",
"schema.registry.url" -> "http://localhost:7070",
"group.id" -> "connect-cluster1",
"value.deserializer" -> "io.confluent.kafka.serializers.KafkaAvroDeserializer",
"key.deserializer" -> "io.confluent.kafka.serializers.KafkaAvroDeserializer"
)
val topicSet: Set[String] = Set(topics)
val stream = KafkaUtils.createDirectStream[String, String](
SparkStream.ssc,
PreferConsistent,
Subscribe[String, String](topicSet, kafkaParams)
)
stream.foreachRDD (rdd => {
rdd.foreachPartition(iterator => {
while (iterator.hasNext) {
val next = iterator.next()
println(next.value())
}
})
})
カフカメッセージレコードが含まれている場合、出力は次のようになります:コードされ、次の
{"id": "4164a489-a0bb-4ea1-a259-b4e2a4519eee", "createdat": 1505312886984, "createdby": "karthik", "notes": "testing20"}
{"id": "4164a489-a0bb-4ea1-a259-b4e2a4519eee", "createdat": 1505312890472, "createdby": "karthik", "notes": "testing21"}
このように、メッセージを受け取っがアブロはconsumerRecordの値から分かるようにデコードされます。 今、私はデータフレーム形式でそれらのレコードが必要ですが、以下のように、私は手でさえスキーマで、ここから続行するのか分からない:
val sr : CachedSchemaRegistryClient = new CachedSchemaRegistryClient("http://localhost:7070", 1000)
val m = sr.getLatestSchemaMetadata(topics + "-value")
val schemaId = m.getId
val schemaString = m.getSchema
val schemaRegistry : CachedSchemaRegistryClient = new CachedSchemaRegistryClient("http://localhost:7070", 1000)
val decoder: KafkaAvroDecoder = new KafkaAvroDecoder(schemaRegistry)
val parser = new Schema.Parser()
val avroSchema = parser.parse(schemaString)
println(avroSchema)
次のように印刷されたスキーマを使用して:
{"type":"record","name":"notes","namespace":"db","fields":[{"name":"id","type":["null","string"],"default":null},{"name":"createdat","type":["null",{"type":"long","connect.version":1,"connect.name":"org.apache.kafka.connect.data.Timestamp","logicalType":"timestamp-millis"}],"default":null},{"name":"createdby","type":["null","string"],"default":null},{"name":"notes","type":["null","string"],"default":null}],"connect.name":"db.notes"}
コンシューマーレコードの価値からデータフレームを取得する方法を理解できる人はいますか?私はUse schema to convert AVRO messages with Spark to DataFrame、Handling schema changes in running Spark Streaming applicationのような他の質問を見ましたが、最初の場所ではconsumerRecordを扱っていません。
: ストリームは、消費者レコードのDSTREAMはkafka010のkafkaUtils APIから返されます。これを理解できましたか? –