Google Cloud Storage(GCS)の約2GB(圧縮されていない10GB)のJSONログファイルを読み込み、書き出しようとしていますGoogle Cloud Dataflow(GCDF)を介してBigQuery(BQ)への日付分割表に変換します。大規模なgzip JSONファイルをDataflow経由でBigQueryに読み込む
各ファイルには7日間のデータが保存され、日付範囲全体は約2年(730日およびカウント)です。私の現在のパイプラインは次のようになります。
p.apply("Read logfile", TextIO.Read.from(bucket))
.apply("Repartition", Repartition.of())
.apply("Parse JSON", ParDo.of(new JacksonDeserializer()))
.apply("Extract and attach timestamp", ParDo.of(new ExtractTimestamps()))
.apply("Format output to TableRow", ParDo.of(new TableRowConverter()))
.apply("Window into partitions", Window.into(new TablePartWindowFun()))
.apply("Write to BigQuery", BigQueryIO.Write
.to(new DayPartitionFunc("someproject:somedataset", tableName))
.withSchema(TableRowConverter.getSchema())
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
再分割は、私がパイプラインreshuffle after decompressingをしようとしている間に構築したものです、私は、それなしでパイプラインを実行しようとしています。 JSONの解析は、Jackon ObjectMapperと、対応するクラスhereのクラスを介して動作します。 TablePartWindowFunはhereから取り出され、PCollectionの各エントリにパーティションを割り当てるために使用されます。
パイプラインは、小さすぎるファイルに対しても機能しますが、実際のデータセットでは破損しません。私は十分な大きさのマシンタイプを選択し、n1-highmem-16マシンの最大100までの自動スケーリングを使用するだけでなく、最大数のワーカーを設定しようとしました。私はストリーミングとバッチモードとdisSizeGbの値を最大250GBから1ワーカーあたり1200GBにしようとしました。 GCS上のすべてのファイル
- 解凍し、そしてGCSのgzip transcoding
- を活用することはできませんように、労働者間の動的な作業分割を有効にする:
私は、現時点での考えることができる可能な解決策がありますループ内に多数の並列パイプラインを構築し、各パイプラインは90個のファイルのサブセットのみを処理します。
オプション2は、フレームワークをプログラミングするのが好きなようですが、別の解決策はありますか?
補遺:再分割して
12人の労働者と100人の労働者(タイプN1-HIGHMEM-4の)最大、約1時間のパイプラインを実行して、バッチモードで GZIP JSONファイルを読んだ後とレディングとレパティションの第一段階を終わらせる。その後、最大100人の作業者をスケーリングし、再分割されたPCollectionを処理します。この段階に到達したとき、最初のそれは150万要素/ sまでの処理だ
興味深いことに、その後、進展はのOutputCollectionのサイズ0にダウン:それが行われた後のグラフは次のようになります図のGroupByKeyのステップが最初に上がり、その後約3億から0に減少します(合計で約18億の要素があります)。それは何かを捨てているようです。また、最後にExpandIterable
とParDo(Streaming Write)
の実行時間は0です。画像は「後方に」実行する前に若干表示されます。 作業員のログには、com.google.api.client.http.HttpTransport
ロガーから届いているexception thrown while executing request
メッセージが表示されますが、Stackdriverで詳細情報が見つかりません。再分割なし
パイプラインがまったく同じステップで、メモリエラーのうちで
n1-highmem-2
のインスタンスを使用して失敗した(GroupByKey
後にすべてを)を読んだ後 - 大きなインスタンスタイプを使用して
java.util.concurrent.ExecutionException: java.io.IOException:
CANCELLED: Received RST_STREAM with error code 8 dataflow-...-harness-5l3s
talking to frontendpipeline-..-harness-pc98:12346
_ "実際のデータセットは壊れています" _ - どういうことが起こりますか?どのようなエラーが発生しますか? –
エラーの例を追加しました。 – Tobi
私はRepartitionステップなしで別の例を追加しました – Tobi