2016-08-02 9 views
2

私はMassTransit sagaステートマシン(Automatonymous.MassTransitStateMachineから派生した)を持っています。エンドポイント設定prefetchCountを1より大きな値に設定したときにしか現れない問題を回避しようとしていますPrefetch> 1で動作するMassTransit saga

問題は 'StartupCompletedEvent'が公開されてから、サガ状態がデータベースに保存される前に直ちに処理されることです。次のように

ステート・マシンが設定されています。

State(() => Initialising); 
State(() => StartingUp); 
State(() => GeneratingFiles); 

Event(() => Requested, x => x.CorrelateById(ctx => ctx.Message.CorrelationId).SelectId(ctx => ctx.Message.CorrelationId)); 
Event(() => StartupCompleted, x => x.CorrelateById(ctx => ctx.Message.CorrelationId)); 
Event(() => InitialisationCompleted, x => x.CorrelateById(ctx => ctx.Message.CorrelationId)); 
Event(() => FileGenerationCompleted, x => x.CorrelateById(ctx => ctx.Message.CorrelationId)); 


Initially(
    When(Requested) 
     .ThenAsync(async ctx => 
     { 
      Console.WriteLine("Starting up..."); 
      await ctx.Publish(new StartupCompletedEvent() { CorrelationId = ctx.Instance.CorrelationId })); 
      Console.WriteLine("Done starting up..."); 
     } 
     .TransitionTo(StartingUp) 
); 


During(StartingUp, 
    When(StartupCompleted) 
     .ThenAsync(InitialiseSagaInstanceData) 
     .TransitionTo(Initialising) 
); 

// snip...! 

私の武勇伝は、要求されたイベントである受信したときに何が起こる:

  1. 最初のブロックのThenAsyncハンドラがヒットします。この時点で、サガデータはレポに永続化されません(期待どおり)。
  2. StartupCompletedEventがバスに公開されています。ここでもサガデータはレポに保存されません。
  3. 最初の宣言のThenAsyncブロックが完了します。その後、サガデータは最終的に永続化されます。
  4. 何も起こりません。

この時点では、キューにメッセージはなく、StartupCompletedEventは失われます。ただし、データベースには、サガインスタンスがあります。

私はスタートアップについて演奏し、他のスレッドのうちの1つ(私のプリフェッチが> 1であるため)がイベントをピックアップし、データベース内のcorrelationIdが見つからず、イベントを破棄したと判断しました。このイベントは、サガが持続される前に公開され、処理されています。

私は最初のハンドラに以下を追加した場合:

When(StartupCompleted) 
    .Then(ctx => Console.WriteLine("Got the startup completed event when there is no saga instance")) 

それから私は、Console.WriteLineをが実行し得ます。これについて私が理解しているのは、イベントが受信されたが、correlationIdに存在するサガが存在しないため、Initiallyハンドラにルーティングされるということです。私がこの時点でブレークポイントを入れて、サガレポをチェックすると、サガはまだありません。

それは他のいくつかのポイントを挙げ、おそらく価値がある:

  1. 私はIsolationLevel.Serializable
  2. を使用するように設定私のサガレポコンテキストを持って、私は時にプリフェッチ・カウントすべてが期待どおりに動作しますEntityFrameworkSagaRepository
  3. を使用していますが1に設定されています
  4. 私はDI用にNinjectを使用しています。私のSagaRepositoryはスレッドスコープです。プリフェッチカウントが許可する各ハンドラが独自のsagaリポジトリコピーを持っていると想像してください。
  5. StartupCompletedEventを1000ms前にスリープしている別のスレッドにパブリッシュすると、正常に動作します。これは、サガレポがサガ状態を維持し終わったため、最終的にイベントが公開され、ハンドラによってピックアップされると、サガ状態がリポジトリから正しく取得されるためです。

私が何かを残しておけば教えてください。私はこの質問をあまりにも長くすることなく、私が価値あると思うすべてを提供しようとしました...

+0

私はイベントが永続ステップ(https://groups.google.com/d/msg/masstransit-discuss/Jom6ns5jF-w/uaxtPY6kb3IJ)まで開催されていることを主張MT3についての声明を見つけました。しかし、私はその行動を全く見ていない。私はMT 3.3.5を使用しています。 – Robert

+0

サガが永続するまでパブリッシュを延期する場合は、InMemoryOutbox()を有効にする必要があります。 –

+0

@ChrisPattersonありがとうございます - それはそれを修正したようです。なぜこれがデフォルト設定ではないのかを詳しく説明できますか?私はあなたが*この行動を望んでいない状況を想像することはできません... – Robert

答えて

2

私もこの問題を抱えていました。クリスのコメントを回答として投稿して、人々が見つけることができるようにしたいと考えています。

解決策は、送信トレイを有効にして、サガが保持されるまでメッセージを保持することです。

c.ReceiveEndpoint("queue", e => 
{ 
    e.UseInMemoryOutbox(); 
    // other endpoint configuration here 
} 
関連する問題