2017-06-09 4 views
0

Google Data Flowを使用していますが、ステップの1つでは、既に作成済みのサブスクリプションを使用しているパブサブスクライブのトピックにサブスクライブしています。 エラー処理パイプライン:次のエラーで結果を実行する際ここ は、コードスニペットGoogleデータフローの既存のpubサブスクリプションを使用

CustomPipelineOptions options = 
      PipelineOptionsFactory.fromArgs(args).withValidation().as(customPipelineOptions.class); 
    Pipeline p = Pipeline.create(options); 

    PCollection<TableRow> datastream = p.apply(PubsubIO.Read.named("Read device data from PubSub")     .subscription("projects/<projectID>/subscriptions/<subscriptionname>) 
      .topic(String.format("projects/%s/topics/%s", options.getSourceProject(), options.getSourceTopic())) 
      .timestampLabel("ts") 
      .withCoder(TableRowJsonCoder.of())); 

上記のコードです。原因:(b5e276ef8c76419f):ステップs1の認識できない入力pubsub_subscription。

正しいサブスクリプション名とプロジェクトIDを渡していますか。 なぜ上記のエラーが発生するのか不明です。

ご協力ください。

答えて

2

2つのソースのいずれかを指定すると、トピックまたはサブスクリプションで十分です。

私はあなたがしようと提案する:

また
PCollection<TableRow> datastream = p 
     .apply(PubsubIO.Read.named("Read device data from PubSub") 
     .topic(String.format("projects/%s/topics/%s", options.getSourceProject(), options.getSourceTopic())) 
     .timestampLabel("ts") 
     .withCoder(TableRowJsonCoder.of())); 

:私はあなたがデータフロー1.9 SDKを使用していると仮定? new Beam 2.0.0 releaseに移動することを考えるとよいでしょう。そのSDK hereでPubSubのリファレンスを見つけることができます。

+0

こんにちはマティアス、応答をありがとう。私はそれを試みて、あなたに戻ってください。 –