私はジョブディスパッチャであり、たくさんのTCPソケットから大量のデータを照合しているコードをいくつか持っています。このコードはLarge number of transient objects - avoiding contentionへのアプローチの結果であり、大部分はCPU使用率の低下と現在の問題ではありません。バッファリングされたチャネルによるデッドロック
時々、私のアプリケーションがロックアップし、データがまだ私のソケットから入ってきたときに、「チャンネル長」ログだけが繰り返されます。ただし、カウントは5000のままであり、ダウンストリーム処理は行われません。
私は問題は、競合状態であるかもしれない、それは多分にハングアップ取得された行はjobDispatcher
のselect
内channel <- msg
だと思い。問題は、私はこれを確認する方法を考えることができません。
私はselectとして項目を無作為に取ることができると思うので、ゴルーチンが返ってきており、shutdownChanは処理する機会がありません。その後、データがinboundFromTCPにヒットし、ブロックされます!
誰かが本当に明らかに間違っているものを見つけるかもしれません。うまくいけば解決策を提供する!?
var MessageQueue = make(chan *trackingPacket_v1, 5000)
func init() {
go jobDispatcher(MessageQueue)
}
func addMessage(trackingPacket *trackingPacket_v1) {
// Send the packet to the buffered queue!
log.Println("Channel length:", len(MessageQueue))
MessageQueue <- trackingPacket
}
func jobDispatcher(inboundFromTCP chan *trackingPacket_v1) {
var channelMap = make(map[string]chan *trackingPacket_v1)
// Channel that listens for the strings that want to exit
shutdownChan := make(chan string)
for {
select {
case msg := <-inboundFromTCP:
log.Println("Got packet", msg.Avr)
channel, ok := channelMap[msg.Avr]
if !ok {
packetChan := make(chan *trackingPacket_v1)
channelMap[msg.Avr] = packetChan
go processPackets(packetChan, shutdownChan, msg.Avr)
packetChan <- msg
continue
}
channel <- msg
case shutdownString := <-shutdownChan:
log.Println("Shutting down:", shutdownString)
channel, ok := channelMap[shutdownString]
if ok {
delete(channelMap, shutdownString)
close(channel)
}
}
}
}
func processPackets(ch chan *trackingPacket_v1, shutdown chan string, id string) {
var messages = []*trackingPacket_v1{}
tickChan := time.NewTicker(time.Second * 1)
defer tickChan.Stop()
hasCheckedData := false
for {
select {
case msg := <-ch:
log.Println("Got a messages for", id)
messages = append(messages, msg)
hasCheckedData = false
case <-tickChan.C:
messages = cullChanMessages(messages)
if len(messages) == 0 {
messages = nil
shutdown <- id
return
}
// No point running checking when packets have not changed!!
if hasCheckedData == false {
processMLATCandidatesFromChan(messages)
hasCheckedData = true
}
case <-time.After(time.Duration(time.Second * 60)):
log.Println("This channel has been around for 60 seconds which is too much, kill it")
messages = nil
shutdown <- id
return
}
}
}
更新01/20/16
私はいくつかのミューテックスのロックをグローバルとしてchannelMap
を手直ししようとしたが、それはまだデッドロック終わりました。
コードをちょっと微調整しても、まだロックされていますが、私はこの方法がどういうものかはわかりません!! https://play.golang.org/p/PGpISU4XBJ
更新01/21/17 の人が見ることができるように、私は、スタンドアロンの作業例にこれを入れていくつかの勧告後。 https://play.golang.org/p/88zT7hBLeD
これは長時間実行されるプロセスであるため、機械でローカルに実行する必要があります。うまくいけば、これはそれの底に到達するのに役立ちます!ここ
デッドロックが発生したときにスタックトレースを取得し、各ゴルーチンがブロックされている場所を確認します。 – JimB
私はそれを試みましたが、そこには300以上のゴルーチンがあり、ブロックがどこにあるのか分からなかったのです! –
pprofコマンドはどんな場合でも役立ちますか? –