これはセットアップであり、1つのトピックがあり、このトピックには完全に同じ設定の2つのサブスクリプションv1とv2があり、どちらもプルサブスクリプションで10秒のack期限です。
サブスクリプションv1とv2の両方が専用のデータフローに分かれており、v2のデータフローはより最適化されていますが、ほぼ同じことをしています。gcp PubSubが「処理遅延が高かった」
問題が発生するたびに、警告メッセージが表示され、バックログの開始がv2サブスクリプションのみで行われ、v1がバックログがほとんどないことが示されます。
08:53:56.000 ${MESSAGE_ID} Pubsub processing delay was high at 72 sec.
上記のメッセージを除いて、データフローログには何も明白ではありません。実際には、v2のデータフローCPU使用率はv1よりも低いので、私はこのことを理解していません。
質問:
- 処理遅延とどのように私はそれを修正することができますの原因は何?
- v1サブスクリプションで同じ警告が表示されないのはなぜですか?我々は右のPubSubの読み取り後に行うパルドフィルタリング操作が予想外に高い待ち時間が当たっているようだ@ben経由で示唆したように
は、2017年1月17日
で更新しました。しかし、getClassroomIds
が単純なJavaのリストであると考えると、私はこの問題にどのように取り組むことができるのか分かりません。 1つの質問は、私たちがpubsubに怠け者に適用したコーダーですか? ProcessContext#element()
が呼び出されたときに適用されたコーダーで定義した解凍とデシリアライズはありますか?
def processElement(c: DoFn[Entity, Entity]#ProcessContext) = {
val startTime = System.currentTimeMillis()
val entity = c.element()
if (!entity.getClassroomIds.isEmpty) {
c.output(entity)
}
val latencyMs = System.currentTimeMillis() - startTime
if (latencyMs > 1000) {
// We see this warning messages during the load spike
log.warn(s"latency breached 1 second threshold: $latencyMs ms")
}
}
ジョブIDは、これらの実行で何が起きているかを見るのに役立ちます。通常、そのメッセージは、PubSubソースのすぐ後のParDoステップが遅いことを意味します。また、「シャッフラー」ログを見ると、フロー制御がキューイング遅延を導入しているかどうかを示すことができます。 –
@BenChambers ParDoの直後は非常に単純なフィルタリングロジックでなければならないので、そうであるかどうかわかりません。しかし、ストリームはエンコードされたzipであるため、デシリアライズする必要があります。しかし、この部分はv1とv2の間で同じでなければなりません。また、データフローが「シャッフラー」ログで記録されていることを意味している場合は、それらを見ただけでなく、GCログや「プロセス遅延」ログ以外のものを除いて、イベントのタイムラインの周りには何も表示されません。 – codingtwinky
v1ストリームはデータフローv1.6.1であり、v2はデータフローv1.9.0ですが、v1.6.1のパフォーマンスを向上させる可能性のある明白なものはありません – codingtwinky