2016-04-08 7 views
1

1つの非同期機能内で同じダーツ分離に繰り返し送信/応答をカプセル化することは可能ですか?単一の非同期機能内で同じDart分離に繰り返し送信/応答をカプセル化

背景:

便利なAPIを設計するために、私は、例えば、関数は非同期分離株によって生成された結果を返す持っていると思います

var ans = await askIsolate(isolateArgs); 

私が直接spawnUri呼び出しによって生成された応答を使用する場合、これは正常に動作し、例えば

Future<String> askIsolate(Map<String,dynamic> isolateArgs) { 

ReceivePort response = new ReceivePort(); 
var uri = Uri.parse(ISOLATE_URI); 

Future<Isolate> remote = Isolate.spawnUri(uri, [JSON.encode(isolateArgs)], response.sendPort); 
return remote.then((i) => response.first) 
       .catchError((e) { print("Failed to spawn isolate"); }) 
       .then((msg) => msg.toString()); 
} 

上記のアプローチの欠点は、しかし、である私は繰り返し、askIsolateを呼び出すために必要がある場合隔離は毎回産卵する必要があります。

私は代わりに、実行中の分離と通信したいと思います。これは、確実に分離がsendPortを呼び出し側に返すことによって可能になります。しかし、私は2013 Isolate refactoring以来、これは呼び出し側がreceivePort上の後続のメッセージを聞いて、単一の非同期関数内のカプセル化を不可能にすることを要求します。

これを達成するためのメカニズムはありますか?

+1

は、それは私が分離株でプレイ中です。 https://pub.dartlang.org/packages/isolateは、分離するための素晴らしいAPIを提供することになっています。私は、それをもっと見る価値があると思います。 –

+1

'package:isolate'の' IsolateRunner'は、スポーンされた隔離領域内の関数を複数回呼び出すためのものです。私はそれがこの問題のために働くと思います: 'var runner = IsolateRunner.spawn();を待ちます。 for(var arg something)} {... runner.run(queryFunction、arg);を待ちます。 ...} await runner.close(); ' – lrn

+1

別のオプションは、サービスを分離して実行することですが、毎回同じポートで結果を返すのではなく、それぞれの要求が独自の' SendPort'を送ることができます。次に、各リクエストは 'ResponsePort'を作成し、レスポンスの' first' getterを返すことができます: 'Future askIsolate(isolateArgs){var p = new ReceivePort(); runningIsolatePort.send([isolateArgs、p.sendPort]); p.firstを返す。 } '。 – lrn

答えて

1

上記のlrnのコメントに基づいた簡単な例です。この例では、spawnURIを介してアイソレートを初期化し、次に応答が期待される新しいReceivePortを渡すことによってアイソレートと通信します。これにより、askIsolateは実行中のspawnURI分離からの応答を直接返すことができます。

注意エラー処理はわかりやすくするために省略されています。

アイソコード:

import 'dart:isolate'; 
import 'dart:convert' show JSON; 

main(List<String> initArgs, SendPort replyTo) async { 
    ReceivePort receivePort = new ReceivePort(); 
    replyTo.send(receivePort.sendPort); 

    receivePort.listen((List<dynamic> callArgs) async { 
    SendPort thisResponsePort = callArgs.removeLast(); //last arg must be the offered sendport 
    thisResponsePort.send("Map values: " + JSON.decode(callArgs[0]).values.join(",")); 
    }); 
} 

電話番号:

import 'dart:async'; 
import 'dart:isolate'; 
import 'dart:convert'; 


const String ISOLATE_URI = "http://localhost/isolates/test_iso.dart"; 
SendPort isolateSendPort = null; 

Future<SendPort> initIsolate(Uri uri) async { 
    ReceivePort response = new ReceivePort(); 
    await Isolate.spawnUri(uri, [], response.sendPort, errorsAreFatal: true); 
    print("Isolate spawned from $ISOLATE_URI"); 
    return await response.first; 
} 


Future<dynamic> askIsolate(Map<String,String> args) async { 
    if (isolateSendPort == null) { 
    print("ERROR: Isolate has not yet been spawned"); 
    isolateSendPort = await initIsolate(Uri.parse(ISOLATE_URI)); //try again 
    } 

    //Send args to the isolate, along with a receiveport upon which we listen for first response 
    ReceivePort response = new ReceivePort(); 
    isolateSendPort.send([JSON.encode(args), response.sendPort]); 
    return await response.first; 
} 

main() async { 
    isolateSendPort = await initIsolate(Uri.parse(ISOLATE_URI)); 

    askIsolate({ 'foo':'bar', 'biz':'baz'}).then(print); 
    askIsolate({ 'zab':'zib', 'rab':'oof'}).then(print); 
    askIsolate({ 'One':'Thanks', 'Two':'lrn'}).then(print); 
} 

出力

Isolate spawned from http://localhost/isolates/test_iso.dart 
Map values: bar,baz 
Map values: zib,oof 
Map values: Thanks,lrn 
2

答えは、分離株

  • を使用する方法によって異なりますが、それは入力し送信し、無期限に実行されていると非同期に答えを受けることを期待し続けるつもりですか?

  • 多くの(しかし有限の)入力を一度に分離して送信し、非同期で応答を受信し、分離を閉じることを期待しますか?

私は後者を推測している、とあなたaskIsolate()機能は、それがすべての答えを受信したときに完了するよりも、すぐにFutureを返す必要があります。

await forループを使用すると、ストリームを受信し、終了するまでイベントを消費することができます。

私は単離株に精通していないので、これは問題ないと思います。私はそれをテストしていません。私は分離株が終了し、応答がクローズしたと仮定しています。

String askIsolate(Map<String,dynamic> isolateArgs) async { 

    ReceivePort response = new ReceivePort(); 
    var uri = Uri.parse(ISOLATE_URI); 

    Isolate.spawnUri(uri, [JSON.encode(isolateArgs)], response.sendPort) 
    .catchError((e)) { 
    throw ...; 
    }); 

    List<String> answers = new List<String>; 

    await for(var answer in response) { 
    out.add(answer.toString()); 
    } 

    return answers; 
} 

注:

  • responseは、あなたが答えを聞いているストリームです。 の前にが生成されているので、それを聞く前に隔離された未来が完了するのを待つ必要はありません。すべてが面倒は私が個人的に混乱して読みにくい見つける.then(...)鎖を有する約いじくるなし - それはそれは非常に簡単に、すぐに関数が返すときに完了し、将来を返すようになるため

  • 私はaskIsolate()非同期を作りました。ところで、あなたの元then(...).catchError(...)スタイルのコードが良く、このように書かれることになる

Isolate.spawnUri(uri, [JSON.encode(isolateArgs)], response.sendPort) 
    .catchError((e) { ... }); 

    return response.first) 
    .then((msg) => msg.toString()); 

私は、分離株の作成後の行にcatchErrorハンドラを取り付ける遅らせることは未来がで完了させるかもしれないと信じていますエラーの前に、ハンドラが配置されています。

参照:https://www.dartlang.org/articles/futures-and-error-handling/#potential-problem-failing-to-register-error-handlers-early分離株が作成されたときに一度だけ数回同じ分離株で関数を呼び出す代わりに - 私もpackage:isolateIsolateRunnerを見てお勧めします、このような問題を解決するもので

+0

実際には、あなたの2つの箇条書きの前者は私が望むものです。 catch(...)問題をキャッチしてくれてありがとう。catchError(...)問題! – ilikerobots

+0

入力を隔離領域に送信し、分離領域が結果を返すときに完了する未来を返します。これは繰り返し呼び出すことができますか? –

2

あなたは、他の、より原始的な、選択肢があることをしたくない場合は

非同期機能は、先物またはストリームに待つことができるとReceivePortは、ストリームです。 クイックハックの場合は、おそらく応答ストリームでawait forで何かをすることができますが、あまり便利ではありません。

ReceivePortStreamQueueに置き換えると、package:asyncがより適しています。これにより、個々のイベントを未来に変換することができます。次のようなものがあります。

myFunc() async { 
    var responses = new ReceivePort(); 
    var queue = new StreamQueue(responses); 
    // queryFunction sends its own SendPort on the port you pass to it. 
    var isolate = await isolate.spawn(queryFunction, [], responses.sendPort); 
    var queryPort = await queue.next(); 
    for (var something in somethingToDo) { 
    queryPort.send(something); 
    var response = await queue.next(); 
    doSomethingWithIt(response); 
    } 
    queryPort.send("shutdown command"); 
    // or isolate.kill(), but it's better to shut down cleanly. 
    responses.close(); // Don't forget to close the receive port. 
} 
関連する問題