2016-09-08 3 views
0

Azure Service Busをトランスポートとして使用していますが、IConsumer内部からの呼び出し以外のスケジュールされたメッセージは機能しません。MassTransitStateMachineのスケジュールが壊れていますか?

私は何時間も何時間も費やしていましたが、何が起こっているのかはほとんど分かりません。

空白のサービスバスを使用して状態マシンからスケジュールを取得するために必要なことを説明できる人はいますか?そして、おそらくスケジュールメッセージがIConsumerコンテキストから動作しますが、他の場所では動作しないのはなぜでしょうか。

public class BatchCollector : MassTransitStateMachine<BufferSaga> 
{ 
    public BatchCollector(IBatchFactory batchFactory) 
    { 
     InstanceState(saga => saga.State); 
     Event(() => BufferedCommandDetected, 
      _ => _.CorrelateById(context => context.Message.GetBatchId())); 

     Schedule(() => WindowElapsed, x => x.BatchCompletionId, x => 
     { 
      x.Delay = TimeSpan.FromSeconds(5); 
      x.Received = e => e.CorrelateById(context => context.Message.CorrelationId); 
     }); 


     Initially(
      When(BufferedCommandDetected) 
       .Then(
        context => 
        { 
         context.Instance.CorrelationId = context.Data.GetBatchId(); 
         context.Instance.Id = Guid.NewGuid().ToString("N"); 
         context.Instance.Buffer.Add(context.Data); 
         context.Instance.BatchStartTime = DateTimeOffset.Now; 
         context.Instance.AbsoluteDeadLine = DateTimeOffset.Now + context.Data.AbsoluteWindowSpan; 
         context.Instance.SlidingDeadLine = DateTimeOffset.Now + context.Data.SlidingWindowSpan; 
        }) 
       .Schedule(WindowElapsed, 
        context => new WindowElapsed {CorrelationId = context.Instance.CorrelationId }, 
        delayProvider: scheduleDelayProvider => scheduleDelayProvider.Data.SlidingWindowSpan < scheduleDelayProvider.Data.AbsoluteWindowSpan ? scheduleDelayProvider.Data.SlidingWindowSpan : scheduleDelayProvider.Data.AbsoluteWindowSpan) 
       .TransitionTo(Waiting)); 

     During(Waiting, 
      When(BufferedCommandDetected) 
       .Then(context => 
       { 
        context.Instance.SlidingDeadLine += context.Data.SlidingWindowSpan; 
        context.Instance.Buffer.Add(context.Data); 
       }), 
      When(WindowElapsed.Received, context => context.Instance.SlidingDeadLine > DateTimeOffset.Now && context.Instance.AbsoluteDeadLine > DateTimeOffset.Now) 
       .Schedule(WindowElapsed, context => new WindowElapsed { CorrelationId = context.Instance.CorrelationId }), 
      When(WindowElapsed.Received, context => context.Instance.SlidingDeadLine <= DateTimeOffset.Now || context.Instance.AbsoluteDeadLine <= DateTimeOffset.Now) 
       //.Unschedule(WindowElapsed) 
       .Publish(context => new Batch() 
       { 
        BatchId = context.Instance.BatchCompletionId ?? Guid.NewGuid(), 
        Content = context.Instance.Buffer, 
        StartTime = context.Instance.BatchStartTime, 
        EndTime = DateTimeOffset.Now 
       }) 
       .Finalize() 
       .TransitionTo(BufferCompleted)); 

     SetCompletedWhenFinalized(); 
    } 

    public Event<BufferedCommand> BufferedCommandDetected { get; private set; } 


    public Schedule<BufferSaga, WindowElapsed> WindowElapsed { get; private set; } 

    public State Waiting { get; private set; } 

    public State BufferCompleted { get; private set; } 
} 

バスのinit:

container.RegisterType<IBusControl>(
      new HierarchicalLifetimeManager(), 
      new InjectionFactory(c => 
      { 
       var bus = Bus.Factory.CreateUsingAzureServiceBus(
        cfg => 
        { 
         var azSbHost = cfg.Host(new Uri(CloudConfigurationManager.GetSetting("ServiceBus.Url")) 
          , host => 
          { 
           host.TokenProvider = TokenProvider 
            .CreateSharedAccessSignatureTokenProvider 
            (CloudConfigurationManager.GetSetting("ServiceBus.SharedAccessKeyName"), 
             CloudConfigurationManager.GetSetting("ServiceBus.AccessKey"), 
             TokenScope.Namespace); 
          }); 

         cfg.ReceiveEndpoint(
          azSbHost, 
          "Quartz.Scheduler", 
          sbConfig => 
           { 
            cfg.UseMessageScheduler(sbConfig.InputAddress); 
            sbConfig.Consumer(() => new ScheduleMessageConsumer(c.Resolve<IScheduler>())); 
           } 
         ); 

         cfg.ReceiveEndpoint(
          azSbHost, 
          Assembly.GetExecutingAssembly().GetName().Name, 
          sbConfig => 
          { 
           AllClasses.FromAssembliesInBasePath() 
            .Where(
             @class => 
              (@class?.Namespace?.StartsWith("bcn", 
               StringComparison.OrdinalIgnoreCase) ?? false) 
              && 
              @class.GetParentClasses() 
               .Any(
                parent => 
                  parent.Name.StartsWith("MassTransitStateMachine`1"))) 
            .ForEach(@class => 
            { 
             //dynamic cast to avoid having to deal with generic typing when type is not known until runtime.             
             dynamic stateMachineExtension = 
              new DynamicStaticWrapper(typeof(StateMachineSubscriptionExtensions)); 
             stateMachineExtension 
              .StateMachineSaga(
               sbConfig, 
               c.Resolve(@class), 
               c.Resolve(typeof(ISagaRepository<>).MakeGenericType(
                @class.GetParentClasses().First(parent => 
                   parent.Name.StartsWith("MassTransitStateMachine`1")) 
                 .GetGenericArguments().First()))); 
            }); 



           AllClasses.FromAssembliesInBasePath() 
            .Where(
             @class => 
              (@class?.Namespace?.StartsWith("bcn", StringComparison.OrdinalIgnoreCase) ?? 
              false) 
              && @class.GetInterfaces().Any(
               @interface => 
                @interface?.FullName?.StartsWith("MassTransit.IConsumer`1") ?? 
                false)) 
            .ForEach(@class => 
            { 
             var factoryType = typeof(UnityConsumerFactory<>).MakeGenericType(@class); 
             //Automatically register consumers. 
             dynamic consumerFactory = Activator.CreateInstance(
              factoryType, 
              container); 
             var consumingMethod = typeof(ConsumerExtensions). 
              GetMethods() 
              .First(
               m => 
                m.Name == "Consumer" && m.IsGenericMethod && 
                m.GetGenericArguments().Length == 1 && 
                m.GetParameters().Length == 3) 
              .MakeGenericMethod(@class) 
              .Invoke(null, new object[] {sbConfig, consumerFactory, null}); 

             //Automatically detect which payload contains message data. This message data is stored in blob. 
             @class.GetInterfaces().Where(
               @interface => 
                 @interface.FullName.StartsWith("MassTransit.IConsumer`1")) 
              .Select(@interface => @interface.GetGenericArguments().First()) 
              .Where(payload => payload.GetProperties() 
               .Any(prop => prop.PropertyType.Name.StartsWith("MessageData`1"))) 
              .ForEach(
               BlobType => 
                typeof(MessageDataConfiguratorExtensions) 
                 .GetMethods() 
                 .First(
                  method => 
                   method.GetParameters().First().ParameterType == 
                   typeof(IConsumePipeConfigurator) 
                   && 
                   method.GetParameters().Last().ParameterType == 
                   typeof(IMessageDataRepository)) 
                 .MakeGenericMethod(BlobType) 
                 .Invoke(null, 
                  new object[] 
                   {sbConfig, c.Resolve<IMessageDataRepository>()})); 
            }); 
          }); 

         cfg.UseServiceBusMessageScheduler(); 
         //azSbHost. 
        }); 

       return bus; 
      })); 
     container.RegisterType<IBus, IBusControl>(); 
     container.RegisterType<IBus, IBusControl>(new ContainerControlledLifetimeManager()); 

そして開始:

var container = UnityConfig.GetConfiguredContainer(); 
     var bus = container.Resolve<IBusControl>(); 
     bus.Start(); 

     var scheduler = container.Resolve<IScheduler>(); 
     scheduler.Start(); 

     bus.Publish<BufferedCommand>(new BufferedCommandAdapter<decimal>(10m, TimeSpan.FromSeconds(5), 
      TimeSpan.FromSeconds(5))); 

答えて

0

あなたは、石英のジョブの工場を設定していますか? QuartzIntegrationライブラリのセットアップを行う方法を見てみましょう:、また

https://github.com/MassTransit/MassTransit/blob/develop/src/MassTransit.QuartzIntegration/QuartzIntegrationExtensions.cs

をその石英は/一時停止開始されたので、バスの周りにオブザーバーを使用/バスでインラインで停止しました。

https://github.com/MassTransit/MassTransit/blob/develop/src/MassTransit.QuartzIntegration/Configuration/SchedulerBusObserver.cs

+0

クリス長い時間の後、RavenDbのサガプロバイダが壊れているようだ:(私はそれを動作させることはできません。メモリは、それが正常に動作します。それが更新されていないプロジェクトを見てみます4年...それは自信を失わせるものではありません – Alwyn

+0

おそらくMT3用に設計された新しいものがあると思います –

+0

このプロジェクトがありますhttps://github.com/alexeyzimarev/MassTransit.RavenDbIntegrationナゲットパッケージが壊れていますので、ソースと参照を直接参照してください。これまでよりうまくいくと思われます。ロックはまだ私が望むよりも遅いため、イベントを結合するときに並列に実行する傾向があります。 – Alwyn

関連する問題