2016-12-10 1 views
2

ストリーミングデータフローパイプラインのDatastoreIOを使用していて、同じキーでエンティティを書き込むときにエラーが発生しました。トランザクションDatastoreIOの使用方法

2016-12-10T22:51:04.385Z: Error: (af00222cfd901860): Exception: com.google.datastore.v1.client.DatastoreException: A non-transactional commit may not contain multiple mutations affecting the same entity., code=INVALID_ARGUMENT 

私は、物事が働くキーで乱数を使用しますが、私はそうDataStoreIOを使用してこれを行うにはトランザクションの方法がある同じキーを更新する必要がある場合は?

static class CreateEntityFn extends DoFn<KV<String, Tile>, Entity> { 
    private static final long serialVersionUID = 0; 

    private final String namespace; 
    private final String kind; 

    CreateEntityFn(String namespace, String kind) { 
    this.namespace = namespace; 
    this.kind = kind; 
    } 

    public Entity makeEntity(String key, Tile tile) { 
    Entity.Builder entityBuilder = Entity.newBuilder(); 
    Key.Builder keyBuilder = makeKey(kind, key); 
    if (namespace != null) { 
     keyBuilder.getPartitionIdBuilder().setNamespaceId(namespace); 
    } 
    entityBuilder.setKey(keyBuilder.build()); 
    entityBuilder.getMutableProperties().put("tile", makeValue(tile.toString()).build()); 
    return entityBuilder.build(); 
    } 

    @Override 
    public void processElement(ProcessContext c) { 
    String key = c.element().getKey(); 
    // this works key = key.concat(":" + UUID.randomUUID().toString()); 
    c.output(makeEntity(key, c.element().getValue())); 
    } 
} 

... 

... 
inputData = pipeline 
       .apply(PubsubIO.Read.topic(pubsubTopic)); 
windowedDataStreaming = inputData 
       .apply(Window.<String>into(
         SlidingWindows.of(Duration.standardMinutes(15)) 
            .every(Duration.standardSeconds(31)))); 


          ... 
          ... 
          ... 
//Create a Datastore entity 
PCollection<Entity> siteTileEntities = tileSiteKeyed 
     .apply(ParDo.named("CreateSiteEntities").of(new CreateEntityFn(options.getNamespace(), options.getKind())));  

// write site tiles to datastore 
siteTileEntities 
     .apply(DatastoreIO.v1().write().withProjectId(options.getDataset())); 

// Run the pipeline 
pipeline.run(); 
+0

コードを共有していただきありがとうございます。あなたは 'inputData'をどのように構築しているかを共有できますか?私は特にDatastoreの 'Query'オブジェクトに興味があります。 –

+0

私はDatastore Queryオブジェクトを持っていません。私はpubsubからデータを読んでいます。 –

答えて

4

コードスニペットでは、tileSiteKeyedの作成方法については説明していません。おそらくそれはPCollection<KV<String, Tile>ですが、もしそれが重複するStringの鍵を持っていたら、それはその問題を説明します。

一般にPCollection<KV<K, V>>には、同じキーを持つ複数のKVペアが含まれている場合があります。ウィンドウごとに一意のキーを確保する場合は、GroupByKeyを使用します。これにより、ウィンドウごとに一意のキーを持つPCollection<KV<K, Iterable<V>>>が得られます。次にCreateEntityFnを増やしてIterable<Tile>とし、必要な変更を加えて1つの突然変異を作成します。

+0

ありがとう、私はそれが問題だと思います。私はあなたの提案を試みます。 –

2

このエラーは、クラウドデータストアが(すなわち、それは二度同じエンティティを挿入するか、二度同じエンティティを変更しようと)同じキーの2つの変異を有するCommit要求を受信したことを示しています。

Commit要求ごとに1つのキーにつき1つの突然変異を含むだけでエラーを回避できます。

+0

ストリーミングDataflowパイプラインで実行されていますが、一度に複数のワーカーが同じキーを書き込んでいる可能性があります。私の質問はこれを回避する方法を求めています。 –

+0

これは、データフローがボンネットの下でバッチ処理する方法に関連している可能性があります。コードサンプルを共有することができますか?具体的には、実行しているクエリとトランスフォームです。 –

+0

ありがとう、私は私の質問を更新しました。 –

関連する問題