2017-05-25 18 views
1

Pubsubからメッセージを読み込み、Java用Apache Beam SDK 2.0.0を使用してTableRowsをBigQuery(BQ)に書き込むパイプラインを実装しています。BigQueryに書き込むときのGroupByKeyサブタスクの要素の累積Apache beam v2.0

これは、コードの関連部分である:

tableRowPCollection 
      .apply(BigQueryIO.writeTableRows().to(this.tableId) 
       .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER) 
       .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)); 

このコードは、データフローパイプラインのフードの下のタスク群を生成します。これらのタスクの1つはGroupByKeyです。この作業では、この印刷画面に表示されているように、パイプラインに要素を累積しています。 GBK elements accumulation image. ドキュメントを読んだ後、私はこの問題がウィンドウの設定に関係していると思われます。しかし、暗黙にであるので、私はウィンドウ構成を変更する方法を見つけることができませんでしたWindow.Assignは、再配置タスク内で変換します。

この暗黙的なウィンドウにウィンドウパラメータを設定したりトリガを付けたりする方法はありますか?またはBQにTableRowを挿入する独自のDoFnを作成する必要がありますか?

ありがとうございます!


[更新]

Iは約そしてGroupByKeyサブタスクが速くなり、互いに近似要素の数に来て出てくるが(時々あったこと後日間実行パイプラインを残し同じ)。さらに、私はまた、Watermarkが現在の日付に近づき、より速く増加していることに気付きました。そこで、「問題」が解決されました。

+0

問題点は何ですか? GroupByKeyは、特定のキーに関連付けられた値をグループ化します。そのため、N個のキーがある場合、N個の要素がGroupByKeyから出てくることになります。あなたが意図したとおりに働いているように、あなたが説明したことは聞こえる。 –

+0

@BenChambers要点は、BQにデータを挿入するのに時間がかかりすぎていないということです。私はこのサブタスクで使用されるトリガを選択したいと思う。 –

答えて

0

ReshuffleがBigQueryシンクに導入された待機はありません。むしろ、BigQueryに書き込む行のバッチを作成するために使用されます。 GroupByKeyから出てくる要素の数は、各出力要素が入力要素のバッチ(またはグループ)を表すため、小さくなります。

ExpandIterableReshuffleの出力)の出力として出てくる要素の総数を確認する必要があります。

+0

私はパイプラインを1日ほど走らせたままで、その後、 'GroupByKey'サブタスクはより速くなり、出入りする要素の数は互いに近似していました(時には同じでした)。さらに、私はまた、「透かし」が現在の日付に近づき、より速く増加していることに気付きました。 –

関連する問題