私はCloud Dataflow、PubSub & Bigqueryを使用してJSON Pubsubメッセージを読み込み、TableRowJsonCoderを使用してJSONをテーブルに変換してBigqueryに書き込みます。Cloud Dataflow、PubSub&Bigquery(TableRowJsonCoder)の問題
私の問題は一貫性があるため、以下のコードが動作することがあります。エラーはスローされません。私はPubsubのトピックに正しくメッセージを公開していることは確かです。私は、Dataflowが各メッセージを読んでいることも確信しています。私はgcloudコマンドラインツールを使ってこれをテストしました。私は、トピックに2つのサブスクリプションを持って
gcloud beta pubsub subscriptions pull --auto-ack SUBSCRIPTION-NAME
、データフローによって読み取ら1とターミナルに私が読ん1。このコードは、JSONデータもテーブル形式にフォーマットして、それが私の指定されたデータセットとテーブルに書き込むのに成功しました:(
私の前提は何が起こっているのか分かりません。
私は50個のメッセージを送信すると仮定すると、データフローは要素の約半分しか読み込まないように見えますが、これは要素の存在と関係していますこの問題を解決する方法は?TableRowJSONCoderを使用してデータを読み取っています。
また、同じような問題が再び発生しているようですX要素の場合、そのうちのほんの一部だけがGroupbyキーを通過することになります。私がそれをさらにトラブルシューティングすることができれば、問題の私の説明はもっと深くなるでしょう。 「id」フィールドは常にunquieなので、複製とは関係ないと思いますが、間違っている可能性があります。
このメッセージを書いても、追加された要素は41 &に上昇しました。bigqueryへの出力は12に増加しました。私のテストデータは小さいです(常に100メッセージ以下)?たとえそれが最終的にすべての行を保存しても、1時間以上かかることはそれほど長くないようです。また、私には興味のThe succesfully inserted data
/*
* Copyright (C) 2015 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.example;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.coders.TableRowJsonCoder;
import com.google.cloud.dataflow.sdk.io.BigQueryIO;
import com.google.cloud.dataflow.sdk.io.PubsubIO;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;
import java.util.ArrayList;
import java.util.List;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A starter example for writing Google Cloud Dataflow programs.
*
* <p>The example takes two strings, converts them to their upper-case
* representation and logs them.
*
* <p>To run this starter example locally using DirectPipelineRunner, just
* execute it without any additional parameters from your favorite development
* environment.
*
* <p>To run this starter example using managed resource in Google Cloud
* Platform, you should specify the following command-line options:
* --project=<YOUR_PROJECT_ID>
* --stagingLocation=<STAGING_LOCATION_IN_CLOUD_STORAGE>
* --runner=BlockingDataflowPipelineRunner
*/
public class StarterPipeline {
private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class);
static final int WINDOW_SIZE = 1; // Default window duration in minutes
private final static String PROJECT_ID = "dataflow-project";
private final static String PUBSUB_TOPIC = "projects/dataflow-project/topics/pub-sub-topic";
private final static String DATASET_ID = "test_dataset";
private final static String TABLE_ID = "test_table_version_one";
private static TableSchema getSchema() {
List<TableFieldSchema> fields = new ArrayList<>();
fields.add(new TableFieldSchema().setName("id").setType("STRING"));
fields.add(new TableFieldSchema().setName("ip").setType("STRING"));
fields.add(new TableFieldSchema().setName("installation_id").setType("STRING"));
fields.add(new TableFieldSchema().setName("user_id").setType("STRING"));
fields.add(new TableFieldSchema().setName("device_type").setType("STRING"));
fields.add(new TableFieldSchema().setName("language").setType("STRING"));
fields.add(new TableFieldSchema().setName("application_id").setType("STRING"));
fields.add(new TableFieldSchema().setName("timestamp").setType("TIMESTAMP"));
TableSchema schema = new TableSchema().setFields(fields);
return schema;
}
private static TableReference getTableReference() {
TableReference tableRef = new TableReference();
tableRef.setProjectId(PROJECT_ID);
tableRef.setDatasetId(DATASET_ID);
tableRef.setTableId(TABLE_ID);
return tableRef;
}
public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
dataflowOptions.setStreaming(true);
Pipeline pipeline = Pipeline.create(dataflowOptions);
LOG.info("Reading from PubSub.");
PCollection<TableRow> input = pipeline
.apply(PubsubIO.Read.topic(PUBSUB_TOPIC).withCoder(TableRowJsonCoder.of()))
.apply(Window.<TableRow>into(FixedWindows.of(Duration.standardMinutes(1))));
input
.apply(BigQueryIO.Write.to(getTableReference()).withSchema(getSchema()));
pipeline.run();
}
}
は、 "タイムスタンプ" と "ID" フィールドとタイムスタンプとレコードIDを指定するだろう。
これは確かにはるかに高速です。これは、プロジェクトのネットワーク設定が誤って設定されている場合に発生します。私はあなたの仕事のjob_idを提供することができますので、私はさらに調査することができますか? タイムスタンプ/ IDの質問については、https://cloud.google.com/dataflow/model/pubsub-io#timestamps-and-record-ids – danielm
@danielm 2017-01-23_09_48_10-1670593411236141809を参照してください。プロジェクト上記の-idは正しいものではありません。 –
パイプラインを一晩実行した後、Pubsubの読み取りから63個の要素が追加され、17行が生成されました。ボトルネックはGroupByKeyで、Pubsubからの読み込みに長い時間がかかります。 –