データフローを使用して何百万ものDatastoreエンティティを削除しようとしていますが、ペースは非常に遅いです(5エンティティ/秒)。私はそれを合理的なペースまで拡大するために私が従わなければならないパターンを私に説明することができることを願っています。より多くの労働者を追加するだけでは役に立ちませんでした。DatastoreIOとDataflowを使用してエンティティ上で何百万ものバッチを削除するには
データストア管理コンソールでは、特定の種類のすべてのエンティティを削除できますが、多くの場合失敗し、1週間以上で4,000万のエンティティを削除できます。データフローは、特定のクエリパラメータにもマッチする何百万ものエンティティを削除するのに役立つはずです。
いくつかのタイプのバッチ処理戦略を採用する必要があると思います(たとえば、1000個の削除を含む突然変異を作成します)が、私はそれについてどうやって明らかになっているのでしょうか。 DatastoreIOは、一度に1つのエンティティのみを扱うことができます。ポインタは非常に高く評価されるだろう。
以下は私の現在の低速ソリューションです。
Pipeline p = Pipeline.create(options);
DatastoreIO.Source source = DatastoreIO.source()
.withDataset(options.getDataset())
.withQuery(getInstrumentQuery(options))
.withNamespace(options.getNamespace());
p.apply("ReadLeafDataFromDatastore", Read.from(source))
.apply("DeleteRecords", ParDo.of(new DeleteInstrument(options.getDataset())));
p.run();
static class DeleteInstrument extends DoFn<Entity, Integer> {
String dataset;
DeleteInstrument(String dataset) {
this.dataset = dataset;
}
@Override
public void processElement(ProcessContext c) {
DatastoreV1.Mutation.Builder mutation = DatastoreV1.Mutation.newBuilder();
mutation.addDelete(c.element().getKey());
final DatastoreV1.CommitRequest.Builder request = DatastoreV1.CommitRequest.newBuilder();
request.setMutation(mutation);
request.setMode(DatastoreV1.CommitRequest.Mode.NON_TRANSACTIONAL);
try {
DatastoreOptions.Builder dbo = new DatastoreOptions.Builder();
dbo.dataset(dataset);
dbo.credential(getCredential());
Datastore db = DatastoreFactory.get().create(dbo.build());
db.commit(request.build());
c.output(1);
count++;
if(count%100 == 0) {
LOG.info(count+"");
}
} catch (Exception e) {
c.output(0);
e.printStackTrace();
}
}
}
はありがとう、私はそれを試してみると、それがどのように動作するかを確認できます。もっと多くのワーカーを追加するだけで、削除速度を上げることができますか(--numWorkers = 20)? Dataflowがプロビジョニングされた作業者をどのように割り当てるかは、私には不明です。バックログが削除の場合は、より速くキーを読むのに役立ちません。 – craftycoder
ランナーを増やすと実際に削除が遅くなることが確認できます。 「多すぎる競合」エラーはエンティティを1秒あたりに約75%切断することで終了します。 – craftycoder