Pub/Subからデータを読み取り、Datastoreに書き込むパイプラインを作成できますか?私のコードでは、入力としてPubsubIOを指定し、境界のあるPCollectionを取得するためにウィンドウを適用していますが、DatastoreIO.writeToをoptions.setStreamingとともに使用することはできないようです入力としてPubsubIO。これを回避する方法はありますか?あるいは単にpubsubから読み込んでデータストアに書き込むことはできませんか?PubsubIOからDatastoreIOへの書き込み
は、ここに私のコードです:
DataflowPipelineOptions options = PipelineOptionsFactory.create()
.as(DataflowPipelineOptions.class);
options.setRunner(DataflowPipelineRunner.class);
options.setProject(projectName);
options.setStagingLocation("gs://my-staging-bucket/staging");
options.setStreaming(true);
Pipeline p = Pipeline.create(options);
PCollection<String> input = p.apply(PubsubIO.Read.topic("projects/"+projectName+"/topics/event-streaming"));
PCollection<String> inputWindow = input.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(5))).triggering(AfterPane.elementCountAtLeast(1)).discardingFiredPanes().withAllowedLateness(Duration.standardHours(1)));
PCollection<String> inputDecode = inputWindow.apply(ParDo.of(new DoFn<String, String>() {
private static final long serialVersionUID = 1L;
public void processElement(ProcessContext c) {
String msg = c.element();
byte[] decoded = Base64.decodeBase64(msg.getBytes());
String outmsg = new String(decoded);
c.output(outmsg);
}
}));
PCollection<DatastoreV1.Entity> inputEntity = inputDecode.apply(ParDo.of(new CreateEntityFn("stream", "events")));
inputEntity.apply(DatastoreIO.writeTo(datasetid));
p.run();
そして、これは私が手例外です:DatastoreIOシンクが現在ストリーミングランナーではサポートされていません
Exception in thread "main" java.lang.UnsupportedOperationException: The Write transform is not supported by the Dataflow streaming runner.
at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner$StreamingWrite.apply(DataflowPipelineRunner.java:488)
at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner$StreamingWrite.apply(DataflowPipelineRunner.java:480)
at com.google.cloud.dataflow.sdk.runners.PipelineRunner.apply(PipelineRunner.java:74)
at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.apply(DataflowPipelineRunner.java:314)
at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:358)
at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:267)
at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.apply(DataflowPipelineRunner.java:312)
at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:358)
at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:267)
at com.google.cloud.dataflow.sdk.values.PCollection.apply(PCollection.java:159)
at my.own.project.google.dataflow.EventStreamingDataflow.main(EventStreamingDataflow.java:104)
おかげで、これは参考になりました。しかし、私はAppEngineアプリケーションではないDataflowアプリケーションからDatastore APIを呼び出す際に問題に直面しています。AppStore APIは、AppEngine上で動作するアプリケーションでのみ使用可能なAppEngine機能に多くの依存があるようです。それから、私が必要としているものを正確に提供するようなRemote APIが見つかりましたが、まだそれを使用するのが困難に直面しています。サービスアカウントで認証する必要はありますか?この[ページ](https://cloud.google.com/appengine/docs/java/tools/remoteapi)のコードサンプルに従っていますが、HttpResponseException、302が発生しています – lilline
Datastoreインスタンスに書き込もうとしていますかデータフローパイプラインとは異なるプロジェクトに属していますか?もしそうなら、それを設定する方法はhttps://cloud.google.com/dataflow/security-and-permissions#cross-projectを見てください。 – danielm
いいえ、データストアインスタンスはデータフローと同じプロジェクトの一部ですが、私はその302の問題を乗り越えました。 ParDoが親パイプラインとは別のスレッドまたはインスタンスでDoFn関数を実行していて、リモートAPIインストーラがシリアル化可能ではなく、インストーラが実行されていない場合、ParDoでRemote APIを使用することは可能ですか?それが作成されたスレッドでのみ使用できますか?私はこれが問題であるかどうかはわかりませんが、とにかく、インストーラを作成してどこにアクセスしようとしているかによって、例外が異なります。 – lilline