1

TLDR既存のエンティティを上書きせずにデータストアへの書き込み:データフローGoogleのデータフロー:

を経由して、既存のデータを上書きせずにデータストアエンティティを更新する方法を探して、私はGoogleのデータストアのエンティティを更新するために、データフロー2.0.0(ビーム)を使用しています。私のデータフローは、データストアからエンティティをロードし、更新し、データストアに保存し直します(既存のエンティティを上書きします)。

しかし、更新処理中に、既に存在する場合と存在しない場合がある追加のエンティティも検出します。既存のエンティティを上書きしないようにするために、以前はすべてのエンティティをDatastoreから読み込み、それらを(キーごとに)減らし、新しい重複を削除しました。

エンティティの数が増えるにつれて、すべてのエンティティをDataflowにロードするのではなく(古いタイムスタンプに基づいてバッチで取り込むのではなく)、エンティティを上書きするという問題を抱えています。現在のバッチにはありません。


私は(2つのスポット、既存のエンティティの一つであり、新しいエンティティの1に)を使用して、データフローにエンティティを書いている:

collection.apply(DatastoreIO.v1().write().withProjectId("...")) 

のようなものがあった場合、それは本当にいいだろうDatastoreIO.v1().writeNew()メソッドですが、残念ながらそれは存在しません。何か助けてくれてありがとう。

+0

で私のコードリポジトリに参照することができます。明確にすることはできますか?また、エンティティをバッチでどのように読み込んでいますか? –

+1

はい、データフローを介して公開する必要があります。このサービスはInsert、Upsert、およびUpdateをサポートしていますが、現在のところDataflowのUpsertだけが公開されています。 –

+0

非冪等書き込みのセマンティクスは、すべてのレコードが正常に書き込まれるまで、Dataflowが再試行を続けることを考慮して、再試行がどのように処理されるかを考慮する必要があります。いくつかの書き込みが成功したが、データフロージョブが失敗し、将来のジョブの再試行が成功しないシナリオになる可能性があります。 –

答えて

1

データストアに存在しない新しいエンティティを書きたい場合は、新しいキーで作成して書き込むだけです。

List<String> keyNames = Arrays.asList("L1", "L2"); // Somewhat you have new keys to store 
PTransform<PCollection<Entity>, ?> write = 
     DatastoreIO.v1().write().withProjectId(project_id); // This is a typical write operation 

p. 
    apply("GetInMemory", Create.of(keyNames)).setCoder(StringUtf8Coder.of()). // L1 and L2 are loaded 
    apply("Proc1", ParDo.of(new DoFn<String, Entity>(){ 
     @ProcessElement 
     public void processElement(ProcessContext c) { 
      Key.Builder key = makeKey("k2", c.element()); // Generate an entity key 
      final Entity entity = Entity.newBuilder(). 
        setKey(key). // Set the key 
        putProperties("p1", makeValue(new String("test constant value") 
         ).setExcludeFromIndexes(true).build()). 
        build(); 
      c.output(entity); 
     } 
    })). 
    apply(write); // Write them 
p.run(); 

全体のコードは、1つにあなたがエンティティを上書きしても実体を上書きしないようにしたいと言うとあなたがやろうとしているものを私に明確ではないhttps://github.com/yiu31802/gcp-project/commit/cc224b34

関連する問題