2017-01-27 12 views
3

スタックドライブチャートによれば、特定のトピック/サブスクリプションに対する「未確認メッセージ」の数が時々増えることに気付き始めました。Google PubsubでACKedメッセージが残っています

症状

私たちはのStackdriverチャートを信頼することができますどのくらいかわからないが、私はすでにチェックしました:

  • プル動作回数は、パブリッシュ操作回数
  • ACK操作限り多くあります問題が発生したときのプル操作数よりもカウントが小さい

また、pubsubが実際に同じメッセージを複数回送信していることがわかりました。私たちのログによれば、それはまた、「プル」が成功したことを確認しますが、「ack」はおそらく失敗しています。

私たちのシステムはすぐに引き出されると思いますが、GCPの観点からはうまくいかないと思います。

ACKを時間通りに送信しない可能性をチェックしましたが、以下のフローを示すように、私はそうではないと思います。

問題のあるサブスクリプションでは、メッセージが数時間蓄積されています。私たちにとって、これは深刻な問題です。

実装の詳細

は、我々はいくつかの理由でプル方式を使用して、我々は良い根拠がない限り、プッシュ方式に切り替えるには消極的です。各サブスクリプションごとに、1つのメッセージ・ポンピング・ゴルーチンがあり、このゴルーチンは、取り出されたメッセージごとにワーカーを生成します。より具体的には、

// in a dedicated message-pumping goroutine 
sub, _ := CreateSubscription(..., 0 /* ack-deadline */,) 
iter, _ := sub.Pull(...) 
for { 
    // omitted: wait if we have too many workers 
    msg, _ := iter.Next() 
    go func(msg Message) { 
    // omitted: handle the message and measure the latency; it turned out the latency is almost within 1 second 
    msg.Done(true) 
    }(msg) 
} 

ロードバランシングの場合、サブスクリプションは同じクラスタ内の他のポッドからも引き出されます。したがって、1つのサブスクリプション(Google Pubsubのトピック/サブスクリプションなど)では、(Goバインディングのサブスクリプション構造のように)複数のサブスクリプションオブジェクトがあり、それぞれが1つのポッドでのみ使用されます。そして、各サブスクリプションオブジェクトは1つのイテレータを作成します。私はこの設定が間違っているとは思っていませんが、私が間違っている場合は私を修正してください。

このコードが示すように、私たちはACKを行います。 (私たちのサーバがパニックにしない;。msg.Done()の周りに取得するためのパスがないので)

試み

奇妙なことは、問題のサブスクリプションがビジー状態のものではないです。私たちは通常、同じポッドにはるかに多くのメッセージを受け取る別のサブスクリプションには問題ありません。そこで、プル操作のmax-prefetchオプションが影響を受けるかどうか疑問に思っていました。しばらく問題が修正されたようですが、問題が再発しました。

私はまた、Googleサポートのアドバイスに従って、ポッドの数を増やし、効果的に労働者の数を増やしました。これはあまり役に立たなかった。私たちは問題のあるメッセージ(約1メッセージ/秒)に多くのメッセージを公開していないため、(おそらくあまりにも多くの)労働者がたくさんいるので、私たちのサーバーに負荷がかかっているとは思わない。

誰かがこれにいくつかの光を当てることができますか?

+1

私は先週起きたNode.jsライブラリを使ってこの同じ問題に直面しています。 ackは機能しませんし、私は再配信されるメッセージを待つ必要があります – andresk

答えて

2

私のケースでは、何らかの理由でAckが返さない症状が定期的に発生し、gRPCの呼び出しに対してタイムアウトが設定されず、「acker」のgroutineがブロックされていました。

screen shot

だから私はpubsub.NewClientからgRPCオプションを渡すことによってそれを解決しました。

import (
    "cloud.google.com/go/pubsub" 
    "google.golang.org/api/option" 
    "google.golang.org/grpc" 
) 

// ... 

scChan := make(chan grpc.ServiceConfig) 
go func() { 
    sc := grpc.ServiceConfig{ 
     Methods: map[string]grpc.MethodConfig{ 
      "/google.pubsub.v1.Subscriber/Acknowledge": { 
       Timeout: 5 * time.Second, 
      }, 
     }, 
    } 
    scChan <- sc 
}() 

c, err := pubsub.NewClient(ctx, project, option.WithGRPCDialOption(grpc.WithServiceConfig(scChan))) 

grpc.EnableTracing = trueを指定して原因を調査できます。

grpc.EnableTracing = true 

c, err := pubsub.NewClient(ctx, project) 
if err != nil { 
    return nil, errors.Wrap(err, "pubsub.NewClient") 
} 

go func(){ 
    http.ListenAndServe(":8080", nil) 
}() 

golang.org/x/net/traceでgRPCのトレース情報を確認できます。

+1

ありがとうございます。これは私が今までに得た最も有益な答えです。私はそれを試みます。 – linjus

関連する問題