2017-08-07 11 views
0

GCSにcsv(gzip圧縮)ファイルがあります。私はこれらのファイルを読んで、BigQueryにデータを送信したいと思います。Google Cloud Dataflow(Apache Beam) - ヘッダー付きのgzip形式のcsvファイルの処理方法

ヘッダー情報は変更できます(ただし、すべての列は事前にわかっています)。ヘッダーを削除するだけでは不十分です。最初の行を読み込み、残りの行に列情報を追加する必要があります。

どうすれば可能ですか?

まず、この記事のようなカスタムソースを実装する必要があると思います。
Reading CSV header with Dataflow
しかし、このソリューションでは、まずGzipをどのように圧縮解除できるかわかりません。どういうわけかをTextIOのように使用できますか? (私はパラメータcompression_typein a python Classを見つけましたが、私は、Javaを使用していますし、Java FileBasedSourceクラスに似たものを見つけることができませんでした。)

それはファイルが分割不可能になるので、私の場合には、それはありますが、また、私は(これにビットoverkillingを感じますはい)。

GoogleCloudStorageを使用して、ファイルとその最初の行を私のmain()ファンクションの最初の場所で直接読み取ってから、パイプラインに進むことができます。

データフローのヘッダーを利用している間にCSVファイルを読み取るためのベストプラクティス(データフロー方法)があるかどうかを確認したいのですが?

+0

TextIO変換をサブクラス化して特殊な動作を追加するのはかなり簡単です。 – Pablo

+0

@Pabloには、TextIOを拡張して特殊な動作を追加するために参照できる例がありますか?私はファイル処理の振る舞いを拡張するために 'Source'を使う必要があると思っていましたが、それを行う方法はわかっていましたが、' TextIO'のgzファイルの自動圧縮解除の利点を失いました。 –

答えて

1

あなたが正しく達成しようとしていることを理解していれば、ここでは、SideInput(、example)の回答が考えられます。これにより、ファイルの行ごとにヘッダを処理できるようになります。

一般的な考え方は、ヘッダーを個別のPCollectionViewとして発行し、これをサイドライン処理としてSideInputとして使用することです。 SideOutput(doc

質問を正しく読んでいると、ヘッダーの内容がファイルごとに異なるように聞こえることがあります。その場合は、View.asMapを使用して、各ファイルのヘッダーのマップを保持できます。残念ながら、現在読み込まれているファイル名を追跡することは、現在のところサポートされていませんが、this postで説明されている作業領域があります。

+0

返事ありがとうございます。 (1)最初にヘッダ行のみを抽出するPCollectionを作成する(2)Combine.globallyを使用してPCollectionViewとして変換し、View.asSingleton(3)はcsvから残りのすべての行(実際のデータ)を抽出する別のPCollectionを作成する私が間違っている場合は、私を修正してください。ヘッダーはあなたが理解するように実際にファイルごとに異なりますが、私はファイルごとにジョブを提出することに決めました。私たちの場合は受け入れられます。 –

+0

私はヘッダーを読んで上の方法でそれを利用することができます。ありがとう:) –

関連する問題