2016-10-03 7 views
0

RabbitMQ の pub/sub チュートリアルを以下のダミーテストに書き換えましたが、なぜか期待どおりに動作しません(RabbitMQ の pub/sub 実装が動作しない)。

amqpURL は有効な AMQP サービス(RabbitMQ)の URL です。キューを使うサンプルではこの URL でテストして正常に動作しました。しかし、なぜか「exchange(エクスチェンジ)」を使う場合だけ失敗します。

TestDummy を実行すると「 [x] Hello World」がログに出力されるはずだと考えていますが、なぜか出力されません。期待どおりに動作しているのは送信側の半分だけです。

何が間違っているのでしょうか?

import (
    "fmt" 
    "log" 
    "testing" 

    "github.com/streadway/amqp" 
) 

// TestDummy subscribes via exchangeReceive, publishes "Hello World" via
// exchangeSend, and then blocks until the channel returned by exchangeReceive
// yields a value.
func TestDummy(t *testing.T) {
	received := exchangeReceive() // subscribe before anything is published
	exchangeSend("Hello World")
	<-received // no timeout: waits indefinitely for the consumer's signal
}

// exchangeSend connects to the broker at amqpURL, declares the durable fanout
// exchange "logs" and publishes msg to it as a text/plain message with an
// empty routing key. Any failure along the way is reported with log.Fatalf.
func exchangeSend(msg string) {
	const exchange = "logs"

	// check reports a non-nil err, prefixed with what, and aborts.
	check := func(err error, what string) {
		if err == nil {
			return
		}
		log.Fatalf("%s: %s", what, err)
		// log.Fatalf exits the process, so this panic is never reached; it is
		// retained unchanged from the original listing.
		panic(fmt.Sprintf("%s: %s", what, err))
	}

	log.Printf("exchangeSend: connect %s", amqpURL)
	connection, err := amqp.Dial(amqpURL)
	check(err, "Failed to connect to RabbitMQ")
	defer connection.Close()

	channel, err := connection.Channel()
	check(err, "Failed to open a channel")
	defer channel.Close()

	err = channel.ExchangeDeclare(
		exchange, // name
		"fanout", // type
		true,     // durable
		false,    // auto-deleted
		false,    // internal
		false,    // no-wait
		nil,      // arguments
	)
	check(err, "Failed to declare an exchange")

	payload := []byte(msg)
	publishing := amqp.Publishing{
		ContentType: "text/plain",
		Body:        payload,
	}
	err = channel.Publish(
		exchange, // exchange
		"",       // routing key
		false,    // mandatory
		false,    // immediate
		publishing,
	)
	check(err, "Failed to publish a message")

	log.Printf(" [x] Sent %s", payload)
}

// exchangeReceive connects to the broker at amqpURL, declares the durable
// fanout exchange "logs", binds a fresh exclusive server-named queue to it and
// starts a goroutine that logs every delivery and sends true on the returned
// channel for each one.
//
// NOTE(review): this is the listing the question is about. The two deferred
// Close calls below run as soon as this function returns, which is before the
// caller publishes anything, so the consumer goroutine is left reading from a
// channel whose connection has already been closed. Presumably msgs is then
// closed and the goroutine exits without ever sending on done; confirm against
// the amqp library documentation.
func exchangeReceive() <-chan bool { 

    // Unbuffered: each send blocks until the caller receives.
    done := make(chan bool) 

    // failOnError reports a non-nil err with log.Fatalf. log.Fatalf exits the
    // process, so the panic after it is not reached.
    failOnError := func(err error, msg string) { 
     if err != nil { 
      log.Fatalf("%s: %s", msg, err) 
      panic(fmt.Sprintf("%s: %s", msg, err)) 
     } 
    } 

    log.Printf("exchangeReceive: connect %s", amqpURL) 
    conn, err := amqp.Dial(amqpURL) 
    failOnError(err, "Failed to connect to RabbitMQ") 
    // Runs when exchangeReceive returns, not when the consumer goroutine ends.
    defer conn.Close() 

    ch, err := conn.Channel() 
    failOnError(err, "Failed to open a channel") 
    // Also runs at return, while the goroutine below still ranges over msgs.
    defer ch.Close() 

    err = ch.ExchangeDeclare(
     "logs", // name 
     "fanout", // type 
     true,  // durable 
     false, // auto-deleted 
     false, // internal 
     false, // no-wait 
     nil,  // arguments 
    ) 
    failOnError(err, "Failed to declare an exchange") 

    // An empty name asks for a generated queue name, read back below as q.Name.
    q, err := ch.QueueDeclare(
     "", // name 
     false, // durable 
     false, // delete when unused 
     true, // exclusive 
     false, // no-wait 
     nil, // arguments 
    ) 
    failOnError(err, "Failed to declare a queue") 

    err = ch.QueueBind(
     q.Name, // queue name 
     "",  // routing key 
     "logs", // exchange 
     false, 
     nil) 
    failOnError(err, "Failed to bind a queue") 

    msgs, err := ch.Consume(
     q.Name, // queue 
     "",  // consumer 
     true, // auto-ack 
     false, // exclusive 
     false, // no-local 
     false, // no-wait 
     nil, // args 
    ) 
    failOnError(err, "Failed to register a consumer") 

    // Forward one signal per delivery; the loop ends when msgs is closed.
    go func() { 
     for d := range msgs { 
      log.Printf(" [x] %s", d.Body) 
      done <- true 
     } 
    }() 

    log.Printf(" [*] Waiting for logs. To exit press CTRL+C") 

    return done 
} 

答えて

0

単純なミスがいくつかありました。exchangeReceive が return すると defer 文が実行され、コネクションが閉じられてしまいます。書き換えたコードが動作しなかったのはそのためです。

コードを次のように変更したところ、動作するようになりました:

import (
    "fmt" 
    "os" 
    "testing" 
    "time" 

    "github.com/streadway/amqp" 
) 

// TestDummy is an integration test for fanout delivery: it starts two
// subscribers on the "logs" exchange, publishes "Hello World" once and expects
// each subscriber to report exactly that body.
//
// The broker URL is read from the CLOUDAMQP_URL environment variable; the test
// is skipped when it is unset. Each subscriber is given a bounded amount of
// time to report, so a lost delivery fails the test instead of hanging it.
func TestDummy(t *testing.T) {
	amqpURL := os.Getenv("CLOUDAMQP_URL")
	if amqpURL == "" {
		t.Skip("CLOUDAMQP_URL is not set; skipping RabbitMQ integration test")
	}
	// NOTE(review): an AMQP URL may embed credentials; this line prints it to
	// the test log as-is.
	t.Logf(" [*] amqpURL: %s", amqpURL)

	results1 := exchangeReceive(t, "consumer 1", amqpURL)
	results2 := exchangeReceive(t, "consumer 2", amqpURL)
	time.Sleep(50 * time.Millisecond)

	exchangeSend(t, amqpURL, "Hello World")

	// Upper bound on how long each consumer may take to report a delivery.
	const timeout = 5 * time.Second

	for i, results := range []<-chan string{results1, results2} {
		select {
		case have := <-results:
			if want := "Hello World"; want != have {
				t.Errorf("expected %#v, got %#v", want, have)
			}
		case <-time.After(timeout):
			t.Errorf("consumer %d: no message received within %s", i+1, timeout)
		}
	}
}

// exchangeReceive starts a subscriber, identified in log output by name, on
// the durable fanout exchange "logs" of the broker at amqpURL. It binds a
// fresh exclusive server-named queue to the exchange and returns a channel on
// which the body of every delivery is forwarded as a string.
//
// Setup failures are reported through t.Fatalf. The connection and channel
// are not closed in this function, so they stay open after it returns and the
// consumer goroutine keeps receiving; nothing in this listing closes them.
func exchangeReceive(t *testing.T, name, amqpURL string) <-chan string {
	const exchange = "logs"

	// check fails the test when err is non-nil, prefixing the error with what.
	check := func(err error, what string) {
		if err == nil {
			return
		}
		t.Fatalf("%s: %s", what, err)
		// t.Fatalf stops the goroutine that calls it, so this panic is not
		// reached; it is retained unchanged from the original listing.
		panic(fmt.Sprintf("%s: %s", what, err))
	}

	connection, err := amqp.Dial(amqpURL)
	check(err, "Failed to connect to RabbitMQ")

	channel, err := connection.Channel()
	check(err, "Failed to open a channel")

	err = channel.ExchangeDeclare(
		exchange, // name
		"fanout", // type
		true,     // durable
		false,    // auto-deleted
		false,    // internal
		false,    // no-wait
		nil,      // arguments
	)
	check(err, "Failed to declare an exchange")

	// An empty name asks for a generated queue name, read back as queue.Name.
	queue, err := channel.QueueDeclare(
		"",    // name
		false, // durable
		false, // delete when unused
		true,  // exclusive
		false, // no-wait
		nil,   // arguments
	)
	check(err, "Failed to declare a queue")

	err = channel.QueueBind(
		queue.Name, // queue name
		"",         // routing key
		exchange,   // exchange
		false,      // no-wait
		nil,        // arguments
	)
	check(err, "Failed to bind a queue")

	deliveries, err := channel.Consume(
		queue.Name, // queue
		"",         // consumer
		true,       // auto-ack
		false,      // exclusive
		false,      // no-local
		false,      // no-wait
		nil,        // args
	)
	check(err, "Failed to register a consumer")

	// Unbuffered: the goroutine blocks on each body until the caller reads it.
	bodies := make(chan string)
	go func() {
		for delivery := range deliveries {
			t.Logf(" [x] %s received: %s", name, delivery.Body)
			bodies <- string(delivery.Body)
		}
	}()

	t.Logf(" [*] %s ready to receive", name)
	return bodies
}

// exchangeSend connects to the broker at amqpURL, declares the durable fanout
// exchange "logs" and publishes msg to it as a text/plain message with an
// empty routing key. The connection and channel are closed when the function
// returns. Failures are reported through t.Fatalf.
func exchangeSend(t *testing.T, amqpURL, msg string) {
	const exchange = "logs"

	// check fails the test when err is non-nil, prefixing the error with what.
	check := func(err error, what string) {
		if err == nil {
			return
		}
		t.Fatalf("%s: %s", what, err)
		// t.Fatalf stops the goroutine that calls it, so this panic is not
		// reached; it is retained unchanged from the original listing.
		panic(fmt.Sprintf("%s: %s", what, err))
	}

	connection, err := amqp.Dial(amqpURL)
	check(err, "Failed to connect to RabbitMQ")
	defer connection.Close()

	channel, err := connection.Channel()
	check(err, "Failed to open a channel")
	defer channel.Close()

	err = channel.ExchangeDeclare(
		exchange, // name
		"fanout", // type
		true,     // durable
		false,    // auto-deleted
		false,    // internal
		false,    // no-wait
		nil,      // arguments
	)
	check(err, "Failed to declare an exchange")

	payload := []byte(msg)
	publishing := amqp.Publishing{
		ContentType: "text/plain",
		Body:        payload,
	}
	err = channel.Publish(
		exchange, // exchange
		"",       // routing key
		false,    // mandatory
		false,    // immediate
		publishing,
	)
	check(err, "Failed to publish a message")

	t.Logf(" [x] Sent %s", payload)
}
関連する問題