2016-12-16 15 views
3

Spark 1.6のDataFrames APIを使用してSpark Streamingアプリケーションを構築しようとしています。私がウサギの穴をあまりにも遠くまで掘り下げる前に、私は誰かが私にDataFramesがスキーマの異なるデータを扱う方法を理解するのを助けてくれることを願っていました。Spark Streamingアプリケーションの実行中にスキーマの変更を処理する

考えられるのは、メッセージがAvroスキーマを使用してカフカに流入するという考えです。ストリーミングアプリケーションを再起動することなく、スキーマを下位互換性のある方法で進化させることができるはずです(アプリケーションロジックは引き続き機能します)。

スキーマレジストリとKafkaUtilsを使用してメッセージに埋め込まれたスキーマIDを使用して直接ストリームを作成し、AvroKafkaDecoder(Confluentから)を使用して新しいバージョンのメッセージを簡単に非直列化するように見えます。それはDStreamを持っている限り私を得る。

問題#1: そのDStreamには、異なるバージョンのスキーマを持つオブジェクトがあります。したがって、各オブジェクトをRowオブジェクトに変換する際には、データを正しく移行するための最新のリーダースキーマを渡す必要があります。最新のスキーマをsqlContext.createDataFrame(rowRdd、schema)呼び出しに渡す必要があります。 DStream内のオブジェクトはGenericData.Record型であり、最新のバージョンがどれであるかを簡単に知る方法はありません。私は2つの解決策を見ています。一つは、すべてのマイクロバッチでスキーマの最新バージョンを取得するためにスキーマレジストリを呼び出すことです。もう1つは、スキーマIDを添付するようにデコーダを変更することです。私は最大のidを見つけてローカルキャッシュからスキーマを取得するために、rddを繰り返し実行することができました。

私は誰かがすでにこの方法を再利用可能な方法でうまく解決してくれることを望んでいました。

問題/質問#2: スパークは、各パーティションのためにカフカから別のエグゼクティブを引き出す予定です。あるエグゼキュータが別の「最新の」スキーマを他のエキスパートと受け取った場合、アプリケーションにはどうなりますか。 1つのエグゼキュータによって作成されたDataFrameは、同じ時間枠の別のスキーマとは異なるスキーマを持ちます。私は実際にこれが本当の問題かどうかわからない。私はデータの流れを視覚化するのに問題があり、どのような種類の操作が問題を引き起こすのでしょうか。それが問題であれば、エグゼキュータとそれが複雑で非効率的に聞こえるいくつかのデータ共有が必要であることを意味します。

私はこれについて心配する必要はありますか?スキーマの違いを解決するには?

おかげで、 --Ben

答えて

2

私はこれを解決したと考えています。私はConfluentのスキーマレジストリとKafkaAvroDecoderを使用しています。簡略化されたコードは次のようになります。

// Get the latest schema here. This schema will be used inside the 
// closure below to ensure that all executors are using the same 
// version for this time slice. 
val sr : CachedSchemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 1000) 
val m = sr.getLatestSchemaMetadata(subject) 
val schemaId = m.getId 
val schemaString = m.getSchema 

val outRdd = rdd.mapPartitions(partitions => { 
    // Note: we cannot use the schema registry from above because this code 
    // will execute on remote machines, requiring the schema registry to be 
    // serialized. We could use a pool of these. 
    val schemaRegistry : CachedSchemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 1000) 
    val decoder: KafkaAvroDecoder = new KafkaAvroDecoder(schemaRegistry) 
    val parser = new Schema.Parser() 
    val avroSchema = parser.parse(schemaString) 
    val avroRecordConverter = AvroSchemaConverter.createConverterToSQL(avroSchema) 

    partitions.map(input => { 
    // Decode the message using the latest version of the schema. 
    // This will apply Avro's standard schema evolution rules 
    // (for compatible schemas) to migrate the message to the 
    // latest version of the schema. 
    val record = decoder.fromBytes(messageBytes, avroSchema).asInstanceOf[GenericData.Record] 
    // Convert record into a DataFrame with columns according to the schema 
    avroRecordConverter(record).asInstanceOf[Row] 
    }) 
}) 

// Get a Spark StructType representation of the schema to apply 
// to the DataFrame. 
val sparkSchema = AvroSchemaConverter.toSqlType(
     new Schema.Parser().parse(schemaString) 
    ).dataType.asInstanceOf[StructType] 
sqlContext.createDataFrame(outRdd, sparkSchema) 
関連する問題