DataflowからGCP PubSubを読んで問題が発生しました。多数のメッセージを短時間で公開すると、一部のメッセージが失われることを除いて、Dataflowは送信されたメッセージのほとんどを受信しますいくつかの他のメッセージは複製されます。そして、最も奇妙な部分は、失われたメッセージの数が複製されるメッセージの数とまったく同じになるということです。メッセージが失われ、GCP Pubsubで重複しています
例では、5秒間に4,000のメッセージを送信し、合計4000個のメッセージを受信しましたが、9個のメッセージが失われ、正確に9個のメッセージが複製されました。
私が重複を判断する方法は、ロギングによるものです。私は、pubsubによって生成されたメッセージIDとともに、Pubsubに公開されているすべてのメッセージを記録しています。私はまたPardoの変換でPubsubIOから読んだ直後にメッセージを記録しています。
私はデータフローのPubSubから読み取る方法はorg.apache.beam.sdk.ioPubsubIO
を使用している:
public interface Options extends GcpOptions, DataflowPipelineOptions {
// PUBSUB URL
@Description("Pubsub URL")
@Default.String("https://pubsub.googleapis.com")
String getPubsubRootUrl();
void setPubsubRootUrl(String value);
// TOPIC
@Description("Topic")
@Default.String("projects/test-project/topics/test_topic")
String getTopic();
void setTopic(String value);
...
}
public static void main(String[] args) {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
options.setStreaming(true);
options.setRunner(DataflowRunner.class);
...
Pipeline pipeline = Pipeline.create(options);
pipeline.apply(PubsubIO
.<String>read()
.topic(options.getTopic())
.withCoder(StringUtf8Coder.of())
)
.apply("Logging data coming out of Pubsub", ParDo
.of(some_logging_transformation)
)
.apply("Saving data into db", ParDo
.of(some_output_transformation)
)
;
pipeline.run().waitUntilFinish();
}
これはPubSubのかPubsubIOの既知の問題ですかしら?
UPDATE:私はいくつかのより多くの実験を経て、複製メッセージが服用していることがわかっ
: は、のpubsubエミュレータと全く欠落したデータと重複なし
UPDATE#2を4000要求を試していません見つからないものからmessage_id
問題の方向性はかなり元気になっているので、詳細なログと、メッセージの発行や受信に使用したコードを別の質問で投稿することにしました。 新しい質問へのリンク:Google Cloud Pubsub Data lost
パイプラインのジョブIDを共有できますか? –
ありがとうございます。 pubsbuIOを使ってpubsubを読み込み、オブジェクトを解析してNeo4jデータベースに保存する単純なパイプラインを作った。 3000リクエストを送信すると、13が失われ、13が重複していました。ジョブIDは2017-05-24_08_46_18-1297374047328352490 –
です。重複メッセージと失われたメッセージはどのように判断していますか? –