5

Pub/Subからデータを読み取り、Datastoreに書き込むパイプラインを作成できますか?私のコードでは、入力としてPubsubIOを指定し、境界のあるPCollectionを取得するためにウィンドウを適用していますが、DatastoreIO.writeToをoptions.setStreamingとともに使用することはできないようです入力としてPubsubIO。これを回避する方法はありますか?あるいは単にpubsubから読み込んでデータストアに書き込むことはできませんか?PubsubIOからDatastoreIOへの書き込み

は、ここに私のコードです:

DataflowPipelineOptions options = PipelineOptionsFactory.create() 
      .as(DataflowPipelineOptions.class); 

    options.setRunner(DataflowPipelineRunner.class); 
    options.setProject(projectName); 
    options.setStagingLocation("gs://my-staging-bucket/staging"); 
    options.setStreaming(true); 

    Pipeline p = Pipeline.create(options); 

    PCollection<String> input = p.apply(PubsubIO.Read.topic("projects/"+projectName+"/topics/event-streaming")); 
    PCollection<String> inputWindow = input.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(5))).triggering(AfterPane.elementCountAtLeast(1)).discardingFiredPanes().withAllowedLateness(Duration.standardHours(1))); 
    PCollection<String> inputDecode = inputWindow.apply(ParDo.of(new DoFn<String, String>() { 
     private static final long serialVersionUID = 1L; 
     public void processElement(ProcessContext c) { 
      String msg = c.element(); 
      byte[] decoded = Base64.decodeBase64(msg.getBytes()); 
      String outmsg = new String(decoded); 
      c.output(outmsg); 
     } 
    })); 
    PCollection<DatastoreV1.Entity> inputEntity = inputDecode.apply(ParDo.of(new CreateEntityFn("stream", "events"))); 

    inputEntity.apply(DatastoreIO.writeTo(datasetid)); 


    p.run(); 

そして、これは私が手例外です:DatastoreIOシンクが現在ストリーミングランナーではサポートされていません

Exception in thread "main" java.lang.UnsupportedOperationException: The Write transform is not supported by the Dataflow streaming runner. 
at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner$StreamingWrite.apply(DataflowPipelineRunner.java:488) 
at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner$StreamingWrite.apply(DataflowPipelineRunner.java:480) 
at com.google.cloud.dataflow.sdk.runners.PipelineRunner.apply(PipelineRunner.java:74) 
at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.apply(DataflowPipelineRunner.java:314) 
at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:358) 
at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:267) 
at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.apply(DataflowPipelineRunner.java:312) 
at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:358) 
at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:267) 
at com.google.cloud.dataflow.sdk.values.PCollection.apply(PCollection.java:159) 
at my.own.project.google.dataflow.EventStreamingDataflow.main(EventStreamingDataflow.java:104) 

答えて

5

。ストリーミングパイプラインからDatastoreに書き込むには、Datastore APIをDoFnから直接呼び出すことができます。

+0

おかげで、これは参考になりました。しかし、私はAppEngineアプリケーションではないDataflowアプリケーションからDatastore APIを呼び出す際に問題に直面しています。AppStore APIは、AppEngine上で動作するアプリケーションでのみ使用可能なAppEngine機能に多くの依存があるようです。それから、私が必要としているものを正確に提供するようなRemote APIが見つかりましたが、まだそれを使用するのが困難に直面しています。サービスアカウントで認証する必要はありますか?この[ページ](https://cloud.google.com/appengine/docs/java/tools/remoteapi)のコードサンプルに従っていますが、HttpResponseException、302が発生しています – lilline

+0

Datastoreインスタンスに書き込もうとしていますかデータフローパイプラインとは異なるプロジェクトに属していますか?もしそうなら、それを設定する方法はhttps://cloud.google.com/dataflow/security-and-permissions#cross-projectを見てください。 – danielm

+0

いいえ、データストアインスタンスはデータフローと同じプロジェクトの一部ですが、私はその302の問題を乗り越えました。 ParDoが親パイプラインとは別のスレッドまたはインスタンスでDoFn関数を実行していて、リモートAPIインストーラがシリアル化可能ではなく、インストーラが実行されていない場合、ParDoでRemote APIを使用することは可能ですか?それが作成されたスレッドでのみ使用できますか?私はこれが問題であるかどうかはわかりませんが、とにかく、インストーラを作成してどこにアクセスしようとしているかによって、例外が異なります。 – lilline

4

私の頭を壁にぶつけた後、ついにそれが働いてしまった。 danielm氏が提案したように、私はParaso DoFnからDatastore APIを呼び出しています。 1つの問題は、AppEngineの外部にあるCloud Datastoreを使用するためのAPIが別にあることを認識していないことでした。 (com.google.api.services.datastore ...とcom.google.appengine.api.datastore ...)。もう1つの問題は、クラウドデータストアAPIの最新バージョン(google-api-services-datastore-protobuf v1beta2-rev1-4.0.0、IllegalAccessErrorがあります)に何らかのバグがあるようでした。古いバージョン(v1beta2-rev1-2.1.2)。

だから、ここに私の作業コードです:

import com.google.cloud.dataflow.sdk.Pipeline; 
import com.google.cloud.dataflow.sdk.io.PubsubIO; 
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; 
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; 
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner; 
import com.google.cloud.dataflow.sdk.transforms.DoFn; 
import com.google.cloud.dataflow.sdk.transforms.ParDo; 
import com.google.cloud.dataflow.sdk.values.PCollection; 
import com.google.api.services.datastore.DatastoreV1.*; 
import com.google.api.services.datastore.client.Datastore; 
import com.google.api.services.datastore.client.DatastoreException; 
import com.google.api.services.datastore.client.DatastoreFactory; 
import static com.google.api.services.datastore.client.DatastoreHelper.*; 
import java.security.GeneralSecurityException; 
import java.io.IOException; 
import org.json.simple.JSONObject; 
import org.json.simple.parser.JSONParser; 
import org.json.simple.parser.ParseException; 

//-------------------- 

public static void main(String[] args) { 
    DataflowPipelineOptions options = PipelineOptionsFactory.create() 
      .as(DataflowPipelineOptions.class); 

    options.setRunner(DataflowPipelineRunner.class); 
    options.setProject(projectName); 
    options.setStagingLocation("gs://my-staging-bucket/staging"); 
    options.setStreaming(true); 

    Pipeline p = Pipeline.create(options); 
    PCollection<String> input = p.apply(PubsubIO.Read.topic("projects/"+projectName+"/topics/my-topic-name")); 

    input.apply(ParDo.of(new DoFn<String, String>() { 
     private static final long serialVersionUID = 1L; 
     public void processElement(ProcessContext c) throws ParseException, DatastoreException { 

      JSONObject json = (JSONObject)new JSONParser().parse(c.element()); 

      Datastore datastore = null; 
      try { 
       datastore = DatastoreFactory.get().create(getOptionsFromEnv() 
         .dataset(datasetid).build()); 
      } catch (GeneralSecurityException exception) { 
       System.err.println("Security error connecting to the datastore: " + exception.getMessage()); 
      } catch (IOException exception) { 
       System.err.println("I/O error connecting to the datastore: " + exception.getMessage()); 
      } 

      Key.Builder keyBuilder = makeKey("my-kind"); 
      keyBuilder.getPartitionIdBuilder().setNamespace("my-namespace"); 
      Entity.Builder event = Entity.newBuilder() 
        .setKey(keyBuilder); 

      event.addProperty(makeProperty("my-prop",makeValue((String)json.get("my-prop")))); 

      CommitRequest commitRequest = CommitRequest.newBuilder() 
        .setMode(CommitRequest.Mode.NON_TRANSACTIONAL) 
        .setMutation(Mutation.newBuilder().addInsertAutoId(event)) 
        .build(); 
      if(datastore!=null){ 
       datastore.commit(commitRequest); 
      } 

     } 
    })); 


    p.run(); 
} 

とのpom.xmlでの依存関係:

<dependency> 
    <groupId>com.google.cloud.dataflow</groupId> 
    <artifactId>google-cloud-dataflow-java-sdk-all</artifactId> 
    <version>[1.0.0,2.0.0)</version> 
</dependency> 
<dependency> 
    <groupId>com.google.apis</groupId> 
    <artifactId>google-api-services-datastore-protobuf</artifactId> 
    <version>v1beta2-rev1-2.1.2</version> 
</dependency> 
<dependency> 
    <groupId>com.google.http-client</groupId> 
    <artifactId>google-http-client</artifactId> 
    <version>1.17.0-rc</version> 
</dependency> 
<!-- Some more.. like JUnit etc.. --> 
関連する問題