ハンドラの外側からRPCのストリーミング応答値を生成するにはどうすればよいですか? (具体的には、IObservableから)右に感じることはありません...GRPC非同期応答ストリームC#
public override Task GetTicker(RequestProto request, ServerCallContext context)
{
var subscription = AnRxObservable.Subscribe(value =>
{
responseStream.WriteAsync(new ResponseProto
{
Value = value
});
});
// Wait for the RPC to be canceled (my extension method
// that returns a task that completes when the CancellationToken
// is cancelled)
await context.CancellationToken.WhenCancelled();
// Dispose of the buffered stream
bufferedStream.Dispose();
// Dispose subscriber (tells rx that we aren't subscribed anymore)
subscription.Dispose();
return Task.FromResult(1);
}
このコードを私は現在、以下のことをやっているが、AnRxObservable
はRPCハンドラ間で共有されているので、これは、クロススレッドの問題を作成しています... RPCハンドラの外部で作成された共有ソースからRPC応答をストリーミングする方法は他にありません。
:
このリンクは、プルパラダイム対プッシュを説明するのに有用であるかもしれませんか? –
観測可能なプッシュされたデータのスレッドは、GetTickerが呼び出されたスレッド(GRPC内部から)ではありませんでした。例として、オブザーバブルはスレッドID 1からの値を常にプッシュしますが、GRPCは(スレッドプールからの)各要求に対して異なるスレッドでGetTickerを呼び出します。問題は、2つの同時GetTicker RPC要求がある場合です。ストリームは、クライアントが予期せず受信するのをやめます。 TLDR; GRPCスレッドセーフです...それは表示されませんが、私はこれを裏付ける証拠を見つけることができませんでした。 – Warrick