2016-08-10 11 views
2

データフローを使用して何百万ものDatastoreエンティティを削除しようとしていますが、ペースは非常に遅いです(5エンティティ/秒)。私はそれを合理的なペースまで拡大するために私が従わなければならないパターンを私に説明することができることを願っています。より多くの労働者を追加するだけでは役に立ちませんでした。DatastoreIOとDataflowを使用してエンティティ上で何百万ものバッチを削除するには

データストア管理コンソールでは、特定の種類のすべてのエンティティを削除できますが、多くの場合失敗し、1週間以上で4,000万のエンティティを削除できます。データフローは、特定のクエリパラメータにもマッチする何百万ものエンティティを削除するのに役立つはずです。

いくつかのタイプのバッチ処理戦略を採用する必要があると思います(たとえば、1000個の削除を含む突然変異を作成します)が、私はそれについてどうやって明らかになっているのでしょうか。 DatastoreIOは、一度に1つのエンティティのみを扱うことができます。ポインタは非常に高く評価されるだろう。

以下は私の現在の低速ソリューションです。

Pipeline p = Pipeline.create(options); 
DatastoreIO.Source source = DatastoreIO.source() 
    .withDataset(options.getDataset()) 
    .withQuery(getInstrumentQuery(options)) 
    .withNamespace(options.getNamespace()); 
p.apply("ReadLeafDataFromDatastore", Read.from(source)) 
.apply("DeleteRecords", ParDo.of(new DeleteInstrument(options.getDataset()))); 
p.run(); 

static class DeleteInstrument extends DoFn<Entity, Integer> { 
String dataset; 
    DeleteInstrument(String dataset) { 
    this.dataset = dataset; 
    } 
    @Override 
    public void processElement(ProcessContext c) { 
    DatastoreV1.Mutation.Builder mutation = DatastoreV1.Mutation.newBuilder(); 
    mutation.addDelete(c.element().getKey()); 
    final DatastoreV1.CommitRequest.Builder request = DatastoreV1.CommitRequest.newBuilder(); 
    request.setMutation(mutation); 
    request.setMode(DatastoreV1.CommitRequest.Mode.NON_TRANSACTIONAL); 
    try { 
     DatastoreOptions.Builder dbo = new DatastoreOptions.Builder(); 
     dbo.dataset(dataset); 
     dbo.credential(getCredential()); 
     Datastore db = DatastoreFactory.get().create(dbo.build()); 
     db.commit(request.build()); 
     c.output(1); 
     count++; 
     if(count%100 == 0) { 
     LOG.info(count+""); 
     } 
    } catch (Exception e) { 
     c.output(0); 
     e.printStackTrace(); 
    } 
    } 
} 

答えて

4

現在のバージョンのDatastoreIOを使用してエンティティを直接削除する方法はありません。このバージョンのDatastoreIOは、次のDataflowリリースで新しいバージョン(v1beta3)を使用して廃止される予定です。我々は、削除ユーティリティ(例またはPTransformを通して)を提供するための良いユースケースがあると考えていますが、まだ進行中です。今の

あなたの代わりに一度に一つを削除するバッチあなたの削除、することができます

public static class DeleteEntityFn extends DoFn<Entity, Void> { 
    // Datastore max batch limit 
    private static final int DATASTORE_BATCH_UPDATE_LIMIT = 500; 
    private Datastore db; 
    private List<Key> keyList = new ArrayList<>(); 

    @Override 
    public void startBundle(Context c) throws Exception { 
     // Initialize Datastore Client 
     // db = ... 
    } 

    @Override 
    public void processElement(ProcessContext c) throws Exception { 
     keyList.add(c.element().getKey()); 
     if (keyList.size() >= DATASTORE_BATCH_UPDATE_LIMIT) { 
     flush(); 
     } 
    } 

    @Override 
    public void finishBundle(Context c) throws Exception { 
     if (keyList.size() > 0) { 
     flush(); 
     } 
    } 

    private void flush() throws Exception { 
     // Make one delete request instead of one for each element. 
     CommitRequest request = 
      CommitRequest.newBuilder() 
       .setMode(CommitRequest.Mode.NON_TRANSACTIONAL) 
       .setMutation(Mutation.newBuilder().addAllDelete(keyList).build()) 
       .build(); 
     db.commit(request); 
     keyList.clear(); 
    } 
    } 
+0

はありがとう、私はそれを試してみると、それがどのように動作するかを確認できます。もっと多くのワーカーを追加するだけで、削除速度を上げることができますか(--numWorkers = 20)? Dataflowがプロビジョニングされた作業者をどのように割り当てるかは、私には不明です。バックログが削除の場合は、より速くキーを読むのに役立ちません。 – craftycoder

+0

ランナーを増やすと実際に削除が遅くなることが確認できます。 「多すぎる競合」エラーはエンティティを1秒あたりに約75%切断することで終了します。 – craftycoder

関連する問題