0

これはセットアップであり、1つのトピックがあり、このトピックには完全に同じ設定の2つのサブスクリプションv1とv2があり、どちらもプルサブスクリプションで10秒のack期限です。
サブスクリプションv1とv2の両方が専用のデータフローに分かれており、v2のデータフローはより最適化されていますが、ほぼ同じことをしています。gcp PubSubが「処理遅延が高かった」

問題が発生するたびに、警告メッセージが表示され、バックログの開始がv2サブスクリプションのみで行われ、v1がバックログがほとんどないことが示されます。

08:53:56.000 ${MESSAGE_ID} Pubsub processing delay was high at 72 sec. 

上記のメッセージを除いて、データフローログには何も明白ではありません。実際には、v2のデータフローCPU使用率はv1よりも低いので、私はこのことを理解していません。

質問:

  1. 処理遅延とどのように私はそれを修正することができますの原因は何?
  2. v1サブスクリプションで同じ警告が表示されないのはなぜですか?我々は右のPubSubの読み取り後に行うパルドフィルタリング操作が予想外に高い待ち時間が当たっているようだ@ben経由で示唆したように

は、2017年1月17日

で更新しました。しかし、getClassroomIdsが単純なJavaのリストであると考えると、私はこの問題にどのように取り組むことができるのか分かりません。 1つの質問は、私たちがpubsubに怠け者に適用したコーダーですか? ProcessContext#element()が呼び出されたときに適用されたコーダーで定義した解凍とデシリアライズはありますか?

def processElement(c: DoFn[Entity, Entity]#ProcessContext) = { 
    val startTime = System.currentTimeMillis() 
    val entity = c.element() 
    if (!entity.getClassroomIds.isEmpty) { 
    c.output(entity) 
    } 

    val latencyMs = System.currentTimeMillis() - startTime 
    if (latencyMs > 1000) { 
    // We see this warning messages during the load spike 
    log.warn(s"latency breached 1 second threshold: $latencyMs ms") 
    } 
} 
+0

ジョブIDは、これらの実行で何が起きているかを見るのに役立ちます。通常、そのメッセージは、PubSubソースのすぐ後のParDoステップが遅いことを意味します。また、「シャッフラー」ログを見ると、フロー制御がキューイング遅延を導入しているかどうかを示すことができます。 –

+0

@BenChambers ParDoの直後は非常に単純なフィルタリングロジックでなければならないので、そうであるかどうかわかりません。しかし、ストリームはエンコードされたzipであるため、デシリアライズする必要があります。しかし、この部分はv1とv2の間で同じでなければなりません。また、データフローが「シャッフラー」ログで記録されていることを意味している場合は、それらを見ただけでなく、GCログや「プロセス遅延」ログ以外のものを除いて、イベントのタイムラインの周りには何も表示されません。 – codingtwinky

+0

v1ストリームはデータフローv1.6.1であり、v2はデータフローv1.9.0ですが、v1.6.1のパフォーマンスを向上させる可能性のある明白なものはありません – codingtwinky

答えて

1

あなたが言及するタイミングは、そのステップで費やされた時間を正確に考慮していません。特に、fusion optimizationのために、フィルタリング操作の後にParDoのすべてを反映します。

あなたは次のようになりますパイプラインがある場合:

ReadFromPubSub - >パルド(フィルター) - >パルド(高価な) - >パルド(書き込み)

両方ExpensiveWrite出てくる各要素に対して実行をc.outputへの呼び出しが返される前にFilterのこれは、PubSubから出てくる要素に融合されるため、さらに問題になります。それは任意の通常のトリガーが起動しますよりも速くをトリガーするため、代わりに通常のGroupByKeyReshuffleを使用すると、素敵な性質を持っていることを

pipeline 
    .apply(PubSubIO.read(...)) 
    .apply(keyEachElement by their value and Void) 
    .apply(new Reshuffle<E, Void>()) 
    .apply(move the key back to the element) 
    // rest of the pipeline 

注:

最も簡単な解決策はReshuffleを実行するために考えられます。

+0

これからもっと多くの質問があります...私たちには「処理遅延の警告」とバックログの構築という2つの問題があります。 "処理遅延"は、pubsubメッセージACKのためにデータフローのステップの終わりに広がっていますか?バックグラウンドは「処理遅延」のために増強されていますか?また、ステップが実行される前に効果的にメッセージを確認していますか? – codingtwinky

+0

なぜv1.6.1のデータフローでv1にこの問題がないのですか? – codingtwinky

+0

PubSubメッセージは、データが他のどこかで「永続的にコミット」されるとすぐに確認応答されます。この場合、要素はシャッフル内で永続的にコミットされ、シャッフルが認識されます。パイプラインが単にParDosのシーケンスである場合、コミットはありません。 –

関連する問題