2017-11-13 11 views
0

Apache Beam JavaSDKを使用してPubSubに書き込むことができません。ApacheビームPubSubIO write

私はビームをPubSubから読み込み、処理してからPubSubトピックに書き込むことを試みていますが、PubSubへの書き込み方法の実例を見つけることができません。

誰かがPubSubトピックに書き込むための適切な変換を手助けできますか?

.apply("Create pubsub messages", ParDo.of(new DoFn<String, PubsubMessage>() { 
    @DoFn.ProcessElement 
    public void processElement(ProcessContext c) throws Exception { 
     PubsubMessage pubsubMessage = new PubsubMessage(c.element()); 
     c.output(pubsubMessage); 
    } 
    })) 
.apply("Write messages to topic",PubsubIO.writeMessages().to("projects/project_id/topics/topic_name")) 

私は現在、ここで

[ERROR] COMPILATION ERROR : 
[INFO] ------------------------------------------------------------- 
[ERROR] /home/username/src/main/java/com/domain/JavaClass.java:[336,1] no suitable method found for apply(java.lang.String,org.apache.beam.sdk.transforms.ParDo.SingleOutput<java.lang.String,org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage>) 
    method org.apache.beam.sdk.values.PCollection.<OutputT>apply(org.apache.beam.sdk.transforms.PTransform<? super org.apache.beam.sdk.values.PCollection<org.apache.beam.sdk.values.KV<java.lang.String,java.lang.Integer>>,OutputT>) is not applicable 
     (cannot infer type-variable(s) OutputT 
     (actual and formal argument lists differ in length)) 
    method org.apache.beam.sdk.values.PCollection.<OutputT>apply(java.lang.String,org.apache.beam.sdk.transforms.PTransform<? super org.apache.beam.sdk.values.PCollection<org.apache.beam.sdk.values.KV<java.lang.String,java.lang.Integer>>,OutputT>) is not applicable 
     (inference variable InputT has incompatible bounds 
     equality constraints: java.lang.String 
     lower bounds: org.apache.beam.sdk.values.KV<java.lang.String,java.lang.Integer>) 
[ERROR] /home/username//src/main/java/com/domain/JavaClass.java:[339,39] constructor PubsubMessage in class org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage cannot be applied to given types; 
    required: byte[],java.util.Map<java.lang.String,java.lang.String> 
    found: java.lang.String 
    reason: actual and formal argument lists differ in length 
+0

ではない何かを説明するためにあなたの質問を編集してくださいあなたが含まれているコードスニペットであなたのために働いています。 – jkff

+0

このシンペットの前にパイプラインの部分を投稿できますか。つまり、「pubsubメッセージの作成」変換にどのPCollectionを使用していますか。スニペット自体は大丈夫だと私は非常によく似た何かを使用し、それは正常に動作します。 – Arqu

答えて

0

がGCSからの読み出しの一例であり、パブ/サブに書き、コンパイルエラーを取得しています:

package ... 

import java.io.IOException; 

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; 
import org.apache.beam.sdk.Pipeline; 
import org.apache.beam.sdk.PipelineResult; 
import org.apache.beam.sdk.io.TextIO; 
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; 
import org.apache.beam.sdk.options.Default; 
import org.apache.beam.sdk.options.Description; 
import org.apache.beam.sdk.options.PipelineOptionsFactory; 
import org.apache.beam.sdk.values.PCollection; 

public class Gcs2PubSub { 

    public interface Pubsub2DatastoreOptions extends DataflowPipelineOptions { 
     @Description("GCP project name") 
     @Default.String("gcp_project_name") 
     String getProjectId(); 
     void setProjectId(String value); 
    } 

    public static void main(String[] args) throws IOException { 

     Pubsub2DatastoreOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() 
       .as(Pubsub2DatastoreOptions.class); 

     Pipeline p = Pipeline.create(options); 

     PCollection<String> line = p.apply("Read GCS",TextIO.read().from("gs://<bucket>/*")); 
     line.apply("Sending Pub/sub",PubsubIO.writeStrings().to("<topic>")); 

     PipelineResult result = p.run(); 
     try { 
      result.waitUntilFinish(); 
     } catch (Exception exc) { 
      result.cancel(); 
     } 
    } 
} 
関連する問題