2017-01-20 4 views
0

私はジョブディスパッチャであり、たくさんのTCPソケットから大量のデータを照合しているコードをいくつか持っています。このコードはLarge number of transient objects - avoiding contentionへのアプローチの結果であり、大部分はCPU使用率の低下と現在の問題ではありません。バッファリングされたチャネルによるデッドロック

時々、私のアプリケーションがロックアップし、データがまだ私のソケットから入ってきたときに、「チャンネル長」ログだけが繰り返されます。ただし、カウントは5000のままであり、ダウンストリーム処理は行われません。

私は問題は、競合状態であるかもしれない、それは多分にハングアップ取得された行はjobDispatcherselectchannel <- msgだと思い。問題は、私はこれを確認する方法を考えることができません。

私はselectとして項目を無作為に取ることができると思うので、ゴルーチンが返ってきており、shutdownChanは処理する機会がありません。その後、データがinboundFromTCPにヒットし、ブロックされます!

誰かが本当に明らかに間違っているものを見つけるかもしれません。うまくいけば解決策を提供する!?

var MessageQueue = make(chan *trackingPacket_v1, 5000) 

func init() { 
    go jobDispatcher(MessageQueue) 
} 

func addMessage(trackingPacket *trackingPacket_v1) { 
    // Send the packet to the buffered queue! 
    log.Println("Channel length:", len(MessageQueue)) 
    MessageQueue <- trackingPacket 
} 

func jobDispatcher(inboundFromTCP chan *trackingPacket_v1) { 
    var channelMap = make(map[string]chan *trackingPacket_v1) 

    // Channel that listens for the strings that want to exit 
    shutdownChan := make(chan string) 

    for { 
     select { 
     case msg := <-inboundFromTCP: 
      log.Println("Got packet", msg.Avr) 
      channel, ok := channelMap[msg.Avr] 
      if !ok { 
       packetChan := make(chan *trackingPacket_v1) 

       channelMap[msg.Avr] = packetChan 
       go processPackets(packetChan, shutdownChan, msg.Avr) 
       packetChan <- msg 
       continue 
      } 
      channel <- msg 
     case shutdownString := <-shutdownChan: 
      log.Println("Shutting down:", shutdownString) 
      channel, ok := channelMap[shutdownString] 
      if ok { 
       delete(channelMap, shutdownString) 
       close(channel) 
      } 
     } 
    } 
} 

func processPackets(ch chan *trackingPacket_v1, shutdown chan string, id string) { 
    var messages = []*trackingPacket_v1{} 

    tickChan := time.NewTicker(time.Second * 1) 
    defer tickChan.Stop() 

    hasCheckedData := false 

    for { 
     select { 
     case msg := <-ch: 
      log.Println("Got a messages for", id) 
      messages = append(messages, msg) 
      hasCheckedData = false 
     case <-tickChan.C: 

      messages = cullChanMessages(messages) 
      if len(messages) == 0 { 
       messages = nil 
       shutdown <- id 
       return 
      } 

      // No point running checking when packets have not changed!! 
      if hasCheckedData == false { 
       processMLATCandidatesFromChan(messages) 
       hasCheckedData = true 
      } 
     case <-time.After(time.Duration(time.Second * 60)): 
      log.Println("This channel has been around for 60 seconds which is too much, kill it") 
      messages = nil 
      shutdown <- id 
      return 
     } 
    } 
} 

更新01/20/16

私はいくつかのミューテックスのロックをグローバルとしてchannelMapを手直ししようとしたが、それはまだデッドロック終わりました。


コードをちょっと微調整しても、まだロックされていますが、私はこの方法がどういうものかはわかりません!! https://play.golang.org/p/PGpISU4XBJ


更新01/21/17 の人が見ることができるように、私は、スタンドアロンの作業例にこれを入れていくつかの勧告後。 https://play.golang.org/p/88zT7hBLeD

これは長時間実行されるプロセスであるため、機械でローカルに実行する必要があります。うまくいけば、これはそれの底に到達するのに役立ちます!ここ

+0

デッドロックが発生したときにスタックトレースを取得し、各ゴルーチンがブロックされている場所を確認します。 – JimB

+0

私はそれを試みましたが、そこには300以上のゴルーチンがあり、ブロックがどこにあるのか分からなかったのです! –

+0

pprofコマンドはどんな場合でも役立ちますか? –

答えて

2

私はあなたの問題が他のゴルーチンがshutdown <- idをやっているのと同時にchannel <- msgをやっていることに固執していると推測しています。

channelshutdownもバッファされていないため、受信機の待機をブロックします。そして、相手側が利用可能になるのを待ってデッドロックすることができます。

これを修正する方法はいくつかあります。 1つのバッファで両方のチャネルを宣言することができます。

シャットダウンメッセージを送信してシグナリングする代わりに、Googleのコンテキストパッケージが行うことを実行し、シャットダウンチャネルを閉じることでシャットダウン信号を送信できます。 https://golang.org/pkg/context/、特にWithCancelWithDeadline、およびDoneの機能を見てください。

コンテキストを使用して独自のシャットダウンチャネルとタイムアウトコードを削除することができます。

そして、JimBは、まだチャンネルで受信している間、ゴルーチンをシャットダウンするという点があります。何をすべきかは、シャットダウンメッセージを送信する(またはコンテキストを終了する、またはキャンセルする)こと、chチャネルが終了するまでメッセージを処理することです(case msg, ok := <-ch:で検出する)。これは送信者によってシャットダウンが受信された後に発生します。

このようにして、シャットダウンが実際に起こるまでに入ってきたすべてのメッセージを取得し、2回目のデッドロックを回避する必要があります。

+0

私はスタンドアロンの作業例をhttps://play.golang.org/p/88zT7hBLeDに掲載しています。ありがとう! –

-2

私は行くことに新たなんだけど、このコードでは、ここで

case msg := <-inboundFromTCP: 
     log.Println("Got packet", msg.Avr) 
     channel, ok := channelMap[msg.Avr] 
     if !ok { 
      packetChan := make(chan *trackingPacket_v1) 

      channelMap[msg.Avr] = packetChan 
      go processPackets(packetChan, shutdownChan, msg.Avr) 
      packetChan <- msg 
      continue 
     } 
     channel <- msg 

あなたは(バッファリングされていない?)チャンネルで何かを配置していない

channel, ok := channelMap[msg.Avr] 

だから、空にする必要はありませんあなたがここにmsgを追加する前にそのチャンネルを出す?

channel <- msg 

私が言ったように、私は行くことができないので、私はうんざりしていないことを願っています。 :)

+1

いいえ、 'channel、ok'は保存されたチャンネルを地図から取り出しています。存在しない場合、 'ok'はfalseであり、コードはチャネルを作成し、マップに設定します。 –

+0

説明をありがとう、ザン! –

+0

私は人々が私の答えをdownvoting停止することを願っています。質問をした人はまた、目の第2セットを求めた。私の答えは間違っている可能性が高いことが分かっていましたが、時には寄与するだけで、他の人が別の方法で問題を見たり、以前考えなかったことを考えたりするのに役立ちます。彼の答えは、私の答えに基づいてunbufferedされているチャネル< - msgの可能性のある問題について考え始めたことを示すように思われるようですが、これはZan Lynxの場合のようです。少なくとも、私の答えはもう一つの答えを促したようで、この話題に関してもっと活動を誘発したようです。 –

関連する問題