2

DataflowからGCP PubSubを読んで問題が発生しました。多数のメッセージを短時間で公開すると、一部のメッセージが失われることを除いて、Dataflowは送信されたメッセージのほとんどを受信しますいくつかの他のメッセージは複製されます。そして、最も奇妙な部分は、失われたメッセージの数が複製されるメッセージの数とまったく同じになるということです。メッセージが失われ、GCP Pubsubで重複しています

例では、5秒間に4,000のメッセージを送信し、合計4000個のメッセージを受信しましたが、9個のメッセージが失われ、正確に9個のメッセージが複製されました。

私が重複を判断する方法は、ロギングによるものです。私は、pubsubによって生成されたメッセージIDとともに、Pubsubに公開されているすべてのメッセージを記録しています。私はまたPardoの変換でPubsubIOから読んだ直後にメッセージを記録しています。

私はデータフローのPubSubから読み取る方法はorg.apache.beam.sdk.ioPubsubIOを使用している:

public interface Options extends GcpOptions, DataflowPipelineOptions { 

    // PUBSUB URL 
    @Description("Pubsub URL") 
    @Default.String("https://pubsub.googleapis.com") 
    String getPubsubRootUrl(); 
    void setPubsubRootUrl(String value); 

    // TOPIC 
    @Description("Topic") 
    @Default.String("projects/test-project/topics/test_topic") 
    String getTopic(); 
    void setTopic(String value); 

... 
} 

public static void main(String[] args) { 
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); 


    options.setStreaming(true); 
    options.setRunner(DataflowRunner.class); 

    ... 

    Pipeline pipeline = Pipeline.create(options); 
    pipeline.apply(PubsubIO 
       .<String>read() 
       .topic(options.getTopic()) 
       .withCoder(StringUtf8Coder.of()) 
      ) 

      .apply("Logging data coming out of Pubsub", ParDo 
       .of(some_logging_transformation) 
      ) 

      .apply("Saving data into db", ParDo 
       .of(some_output_transformation) 
      ) 
      ; 


    pipeline.run().waitUntilFinish(); 


} 

これはPubSubのかPubsubIOの既知の問題ですかしら?

UPDATE:私はいくつかのより多くの実験を経て、複製メッセージが服用していることがわかっ

: は、のpubsubエミュレータと全く欠落したデータと重複なし

UPDATE#2を4000要求を試していません見つからないものからmessage_id問題の方向性はかなり元気になっているので、詳細なログと、メッセージの発行や受信に使用したコードを別の質問で投稿することにしました。 新しい質問へのリンク:Google Cloud Pubsub Data lost

+0

パイプラインのジョブIDを共有できますか? –

+0

ありがとうございます。 pubsbuIOを使ってpubsubを読み込み、オブジェクトを解析してNeo4jデータベースに保存する単純なパイプラインを作った。 3000リクエストを送信すると、13が失われ、13が重複していました。ジョブIDは2017-05-24_08_46_18-1297374047328352490 –

+0

です。重複メッセージと失われたメッセージはどのように判断していますか? –

答えて

1

私はPubSubチームからGoogleの男と話をしました。これは、Pythonクライアントのスレッドセーフの問題が原因であるようです。 Googleからの回答はGoogle Cloud Pubsub Data lostの回答を参照してください

+0

この問題は、Pythonクライアントのバージョン0.29で解決されるようです –

関連する問題