1

アイテムがGoogle Datastoreに存在しない場合は、Google Datastoreにアイテムを挿入します。 データフローストリーミングジョブを作成します。google datastoreを使用してストリーミングモードのアイテムを確認する

仕事

static class RawToObjectConverter extends DoFn<String, Entity> { 
    @Override 
    public void processElement(ProcessContext c) {  

     Query<Entity> query = Query.entityQueryBuilder().kind("Post").filter(PropertyFilter.eq("postid", rq.postid)) 
       .build(); 
     QueryResults<Entity> posts = datastore.run(query); 

     if (posts == null || !posts.hasNext()) { 
      Entity post = Entity.builder(datastore.newKeyFactory().newKey("Post"))     
        .set("postid", "1") 
        .set("title", "p1") 
        .build(); 
      c.output(post); 
     }   
    } 
} 

lines.apply(ParDo.of(new RawToObjectConverter())) 
    .apply(DatastoreIO.v1().write().withProjectId(projectid)); 

問題タイプPCollection<Entity>のメソッドapply(PTransform<? super PCollection<Entity>,OutputT>)は、引数には適用されません(DatastoreV1.Write

また、私はを使用する必要があります10またはcom.google.datastore.v1.Entity

答えて

5
あなたは DatastoreIO.v1()。書き込み()を使用する前に、あなたの com.google.cloud.datastore.Entity RawToObjectConverter でcom.google.datastore.v1.Entity に変換する必要があります

toPb方法を使用して

それは私にエラー与え
c.output(post.toPb()); 
+1

:それは公共の方法ではありませんよう –

+1

@KassemShehadyに見える「PBへのメソッド()型BaseEntity からは見えません」。代替がある場合は、Datastoreチームに確認してください。当面はこの方法を自分で書くことができます。https://github.com/GoogleCloudPlatform/google-cloud-java/blob/master/google-cloud-datastore/src/main/java/com/google/cloud/datastore /BaseEntity.java#L680 –

関連する問題