1

私はCloud Dataflow、PubSub & Bigqueryを使用してJSON Pubsubメッセージを読み込み、TableRowJsonCoderを使用してJSONをテーブルに変換してBigqueryに書き込みます。Cloud Dataflow、PubSub&Bigquery(TableRowJsonCoder)の問題

私の問題は一貫性があるため、以下のコードが動作することがあります。エラーはスローされません。私はPubsubのトピックに正しくメッセージを公開していることは確かです。私は、Dataflowが各メッセージを読んでいることも確信しています。私はgcloudコマンドラインツールを使ってこれをテストしました。私は、トピックに2つのサブスクリプションを持って

gcloud beta pubsub subscriptions pull --auto-ack SUBSCRIPTION-NAME 

、データフローによって読み取ら1とターミナルに私が読ん1。このコードは、JSONデータもテーブル形式にフォーマットして、それが私の指定されたデータセットとテーブルに書き込むのに成功しました:(

私の前提は何が起こっているのか分かりません。

私は50個のメッセージを送信すると仮定すると、データフローは要素の約半分しか読み込まないように見えますが、これは要素の存在と関係していますこの問題を解決する方法は?TableRowJSONCoderを使用してデータを読み取っています。

また、同じような問題が再び発生しているようですX要素の場合、そのうちのほんの一部だけがGroupbyキーを通過することになります。私がそれをさらにトラブルシューティングすることができれば、問題の私の説明はもっと深くなるでしょう。 「id」フィールドは常にunquieなので、複製とは関係ないと思いますが、間違っている可能性があります。

このメッセージを書いても、追加された要素は41 &に上昇しました。bigqueryへの出力は12に増加しました。私のテストデータは小さいです(常に100メッセージ以下)?たとえそれが最終的にすべての行を保存しても、1時間以上かかることはそれほど長くないようです。また、私には興味のThe succesfully inserted data

/* 
* Copyright (C) 2015 Google Inc. 
* 
* Licensed under the Apache License, Version 2.0 (the "License"); you may not 
* use this file except in compliance with the License. You may obtain a copy of 
* the License at 
* 
* http://www.apache.org/licenses/LICENSE-2.0 
* 
* Unless required by applicable law or agreed to in writing, software 
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 
* License for the specific language governing permissions and limitations under 
* the License. 
*/ 

package com.example; 

import com.google.api.services.bigquery.model.TableFieldSchema; 
import com.google.api.services.bigquery.model.TableReference; 
import com.google.api.services.bigquery.model.TableRow; 
import com.google.api.services.bigquery.model.TableSchema; 
import com.google.cloud.dataflow.sdk.Pipeline; 
import com.google.cloud.dataflow.sdk.coders.TableRowJsonCoder; 
import com.google.cloud.dataflow.sdk.io.BigQueryIO; 
import com.google.cloud.dataflow.sdk.io.PubsubIO; 
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; 
import com.google.cloud.dataflow.sdk.options.PipelineOptions; 
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; 
import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows; 
import com.google.cloud.dataflow.sdk.transforms.windowing.Window; 
import com.google.cloud.dataflow.sdk.values.PCollection; 
import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath; 

import java.util.ArrayList; 
import java.util.List; 

import org.joda.time.Duration; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 

/** 
* A starter example for writing Google Cloud Dataflow programs. 
* 
* <p>The example takes two strings, converts them to their upper-case 
* representation and logs them. 
* 
* <p>To run this starter example locally using DirectPipelineRunner, just 
* execute it without any additional parameters from your favorite development 
* environment. 
* 
* <p>To run this starter example using managed resource in Google Cloud 
* Platform, you should specify the following command-line options: 
* --project=<YOUR_PROJECT_ID> 
* --stagingLocation=<STAGING_LOCATION_IN_CLOUD_STORAGE> 
* --runner=BlockingDataflowPipelineRunner 
*/ 
public class StarterPipeline { 

    private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class); 

    static final int WINDOW_SIZE = 1; // Default window duration in minutes 

    private final static String PROJECT_ID = "dataflow-project"; 
    private final static String PUBSUB_TOPIC = "projects/dataflow-project/topics/pub-sub-topic"; 
    private final static String DATASET_ID = "test_dataset"; 
    private final static String TABLE_ID = "test_table_version_one"; 


    private static TableSchema getSchema() { 
     List<TableFieldSchema> fields = new ArrayList<>(); 
     fields.add(new TableFieldSchema().setName("id").setType("STRING")); 
     fields.add(new TableFieldSchema().setName("ip").setType("STRING")); 
     fields.add(new TableFieldSchema().setName("installation_id").setType("STRING")); 
     fields.add(new TableFieldSchema().setName("user_id").setType("STRING")); 
     fields.add(new TableFieldSchema().setName("device_type").setType("STRING")); 
     fields.add(new TableFieldSchema().setName("language").setType("STRING")); 
     fields.add(new TableFieldSchema().setName("application_id").setType("STRING")); 
     fields.add(new TableFieldSchema().setName("timestamp").setType("TIMESTAMP")); 
     TableSchema schema = new TableSchema().setFields(fields); 
     return schema; 
    } 

    private static TableReference getTableReference() { 
     TableReference tableRef = new TableReference(); 
     tableRef.setProjectId(PROJECT_ID); 
     tableRef.setDatasetId(DATASET_ID); 
     tableRef.setTableId(TABLE_ID); 
     return tableRef; 
    } 

    public static void main(String[] args) { 
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create(); 
    DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class); 
    dataflowOptions.setStreaming(true); 
    Pipeline pipeline = Pipeline.create(dataflowOptions); 
    LOG.info("Reading from PubSub."); 
    PCollection<TableRow> input = pipeline 
     .apply(PubsubIO.Read.topic(PUBSUB_TOPIC).withCoder(TableRowJsonCoder.of())) 
      .apply(Window.<TableRow>into(FixedWindows.of(Duration.standardMinutes(1)))); 
    input 
     .apply(BigQueryIO.Write.to(getTableReference()).withSchema(getSchema())); 

    pipeline.run(); 
    } 
} 

dataflow console

は、 "タイムスタンプ" と "ID" フィールドとタイムスタンプとレコードIDを指定するだろう。

+0

これは確かにはるかに高速です。これは、プロジェクトのネットワーク設定が誤って設定されている場合に発生します。私はあなたの仕事のjob_idを提供することができますので、私はさらに調査することができますか? タイムスタンプ/ IDの質問については、https://cloud.google.com/dataflow/model/pubsub-io#timestamps-and-record-ids – danielm

+0

@danielm 2017-01-23_09_48_10-1670593411236141809を参照してください。プロジェクト上記の-idは正しいものではありません。 –

+0

パイプラインを一晩実行した後、Pubsubの読み取りから63個の要素が追加され、17行が生成されました。ボトルネックはGroupByKeyで、Pubsubからの読み込みに長い時間がかかります。 –

答えて

0

問題は、GCE VMのネットワーク設定が間違っていることです。データフローでは、VMがTCP経由で通信できる必要があり、ファイアウォールルールでは許可されていません。通常、VM間のTCP接続を許可するためのファイアウォールルールを追加することでこれが解決されます。

一部のデータがパイプラインをゆっくりと通過している理由は、幸運にもデータが1台のマシンで処理される必要があることがあるからです。 Pubsubは最終的にタイムアウトしてメッセージを再試行するため、最終的にすべてが通過します。

関連する問題