2017-06-24 16 views
0

製品をドロップダウンして、製品を選択してwebsocketに接続し、その製品のフィードメッセージを取得します。 (1)フィードメッセージが来たら、(2)注文書を入手してから、(3)フィードメッセージを処理する必要があります。したがって、最初と最後のタスクは非同期に実行されます。このために私は次のコードを書いています:マルチスレッドで正しくデータが処理されない

void OnReceivingFeedMessage() 
{ 
    concurrentQueue.Enqueue(message); 

    if (!messageStreamStarted) // only first time get order book 
    { 
     messageStreamStarted = true; 
     GetOrderBookData(); 
    } 
} 

private void GetOrderBookData() 
{ 
    MarketData m = new MarketData(); 
    ProductOrderBook p = m.GetProductOrderBook(productId);   
    bidsList = p.bids; 
    asksList = p.asks; 
    isOrderBookUpdated = true; 

    Task task3 = Task.Run(() => KickStartToProcessQueue());   
} 

private void KickStartToProcessQueue() 
{ 
    while (threadProcessQueueExist) 
    { 
     int recordCountNew = concurrentQueue.Count(); 
     if (recordCountNew != 0) 
     { 
      if (isOrderBookUpdated) 
      { 
       ProcessQueueMessages(); 
      } 
     } 
    } 
} 

private void ProcessQueueMessages() 
{ 
    if (!concurrentQueue.IsEmpty) 
    { 
     string jsonString; 
     while (concurrentQueue.TryDequeue(out jsonString)) 
     { 
      // have to insert the record in existing order book 
     } 
    } 
} 

これは初めて完全に動作します。しかし、製品を交換して再接続すると、データが正しく処理されず、データが正しく処理されません。製品に書かれたコードは、私がロックまたはasync /待つか、何か他のものを使用する必要があるかどうかわからないので、マルチスレッドに新しいです

private void CloseAndReconnectToGetWebsocketFeed() 
{ 
    w.CloseWebsocketConnection();     
    messageStreamStarted = false;    
    isOrderBookUpdated = false; 
    ConcurrentQueue<string> wssMessagesQueue = new ConcurrentQueue<string>(); 
    concurrentQueue = wssMessagesQueue; 

    ConnectAndGetWebsocketFeedMessages(); // this calls OnReceivingFeedMessage 
} 

を変更するのselectedIndex。上記のコードで何が間違っていますか?

初めて実行したときに正常に動作していますが、製品を変更して同じ処理を再度行うと問題が発生します。誰かが何度も同じ手順を繰り返す前にすべてのリソースをクリアする方法をアドバイスできますか?

+1

非常に不必要に複雑なコードがあるようです。メッセージが届くとすぐに(並行して)簡単に処理したいと思っていますか? – georch

+0

新しいメッセージが入ってくるとすぐに古いメッセージの処理を中止しますか? – georch

+0

いいえ、私はメッセージが来るとすぐに注文簿を取得したいし、一度私は注文書を取得し、私はメッセージを処理する必要があります。したがって、最初のメッセージが来て本を注文してから処理します – user1254053

答えて

1

私は不必要に複雑なコードを書いていると思います。私はあなたの問題は何か、100%確実ではないが、ここであなたを助けるかもしれないいくつかのものがあります。

新しいメッセージが入ってくるされるまで、そのクラスでBlockingCollection<T>

を使用して、あなたの消費者のスレッドを停止することができますここでは、これらのものが作業している方法についての簡単な例です:。

BlockingCollection<string> collection = new BlockingCollection<string>(new ConcurrentQueue<string>()); 
Task t = Task.Run(() => 
{ 
    while (collection.TryTake(out string item, Timeout.Infinite)) 
    { 
     Console.WriteLine($"Started reading {item}..."); 
     Thread.Sleep(1000); //simulate intense work 
     Console.WriteLine($"Done reading {item}"); 
    } 
}); 
while (true) 
{ 
    //This could be your OnReceivingFeedMessage() 
    string input = Console.ReadLine(); 
    if (input == "stop") 
    { 
     Console.WriteLine("Stopping..."); 
     collection.CompleteAdding(); 
     break; 
    } 
    else 
    { 
     collection.Add(input); 
    } 
} 
t.Wait(); 

をタスクtは、 collectionに項目があるまで待機します。アイテムが「受け取られた」(ここでは単にコンソール入力を介して)、それらはあなたのリストに追加されます。

は、入力

非常にシンプルで動作するように新しいタスクをディスパッチ:

while (true) 
{ 
    string item = Console.ReadLine(); 
    Task.Run(() => 
    { 
     Console.WriteLine($"Started reading {item}..."); 
     Thread.Sleep(1000); //simulate intense work 
     Console.WriteLine($"Done reading {item}"); 
    }); 
} 

これはまた、タスクはすべて並列に実行されていることを利点(または欠点を)持っています。つまり、自分が作業している順序に頼ることはできませんが、処理がはるかに高速になります。

ところで、これらのアプローチはどちらも、あなたが忙しく待たないという利点があります。このコードは、お使いのプロセッサの一方のコアが実際に何もせずに非常に高い負荷になることを意味し、あなたのキューに限り、何もないよう待機している忙しい作成します

while (threadProcessQueueExist) 
{ 
    int recordCountNew = concurrentQueue.Count(); 
    if (recordCountNew != 0) 
    { 
     if (isOrderBookUpdated) 
     { 
      ProcessQueueMessages(); 
     } 
    } 
} 

:あなたの質問から。それは悪い習慣とみなされます。

関連する問題