Spark 1.6のDataFrames APIを使用してSpark Streamingアプリケーションを構築しようとしています。私がウサギの穴をあまりにも遠くまで掘り下げる前に、私は誰かが私にDataFramesがスキーマの異なるデータを扱う方法を理解するのを助けてくれることを願っていました。Spark Streamingアプリケーションの実行中にスキーマの変更を処理する
考えられるのは、メッセージがAvroスキーマを使用してカフカに流入するという考えです。ストリーミングアプリケーションを再起動することなく、スキーマを下位互換性のある方法で進化させることができるはずです(アプリケーションロジックは引き続き機能します)。
スキーマレジストリとKafkaUtilsを使用してメッセージに埋め込まれたスキーマIDを使用して直接ストリームを作成し、AvroKafkaDecoder(Confluentから)を使用して新しいバージョンのメッセージを簡単に非直列化するように見えます。それはDStreamを持っている限り私を得る。
問題#1: そのDStreamには、異なるバージョンのスキーマを持つオブジェクトがあります。したがって、各オブジェクトをRowオブジェクトに変換する際には、データを正しく移行するための最新のリーダースキーマを渡す必要があります。最新のスキーマをsqlContext.createDataFrame(rowRdd、schema)呼び出しに渡す必要があります。 DStream内のオブジェクトはGenericData.Record型であり、最新のバージョンがどれであるかを簡単に知る方法はありません。私は2つの解決策を見ています。一つは、すべてのマイクロバッチでスキーマの最新バージョンを取得するためにスキーマレジストリを呼び出すことです。もう1つは、スキーマIDを添付するようにデコーダを変更することです。私は最大のidを見つけてローカルキャッシュからスキーマを取得するために、rddを繰り返し実行することができました。
私は誰かがすでにこの方法を再利用可能な方法でうまく解決してくれることを望んでいました。
問題/質問#2: スパークは、各パーティションのためにカフカから別のエグゼクティブを引き出す予定です。あるエグゼキュータが別の「最新の」スキーマを他のエキスパートと受け取った場合、アプリケーションにはどうなりますか。 1つのエグゼキュータによって作成されたDataFrameは、同じ時間枠の別のスキーマとは異なるスキーマを持ちます。私は実際にこれが本当の問題かどうかわからない。私はデータの流れを視覚化するのに問題があり、どのような種類の操作が問題を引き起こすのでしょうか。それが問題であれば、エグゼキュータとそれが複雑で非効率的に聞こえるいくつかのデータ共有が必要であることを意味します。
私はこれについて心配する必要はありますか?スキーマの違いを解決するには?
おかげで、 --Ben