2017-10-07 20 views
0

std.container.dlistを使用してキュータイプを作成するのは簡単です。キューを使用してスレッド間で通信する

私は複数のスレッドを持っていますが、メッセージを渡す(https://tour.dlang.org/tour/en/multithreading/message-passing)ではなくキューと通信したいと考えています。私が理解しているように、メッセージはコード内の特定のポイントで常にデータを受け取るように設計されています。受信スレッドは、期待されるデータが受信されるまでブロックされます。

(EDIT:私はreceiveTimeoutについて通知されましたが、タイムアウトがなく、ちょうどこの場合は本当により適切です(多分0のタイムアウトですか?)また、複数の場合にメッセージAPIが何をするかわかりませんメッセージは、私はそれをプレイする必要があります。いずれかが受信されているすべての前に送信されます。)

void main() { 
    spawn(&worker, thisTid); 

    // This line will block until the expected message is received. 
    receive (
     (string message) { 
      writeln("Received the message: ", text); 
     }, 
    ) 
} 

私が必要としていますどのようないくつかがある場合は、単にデータを受信することです。このような何か:

void main() { 
    Queue!string queue// custom `Queue` type based on DList 

    spawn(&worker, queue); 

    while (true) { 
     // Go through any messages (while consuming `queue`) 
     for (string message; queue) { 
      writeln("Received a message: ", text); 
     } 
     // Do other stuff 
    } 
} 

私はshared変数(https://tour.dlang.org/tour/en/multithreading/synchronization-sharing)を使用して試してみましたが、DMDがいることを不平を言っている「可変スレッドローカルデータへのエイリアスが許可されていません。」それに応じて、いくつかのエラーが発生します。

これはDでどのように行われますか?あるいは、この種の通信を行うためにメッセージを使用する方法はありますか?

答えて

0

私は必要な答えを得ました。

簡単に言えば、std.concurrencyではなくcore.threadを使用してください。 std.concurrencyはメッセージを管理しており、自分で管理することはできません。 core.threadは内部でstd.concurrencyが使用しているものです。

答えが長ければ、ここに私がそれを完全に実装した方法があります。

Singly Linked Listに基づいてタイプし、最後の要素のポインタを保持するQueueタイプを作成しました。 Queueは、標準コンポーネントのinputRangeとoutputRangeを使用しています(または少なくとも私はそれがそうすると思います)、Walter Brightsのビジョン(https://www.youtube.com/watch?v=cQkBOCo8UrE)に従ってください。 Queueは、一方のスレッドが書き込むことを可能にし、もう一方のスレッドは内部的に非常に小さなミューテックスで読み込むことができるように構築されているため、高速である必要があります。

Queue!string inputQueue = new Queue!string; 
ThreadInput threadInput = new ThreadInput(inputQueue); 
threadInput.start; 

while (true) { 
    foreach (string value; inputQueue) { 
     writeln(value); 
    } 
} 

ThreadInputがこのようにように定義される:

class ThreadInput : Thread { 
    private Queue!string queue; 

    this(Queue!string queue) { 
     super(&run); 
     this.queue = queue; 
    } 

    private void run() { 
     while (true) { 
      queue.put(readln); 
     } 
    } 
} 

コードhttps://pastebin.com/w5jwRVrL
Queue
キューIは、第二のスレッドが入力を読んでいるためにここhttps://pastebin.com/ddyPpLrp

単純な実装共有しました再びhttps://pastebin.com/ddyPpLrp

1

がちょうどreceiveTimeout代わりの平野receive

http://dpldocs.info/experimental-docs/std.concurrency.receiveTimeout.html

+0

これは本当に私が必要とするものではありませんが、私は 'receiveTimeout'を逃しました。私が他の何かを働かせることができなければ、おそらく 'receiveTimeout'を使って私が必要なことをすることができます。 –

+0

receiveTimoutに-1などの負の値を指定すると、必要な処理が実行されます。参照:https://stackoverflow.com/a/31624806/2026276 – Bauss

0
を呼び出して...これは具体的な質問に答えていませんが、tiが、私はメッセージパッシングAPIの誤解と思われるものまで明確ありません

私はこれを使用します:

shared class Queue(T) { 

    private T[] queue; 

    synchronized void opOpAssign(string op)(T object) if(op == "~") { 
     queue ~= object; 
    } 

    synchronized size_t length(){ 
     return queue.length; 
    } 

    synchronized T pop(){ 
     assert(queue.length, "Please check queue length, is 0"); 
     auto first = queue[0]; 
     queue = queue[1..$]; 
     return first; 
    } 

    synchronized shared(T[]) consume(){ 
     auto copy = queue; 
     queue = []; 
     return copy; 
    } 

} 
関連する問題