2017-09-13 18 views
2

Spark 2.0.2、Kafka 0.11.0、および を使用しています。スパークストリーミングでkafkaからのメッセージを消費しようとしています。スキーマを使用して、spark-kafkaのConsumerRecord値をDataframeに変換します

val topics = "notes" 
val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "localhost:7092", 
    "schema.registry.url" -> "http://localhost:7070", 
    "group.id" -> "connect-cluster1", 
    "value.deserializer" -> "io.confluent.kafka.serializers.KafkaAvroDeserializer", 
    "key.deserializer" -> "io.confluent.kafka.serializers.KafkaAvroDeserializer" 
) 
val topicSet: Set[String] = Set(topics) 
val stream = KafkaUtils.createDirectStream[String, String](
    SparkStream.ssc, 
    PreferConsistent, 
    Subscribe[String, String](topicSet, kafkaParams) 
) 
stream.foreachRDD (rdd => { 
    rdd.foreachPartition(iterator => { 
    while (iterator.hasNext) { 
     val next = iterator.next() 
     println(next.value()) 
    } 
    }) 
}) 

カフカメッセージレコードが含まれている場合、出力は次のようになります:コードされ、次の

{"id": "4164a489-a0bb-4ea1-a259-b4e2a4519eee", "createdat": 1505312886984, "createdby": "karthik", "notes": "testing20"} 
{"id": "4164a489-a0bb-4ea1-a259-b4e2a4519eee", "createdat": 1505312890472, "createdby": "karthik", "notes": "testing21"} 

このように、メッセージを受け取っがアブロはconsumerRecordの値から分かるようにデコードされます。 今、私はデータフレーム形式でそれらのレコードが必要ですが、以下のように、私は手でさえスキーマで、ここから続行するのか分からない:

val sr : CachedSchemaRegistryClient = new CachedSchemaRegistryClient("http://localhost:7070", 1000) 
val m = sr.getLatestSchemaMetadata(topics + "-value") 
val schemaId = m.getId 
val schemaString = m.getSchema 

val schemaRegistry : CachedSchemaRegistryClient = new CachedSchemaRegistryClient("http://localhost:7070", 1000) 
val decoder: KafkaAvroDecoder = new KafkaAvroDecoder(schemaRegistry) 
val parser = new Schema.Parser() 
val avroSchema = parser.parse(schemaString) 
println(avroSchema) 

次のように印刷されたスキーマを使用して:

{"type":"record","name":"notes","namespace":"db","fields":[{"name":"id","type":["null","string"],"default":null},{"name":"createdat","type":["null",{"type":"long","connect.version":1,"connect.name":"org.apache.kafka.connect.data.Timestamp","logicalType":"timestamp-millis"}],"default":null},{"name":"createdby","type":["null","string"],"default":null},{"name":"notes","type":["null","string"],"default":null}],"connect.name":"db.notes"}

コンシューマーレコードの価値からデータフレームを取得する方法を理解できる人はいますか?私はUse schema to convert AVRO messages with Spark to DataFrameHandling schema changes in running Spark Streaming applicationのような他の質問を見ましたが、最初の場所ではconsumerRecordを扱っていません。

+0

: ストリームは、消費者レコードのDSTREAMはkafka010のkafkaUtils APIから返されます。これを理解できましたか? –

答えて

0

私はscala \ kafka \ sparkを初めて使ったので、これが正確に質問に答えるかどうかはわかりませんが、それは私を助けてくれました。私はこれよりも良い方法があると確信していますので、より多くの経験を積んだ人が来て、より良い答えを出すことができたらうれしいです。

// KafkaRDD 
stream.foreachRDD { rdd => { 

    // pull the values I'm looking for into a string array 
    var x = rdd.map(row => row.value()).collect() 

    // convert to dataframe 
    val df = spark.createDataFrame(x).toDF("record") 

    // write data frame to datastore (MySQL in my case) 
    df.write 
    .mode(SaveMode.Append) 
    .jdbc(url, table, props) 

    } 
} 
1

あなたはスニペットの下に使用することができます:私は似たような状況に実行しているよ

stream.foreachRDD(rdd => 
    if (!rdd.isEmpty()) { 
     val sqlContext = SQLContext.getOrCreate(rdd.sparkContext) 
     import sqlContext.implicits._ 
     val topicValueStrings = rdd.map(record => (record.value()).toString) 
     val df = sqlContext.read.json(topicValueStrings) 
     df.show() 
    }) 
関連する問題