2016-11-18 1 views
1

私はプロジェクトでgolang RabbitMQライブラリを使用していますが、別のパッケージにConnect関数があります。私はmain関数でConnectと呼んでいますが、別の関数でRabbitMQに接続するので、接続関数内の接続を閉じるdefer conn.Close()関数が呼び出されます。どちらが完璧な意味を持っていますが、それでは質問をします。それではどこに電話しますかconn.Close()別の方法でgolangの延期を使用

package drivers 

import (

    // Core 
    "log" 
    "os" 
    "time" 

    // Third party 
    "github.com/streadway/amqp" 
) 

type Queue struct { 
    Channel *amqp.Channel 
} 

func NewQueue() *Queue { 
    return &Queue{} 
} 

// Queue interface 
type IQueue interface { 
    Connect(args ...interface{}) 
    Publish(queue string, payload []byte) error 
    Listen(queue string) (<-chan amqp.Delivery, error) 
    Declare(queue string) (amqp.Queue, error) 
} 

// Connect - Connects to RabbitMQ 
func (queue *Queue) Connect(args ...interface{}) { 

    var uri string 

    if args == nil { 

     // Get from env vars 
     uri = os.Getenv("RABBIT_MQ_URI") 

     if uri == "" { 
      log.Panic("No uri for queue given") 
     } 
    } else { 
     uri = args[0].(string) 
    } 

    // Make max 5 connection attempts, with a 1 second timeout 
    for i := 0; i < 5; i++ { 

     log.Println("Connecting to:", uri) 

     // If connection is successful, return new instance 
     conn, err := amqp.Dial(uri) 
     defer conn.Close() 

     if err == nil { 
      log.Println("Successfully connected to queue!") 
      channel, _ := conn.Channel() 
      queue.Channel = channel 
      return 
     } 

     log.Println("Failed to connect to queue, retrying...", err) 

     // Wait 1 second 
     time.Sleep(5 * time.Second) 
    } 
} 

// Declare a new queue 
func (queue *Queue) Declare(queueName string) (amqp.Queue, error) { 
    return queue.Channel.QueueDeclare(
     queueName, 
     true, 
     false, 
     false, 
     false, 
     nil, 
    ) 
} 

// Publish a message 
func (queue *Queue) Publish(queueName string, payload []byte) error { 
    return queue.Channel.Publish(
     "", 
     queueName, 
     false, 
     false, 
     amqp.Publishing{ 
      DeliveryMode: amqp.Persistent, 
      ContentType: "application/json", 
      Body:   payload, 
     }, 
    ) 
} 

// Listen for a new message 
func (queue *Queue) Listen(queueName string) (<-chan amqp.Delivery, error) { 
    return queue.Channel.Consume(
     queueName, 
     "", 
     true, 
     false, 
     false, 
     false, 
     nil, 
    ) 
} 

あなたは上記のコードで見ることができるように、私は接続を行った後defer conn.Close()を呼んでいるが、しかし、これはすぐに再接続を閉じます。ここで

は簡単な解決策は、他の場所からconn.Close()を呼び出すことであるhttps://play.golang.org/p/5cz2D4gDgn

+2

明白な答えは、あなたが接続して完了したら[閉じる呼んでいます。あなたのコードのどこにあるかはあなた次第です。 'Queue'が接続のプロキシである場合、そこにCloseメソッドを公開してみませんか? – JimB

+0

あなたは絶対に正しいです!それはちょっと分かりましたが、今あなたはそれを指摘しました。私はチャンネルと接続のプロパティを公開しているので、私の主な機能のものにdeferを呼んでいます。 –

答えて

1

...私が話している何ゴー遊び場スプーフィングです。これはちょうど私かもしれませんが、他の場所、つまりQueueのフィールドとして接続を公開しないことはちょっと奇妙だと思います。キューからの接続を閉じる機能を公開することでこれを解決し、より柔軟に対応できます。

ので、この:

type Queue struct { 
    // your original fields 
    Conn amqp.Connection 
} 

// Somewhere else 
queue.Conn.Close() 

あなたは他のオプションが閉じた後、その接続であなたが望むすべてのアクションをやって、接続されています。私のような何かを考えている:

func action(conn amqp.Connection, args ...interface{}) (<-chan bool) { 
    done := make(chan bool) 
    go func(amqpConn amqp.Connection, dChan chan bool){ 
     // Do what you want with the connection 
     dChan <- true 
    }(conn, done) 
    return done 
} 

func (queue *Queue) Connect(args ...interface{}) { 
    // your connection code 
    doneChans := make([](chan bool), 5) 
    for i := 0; i < 5; i++ { 
      conn, err := amqp.Dial(uri) 
      defer conn.Close() 
      if err != nil { 
       // handle error 
      } 
      done := action(conn) 
    } 
    // This for loop will block until the 5 action calls are done 
    for j := range doneChans { 
     isFinish := <-doneChans[j] 
     if !isFinish { 
      // handle bad state 
     } 
    } 
} 
1

1つのオプションは、Connectリターンconnを持ち、呼び出し元にdefer conn.Close()を呼び出すことです。

package driver 

// imports, etc 

func (queue *Queue) Connect(args ...interface{}) amqp.Connection, error { 
    // ... 

    conn, err := amqp.Dial(uri) 
    if err != nil { 
     return nil, err 
    } 

    // ... 

    return conn, nil 
} 

はその後、別のパッケージに:

package stuff 

// imports, etc 

func doStuff() { 
    queue = driver.NewQueue() 

    conn, err := queue.Connect(args...) 
    if err != nil { 
     log.Fatalf("oh no! %v!", err) 
    } 

    defer conn.Close() 

    // Do stuff 
} 
関連する問題