2017-07-01 16 views
3

ハンドラの外側からRPCのストリーミング応答値を生成するにはどうすればよいですか? (具体的には、IObservableから)右に感じることはありません...GRPC非同期応答ストリームC#

public override Task GetTicker(RequestProto request, ServerCallContext context) 
{ 
    var subscription = AnRxObservable.Subscribe(value => 
    { 
     responseStream.WriteAsync(new ResponseProto 
     { 
      Value = value 
     }); 
    }); 

    // Wait for the RPC to be canceled (my extension method 
    // that returns a task that completes when the CancellationToken 
    // is cancelled) 
    await context.CancellationToken.WhenCancelled(); 

    // Dispose of the buffered stream 
    bufferedStream.Dispose(); 

    // Dispose subscriber (tells rx that we aren't subscribed anymore) 
    subscription.Dispose(); 

    return Task.FromResult(1); 
} 

このコードを私は現在、以下のことをやっているが、AnRxObservableはRPCハンドラ間で共有されているので、これは、クロススレッドの問題を作成しています... RPCハンドラの外部で作成された共有ソースからRPC応答をストリーミングする方法は他にありません。

+0

このリンクは、プルパラダイム対プッシュを説明するのに有用であるかもしれませんか? –

+0

観測可能なプッシュされたデータのスレッドは、GetTickerが呼び出されたスレッド(GRPC内部から)ではありませんでした。例として、オブザーバブルはスレッドID 1からの値を常にプッシュしますが、GRPCは(スレッドプールからの)各要求に対して異なるスレッドでGetTickerを呼び出します。問題は、2つの同時GetTicker RPC要求がある場合です。ストリームは、クライアントが予期せず受信するのをやめます。 TLDR; GRPCスレッドセーフです...それは表示されませんが、私はこれを裏付ける証拠を見つけることができませんでした。 – Warrick

答えて

2

一般に、プッシュモデル(IObservable)からプルモデル(プル型モデルへの変換を列挙する)に変換しようとすると、メッセージの中間バッファが必要です。 blockingQueue。ハンドラ本体は、キューの次のメッセージをフェッチしようとする非同期ループ(できれば非同期の方法)であり、それをresponseStreamに書き込むことができます。

また、gRPC APIでは、一度に1つの機内応答しか得られないことに注意してください。あなたのスニペットはこれを尊重しません。したがって、別の書き込みを開始する前にWriteAsync()を待つ必要があります(これが中間キューを必要とするもう1つの理由です)。あなたは「クロススレッドの問題を引き起こして」とはどういう意味ですかWhen to use IEnumerable vs IObservable?

+0

私は実際に飛行機の問題に遭遇し、バッファを実装しました(https://gist.github.com/warrickw/f1795e188aeda5b59eae3e0ad786245a)...私の場合はかなり多くオーバーフローしました。 GRPCは、次の値を送信する前にメッセージの受信を待つか(200ms後になる可能性があります)、効果的に待ち時間に基づく帯域幅のボトルネックが発生しますか? – Warrick

+0

gRPCは、TCP/HTTPフロー制御ウィンドウで許可された後に、メッセージをワイヤで送信するまで書き込み操作を確認して待機します(フロー制御を必要としない場合は、遅いピアを簡単に氾濫させます)。 –

関連する問題