1
アイテムがGoogle Datastoreに存在しない場合は、Google Datastoreにアイテムを挿入します。 データフローストリーミングジョブを作成します。google datastoreを使用してストリーミングモードのアイテムを確認する
仕事
static class RawToObjectConverter extends DoFn<String, Entity> {
@Override
public void processElement(ProcessContext c) {
Query<Entity> query = Query.entityQueryBuilder().kind("Post").filter(PropertyFilter.eq("postid", rq.postid))
.build();
QueryResults<Entity> posts = datastore.run(query);
if (posts == null || !posts.hasNext()) {
Entity post = Entity.builder(datastore.newKeyFactory().newKey("Post"))
.set("postid", "1")
.set("title", "p1")
.build();
c.output(post);
}
}
}
lines.apply(ParDo.of(new RawToObjectConverter()))
.apply(DatastoreIO.v1().write().withProjectId(projectid));
問題タイプPCollection<Entity>
のメソッドapply(PTransform<? super PCollection<Entity>,OutputT>)
は、引数には適用されません(DatastoreV1.Write
)
また、私はを使用する必要があります10またはcom.google.datastore.v1.Entity
?
:それは公共の方法ではありませんよう –
@KassemShehadyに見える「PBへのメソッド()型BaseEntityからは見えません」。代替がある場合は、Datastoreチームに確認してください。当面はこの方法を自分で書くことができます。https://github.com/GoogleCloudPlatform/google-cloud-java/blob/master/google-cloud-datastore/src/main/java/com/google/cloud/datastore /BaseEntity.java#L680 –