2016-12-22 6 views
2

私は、RabbitMQでMassTransitを使用してステートフルサービスとしてService Fabric上で動作しているサービスにメッセージを投稿するウェブサイトのデモを考えてきました。MassTransitとサービスファブリックステートフルサービス?

IBusControl bus = BusConfigurator.ConfigureBus(); 
Uri sendToUri = new Uri($"{RabbitMqConstants.RabbitMqUri}" + $"{RabbitMqConstants.PeopleServiceQueue}"); 
ISendEndpoint endPoint = await bus.GetSendEndpoint(sendToUri); 

await endPoint.Send<ICompanyRequest>(new {CompanyId = id }); 

私のサービスファブリックサービスにおける私の消費者が同じように定義されました:

 IBusControl busControl = Bus.Factory.CreateUsingRabbitMq(cfg => 
     { 
      IRabbitMqHost host = cfg.Host(new Uri(RabbitMqConstants.RabbitMqUri), h => 
      { 
       h.Username(RabbitMqConstants.UserName); 
       h.Password(RabbitMqConstants.Password); 
      }); 

      cfg.ReceiveEndpoint(host, RabbitMqConstants.PeopleServiceQueue, e => 
      { 
       e.Consumer<PersonInformationConsumer>(); 
      }); 

     }); 

     busControl.Start(); 

これは私がでメッセージを消費することができないすべてがうまく行っていた

、私のクライアントは、メッセージを投稿します私のクラスと私はそれをうまく処理することができます。この問題は、IReliableDictonaryまたはIReliableQueue、またはサービスファブリックサービスのRunAsync関数から実行されるコンテキストを参照する必要があるものを使用する場合に発生します。

私の質問は、どのようにしてMassTransitがサービスコンテキスト自体の知識を持つステートフルサービスファブリックサービス内で動作するように設定できますか?

事前に感謝します。 [OK]をマイク

更新 が、私は私のメッセージコンシューマクラス(例えば)にレジスタ・ルーチンを指している場合、この上でいくつかの進歩を遂げてきました:

ServiceRuntime.RegisterServiceAsync("ServiceType", context => new PersonInformationConsumer(context)).GetAwaiter().GetResult(); 
       ServiceEventSource.Current.ServiceTypeRegistered(Process.GetCurrentProcess().Id, typeof(PersonInformationConsumer).Name); 

その後、私のための私の消費者のクラスで

0:

internal sealed class PersonInformationConsumer : StatefulService, IConsumer<ICompanyRequest> 
{ 
    private static StatefulServiceContext _currentContext; 

    #region Constructors 

    public PersonInformationConsumer(StatefulServiceContext serviceContext) : base(serviceContext) 
    { 
     _currentContext = serviceContext; 
    } 

    public PersonInformationConsumer() : base(_currentContext) 
    { 
    } 

私は今、成功したサービスメッセージを呼び出すことができます。メッセージは、私は、次の操作を行うことができます

私は現在IReliableDictionaryに何かを保存しようとしていますが、これは "オブジェクト参照がオブジェクトのインスタンスに設定されていません"というエラーが発生します。エラー:(...任意のアイデアは新しい年になるまで)

 public async Task Consume(ConsumeContext<ICompanyRequest> context) 
    { 

     ServiceEventSource.Current.ServiceMessage(this.Context, "Message has been consumed, request Id: {0}", context.Message.CompanyId); 

     using (ITransaction tx = StateManager.CreateTransaction()) 
     { 

      try 
      { 

       var myDictionary = await StateManager.GetOrAddAsync<IReliableDictionary<string, long>>("myDictionary"); 

これはエラーの原因です....ヘルプ! :)

+0

私はサービスファブリックSDKがインストールされていませんが、非同期メソッドの内部_static_クラスへのアクセスを見ることは確実に信頼性の高い辞書のインスタンスにアクセスするための別の方法があり、私は一時停止できます。 –

答えて

0

MassTransitとステートフルなサービスを連携させるにはもう少しやりがいがありますが、ここで懸念すべき問題がいくつかあります。

ステートフルパーティション内のマスタ(n個のパーティション内のn個のマスタ)のみがステートフルサービスへの書き込み/更新が可能で、すべてのレプリカは状態を書き戻そうとすると例外をスローします。だから、この問題に対処する必要があります。クラスタの再調整のためにマスターがクラスタの周りを動くことができるようになるまで、簡単に聞こえます。一般的なサービスファブリックアプリケーションのデフォルトは、レプリカ上での処理を行い、マスター上でのみ作業を実行します。これは、すべてRunAsyncメソッドによって実行されます(RunAsyncメソッドで何かを使って3つのステートフルサービスを実行し、次にマスタを終了します)。

また、パーティションでステートフルなサービスの規模があるため、データをサービスバス上の別個のエンドポイントに配布する方法を作成する必要があります。所定のパーティション範囲? UserCreatedメッセージがあるとします。これを分割するとcountryUKはパーティション1に、USはパーティション2に移動します。

何か基本的なものを稼働させたいのであれば、それを1つのパーティションに限定して、RunAsync内にバス作成を入れ、取り消しトークンで取り消しが要求されたらバスをシャットダウンしてみてください。

protected override async Task RunAsync(CancellationToken cancellationToken) 
{ 
    var busControl = Bus.Factory.CreateUsingRabbitMq(cfg => 
    { 
     IRabbitMqHost host = cfg.Host(new Uri(RabbitMqConstants.RabbitMqUri), h => 
     { 
      h.Username(RabbitMqConstants.UserName); 
      h.Password(RabbitMqConstants.Password); 
     }); 

     cfg.ReceiveEndpoint(host, RabbitMqConstants.PeopleServiceQueue, e => 
     { 
      // Pass in the stateful service context 
      e.Consumer(c => new PersonInformationConsumer(Context)); 
     }); 

    }); 

    busControl.Start(); 

    while (true) 
    { 
     if(cancellationToken.CancellationRequested) 
     { 
      //Service Fabric wants us to stop 
      busControl.Stop(); 

      cancellationToken.ThrowIfCancellationRequested(); 
     } 

     await Task.Delay(TimeSpan.FromSeconds(1)); 
    } 
} 
+0

サービスリスナーを使用してPOCを少し行いました。この例を確認してください。 - https://github.com/kevbite/MassTransit.ServiceFabric –

関連する問題