2017-02-15 20 views
1

Google Cloud Storage(GCS)の約2GB(圧縮されていない10GB)のJSONログファイルを読み込み、書き出しようとしていますGoogle Cloud Dataflow(GCDF)を介してBigQuery(BQ)への日付分割表に変換します。大規模なgzip JSONファイルをDataflow経由でBigQueryに読み込む

各ファイルには7日間のデータが保存され、日付範囲全体は約2年(730日およびカウント)です。私の現在のパイプラインは次のようになります。

p.apply("Read logfile", TextIO.Read.from(bucket)) 
.apply("Repartition", Repartition.of()) 
.apply("Parse JSON", ParDo.of(new JacksonDeserializer())) 
.apply("Extract and attach timestamp", ParDo.of(new ExtractTimestamps())) 
.apply("Format output to TableRow", ParDo.of(new TableRowConverter())) 
.apply("Window into partitions", Window.into(new TablePartWindowFun())) 
.apply("Write to BigQuery", BigQueryIO.Write 
     .to(new DayPartitionFunc("someproject:somedataset", tableName)) 
     .withSchema(TableRowConverter.getSchema()) 
     .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) 
     .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)); 

再分割は、私がパイプラインreshuffle after decompressingをしようとしている間に構築したものです、私は、それなしでパイプラインを実行しようとしています。 JSONの解析は、Jackon ObjectMapperと、対応するクラスhereのクラスを介して動作します。 TablePartWindowFunはhereから取り出され、PCollectionの各エントリにパーティションを割り当てるために使用されます。

パイプラインは、小さすぎるファイルに対しても機能しますが、実際のデータセットでは破損しません。私は十分な大きさのマシンタイプを選択し、n1-highmem-16マシンの最大100までの自動スケーリングを使用するだけでなく、最大数のワーカーを設定しようとしました。私はストリーミングとバッチモードとdisSizeGbの値を最大250GBから1ワーカーあたり1200GBにしようとしました。 GCS上のすべてのファイル

  1. 解凍し、そしてGCSのgzip transcoding
  2. を活用することはできませんように、労働者間の動的な作業分割を有効にする:

    私は、現時点での考えることができる可能な解決策がありますループ内に多数の並列パイプラインを構築し、各パイプラインは90個のファイルのサブセットのみを処理します。

オプション2は、フレームワークをプログラミングするのが好きなようですが、別の解決策はありますか?

補遺:再分割して

12人の労働者と100人の労働者(タイプN1-HIGHMEM-4の)最大、約1時間のパイプラインを実行して、バッチモードで GZIP JSONファイルを読んだ後とレディングとレパティションの第一段階を終わらせる。その後、最大100人の作業者をスケーリングし、再分割されたPCollectionを処理します。この段階に到達したとき、最初のそれは150万要素/ sまでの処理だ

Write to BQ Service Graph

興味深いことに、その後、進展はのOutputCollectionのサイズ0にダウン:それが行われた後のグラフは次のようになります図のGroupByKeyのステップが最初に上がり、その後約3億から0に減少します(合計で約18億の要素があります)。それは何かを捨てているようです。また、最後にExpandIterableParDo(Streaming Write)の実行時間は0です。画像は「後方に」実行する前に若干表示されます。 作業員のログには、com.google.api.client.http.HttpTransportロガーから届いているexception thrown while executing requestメッセージが表示されますが、Stackdriverで詳細情報が見つかりません。再分割なし

パイプラインがまったく同じステップで、メモリエラーのうちでn1-highmem-2のインスタンスを使用して失敗した(GroupByKey後にすべてを)を読んだ後 - 大きなインスタンスタイプを使用して

java.util.concurrent.ExecutionException: java.io.IOException: 
CANCELLED: Received RST_STREAM with error code 8 dataflow-...-harness-5l3s 
talking to frontendpipeline-..-harness-pc98:12346 
+0

_ "実際のデータセットは壊れています" _ - どういうことが起こりますか?どのようなエラーが発生しますか? –

+0

エラーの例を追加しました。 – Tobi

+0

私はRepartitionステップなしで別の例を追加しました – Tobi

答えて

1

Google Cloud DataflowチームのDanのおかげで、彼はhereを提供してくれたので、私はこの問題を解決できました。唯一の変更は、私が作っ:

  • がシステムを圧倒しないように、他の後に1つのパイプラインを実行し、175 =(25週)大きなチャンクでの日にわたってループ。ループでは、以前の繰り返しの最後のファイルが再処理され、startDateが元のデータと同じ速度(175日)で前方に移動することを確認します。 WriteDisposition.WRITE_TRUNCATEが使用されているため、チャンクの最後の不完全な日は、このように正しい完全なデータで上書きされます。私のデータはUTC

  • にないよう再分割/再シャッフルを使用して

  • は、プロセスをスピードアップし、代わりにインスタントタイプの日時を使用してスムーズな自動スケーリングに

  • を可能にするために、gzipで圧縮されたファイルを読んだ後、上記の変換します

UPDATE(Apacheのビーム2.0):Apacheのビーム2.0のリリースに伴い

ソリューションは、はるかに簡単になりました。 BigQuery出力テーブルのシャーディングがサポートされるようになりました。out of the box

0

のような例外につながることであってもよいですパイプラインを実行するときにより大きい値の--numWorkersを設定することで、パイプラインに多くのリソースを割り当てることに価値があります。これは、「一般的なエラーと動作のコース」のサブ章で、オンラインでdocumentの「パイプラインのトラブルシューティング」で説明されている解決策の1つです。

+0

私はすでにこれを試していますが、最大のマシンタイプを使用しても失敗します。 – Tobi

関連する問題