2017-06-05 10 views

皆さん、こんにちは。Dataflow 初心者から、もう一つ Dataflow に関する質問です(今週使い始めたばかりです)。件名:データの書き込み(TextIO または DatastoreIO)に関する Google Cloud Dataflow の問題

製品名のリストを読み込み、オートコンプリート用のデータを生成するデータパイプラインを作成しています。データ処理部分はすべて正常に動作しているようですが、Datastore にデータを書き込むために最後の「.apply」を追加すると、IDE に構文エラーが表示されます。該当するコードは次のとおりです:






package com.client.autocomplete; 

import com.client.autocomplete.AutocompleteOptions; 

import com.google.datastore.v1.Entity; 
import com.google.datastore.v1.Key; 
import com.google.datastore.v1.Value; 

import static com.google.datastore.v1.client.DatastoreHelper.makeKey; 
import static com.google.datastore.v1.client.DatastoreHelper.makeValue; 
import org.apache.beam.sdk.coders.DefaultCoder; 

import org.apache.beam.sdk.Pipeline; 
import org.apache.beam.sdk.PipelineResult; 
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; 
import org.apache.beam.sdk.values.PCollection; 
import org.apache.beam.sdk.values.PCollectionList; 
import com.google.api.services.bigquery.model.TableRow; 
import com.google.common.base.MoreObjects; 
import org.apache.beam.sdk.io.TextIO; 
import org.apache.beam.sdk.io.gcp.datastore.DatastoreIO; 
import org.apache.beam.sdk.options.PipelineOptionsFactory; 
import org.apache.beam.sdk.transforms.PTransform; 
import org.apache.beam.sdk.transforms.Create; 
import org.apache.beam.sdk.transforms.DoFn; 
import org.apache.beam.sdk.transforms.MapElements; 
import org.apache.beam.sdk.transforms.ParDo; 
import org.apache.beam.sdk.transforms.SimpleFunction; 
import org.apache.beam.sdk.transforms.GroupByKey; 
import org.apache.beam.sdk.transforms.DoFn.ProcessContext; 
import org.apache.beam.sdk.transforms.DoFn.ProcessElement; 
import org.apache.beam.sdk.extensions.jackson.ParseJsons; 
import org.apache.beam.sdk.values.KV; 
import org.apache.beam.sdk.options.Default; 
import org.apache.beam.sdk.options.Description; 
import org.apache.beam.sdk.options.PipelineOptionsFactory; 
import org.apache.beam.sdk.options.StreamingOptions; 
import org.apache.beam.sdk.options.Validation; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 

import java.io.IOException; 
import java.util.HashMap; 
import java.util.List; 
import java.util.Map; 
import java.util.List; 
import java.util.ArrayList; 

* A simple Dataflow pipeline to create autocomplete data from a list of 
* product names. It then loads that prefix data into Google Cloud Datastore for consumption by 
* a Google Cloud Function. That function will take in a prefix and return a list of 10 product names 
* Pseudo Code Steps 
* 1. Load a list of product names from Cloud Storage 
* 2. Generate prefixes for use with autocomplete, based on the product names 
* 3. Merge the prefix data together with 10 products per prefix 
* 4. Write that prefix data to the Cloud Datastore as a KV with a <String>, List<String> structure 

public class ClientAutocompletePipeline { 
    private static final Logger LOG = LoggerFactory.getLogger(ClientAutocompletePipeline.class); 

    * A DoFn that keys each product name by all of its prefixes. 
    * This creates one row in the PCollection for each prefix<->product_name pair 
    private static class AllPrefixes 
    extends DoFn<String, KV<String, String>> { 
     private final int minPrefix; 
     private final int maxPrefix; 

     public AllPrefixes(int minPrefix) { 
      this(minPrefix, 10); 

     public AllPrefixes(int minPrefix, int maxPrefix) { 
      this.minPrefix = minPrefix; 
      this.maxPrefix = maxPrefix; 
     public void processElement(ProcessContext c) { 
      String productName= c.element().toString(); 
      for (int i = minPrefix; i <= Math.min(productName.length(), maxPrefix); i++) { 
       c.output(KV.of(productName.substring(0, i), c.element())); 

    * Takes as input the top product names per prefix, and emits an entity 
    * suitable for writing to Cloud Datastore. 
    static class FormatForDatastore extends DoFn<KV<String, List<String>>, Entity> { 
     private String kind; 
     private String ancestorKey; 

     public FormatForDatastore(String kind, String ancestorKey) { 
      this.kind = kind; 
      this.ancestorKey = ancestorKey; 

     public void processElement(ProcessContext c) { 
      // Initialize an EntityBuilder and get it a valid key 
      Entity.Builder entityBuilder = Entity.newBuilder(); 
      Key key = makeKey(kind, ancestorKey).build(); 

      // New HashMap to hold all the properties of the Entity 
      Map<String, Value> properties = new HashMap<>(); 
      String prefix = c.element().getKey(); 
      String productsString = "Products["; 

      // iterate through the product names and add each one to the productsString 
      for (String productName : c.element().getValue()) { 
       // products.add(productName); 
       productsString += productName + ", "; 
      productsString += "]"; 

      properties.put("prefix", makeValue(prefix).build());    
      properties.put("products", makeValue(productsString).build()); 

    * Options supported by this class. 
    * <p>Inherits standard Beam example configuration options. 
    public interface Options 
    extends AutocompleteOptions { 
     @Description("Input text file") 
     String getInputFile(); 
     void setInputFile(String value); 

     @Description("Cloud Datastore entity kind") 
     String getKind(); 
     void setKind(String value); 

     @Description("Whether output to Cloud Datastore") 
     Boolean getOutputToDatastore(); 
     void setOutputToDatastore(Boolean value); 

     @Description("Cloud Datastore ancestor key") 
     String getDatastoreAncestorKey(); 
     void setDatastoreAncestorKey(String value); 

     @Description("Cloud Datastore output project ID, defaults to project ID") 
     String getOutputProject(); 
     void setOutputProject(String value); 

    public static void main(String[] args) throws IOException{ 

     Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); 

     // create the pipeline 
     Pipeline p = Pipeline.create(options); 

     PCollection<String> toWrite = p 

      // A step to read in the product names from a text file on GCS 

      // Next expand the product names into KV pairs with prefix as key (<KV<String, String>>) 
      .apply("Explode Prefixes", ParDo.of(new AllPrefixes(2))) 

      // Apply a GroupByKey transform to the PCollection "flatCollection" to create "productsGroupedByPrefix". 
      .apply(GroupByKey.<String, String>create()) 

      // Now format the PCollection for writing into the Google Datastore 
      .apply("FormatForDatastore", ParDo.of(new FormatForDatastore(options.getKind(), 

      // Write the processed data to the Google Cloud Datastore 
      // NOTE: This is the line that I'm getting the error on!! 
        options.getOutputProject(), options.getOutputProject())))); 

     // Run the pipeline. 
     PipelineResult result = p.run(); 




PCollection<String> toWrite = p 
    .apply("Explode Prefixes", ...) 
    .apply(GroupByKey.<String, String>create()) 
    // Three closing parentheses: FormatForDatastore(...), ParDo.of(...), and apply(...).
    .apply("FormatForDatastore", ParDo.of(new FormatForDatastore(
     options.getKind(), options.getDatastoreAncestorKey()))) 
    // The write step now chains off the pipeline, not off ParDo.of(...).
    .apply(...);

具体的には、apply("FormatForDatastore", ...) を閉じるための括弧がもう1つ必要です。現状のコードは ParDo.of(...).apply(...) を呼び出そうとしており、これは動作しません。


ご意見ありがとうございます。つまり 'options.getDatastoreAncestorKey())' の後ろに閉じ括弧をもう1つ追加すればよいのでしょうか?そうすると、今度は次のような赤い構文エラーが表示されます:「型の不一致: ParDo.SingleOutput<KV<String, List<String>>, Entity> から PTransform<? super PCollection<KV<String, Iterable<String>>>, OutputT> に変換できません」 – MrSimmonsSr


はい、その閉じ括弧を追加する必要があります。その後に出るエラーは、入力として `KV<String, List<String>>` のコレクションを期待する変換に、`KV<String, Iterable<String>>` のコレクションを渡していることを示しています。`FormatForDatastore` の `DoFn` の型は `DoFn<KV<String, Iterable<String>>, Entity>` を継承するようにしなければなりません。 –
