2017-12-08 5 views
2

私は、以下のコードのinit関数でオープンされたDBへの単一の接続を使用して、rabbitmqから受け取った新しいメッセージごとにPostgresテーブルにデータ行を挿入しようとしています。Goの行挿入に単一のPostgres DB接続を再利用するには?

ではなく開口部だけで一つの接続、コードは497を開き、行を停止する挿入引き起こすアウトmaxes ...

私はこれらの質問に情報を使用してみましたがopening and closing DB connection in Go appと私は1つを開く必要が言うopen database connection inside a functionメイン関数がsql文をinit関数でオープンされた接続に渡せるようにするには、グローバルDBを使用します。

私は行くことに新しいですし、プログラミングが限られている

... Postgresの接続制限に達すると、コードは動作を停止したように、しかし、新たな接続がそれぞれの新しい行のために開かれている、私はこれをやったと思いました経験は、私が過去二日間、この問題を解決/理解しようとしていると私は本当に私はこれで間違ったつもりだいくつかの助けを理解した上で行うことができます...

var db *sql.DB 

func init() { 
    var err error 
    db, err = sql.Open ("postgres", "postgres://postgres:[email protected]/PORT/DB") 
    if err != nil { 
     log.Fatal("Invalid DB config:", err) 
    } 
    if err = db.Ping(); err != nil { 
     log.Fatal("DB unreachable:", err) 
    } 
} 

func main() { 

// RABBITMQ CONNECTION CODE IS HERE 

// EACH MESSAGE RECEIVED IS SPLIT TO LEGEND, STATUS, TIMESTAMP VARIABLES 

// VARIABLES ARE PASSED TO sqlSatement  

     sqlStatement := ` 
     INSERT INTO heartbeat ("Legend", "Status", "TimeStamp") 
     VALUES ($1, $2, $3) 
` 
     // sqlStatement IS THEN PASSED TO db.QueryRow 

     db.QueryRow(sqlStatement, Legend, Status, TimeStamp) 
    } 
}() 

<-forever 
} 

完全なコードを以下に示します。

package main 

import (
    "database/sql" 
    "log" 
    _ "github.com/lib/pq" 

    "github.com/streadway/amqp" 
    "strings" 
) 
var db *sql.DB 

func failOnError(err error, msg string) { 
    if err != nil { 
     log.Fatalf("%s: %s", msg, err) 
    } 
} 

func init() { 
    var err error 
    db, err = sql.Open ("postgres", "postgres://postgres:[email protected]:5432/test?sslmode=disable") 
    if err != nil { 
     log.Fatal("Invalid DB config:", err) 
    } 
    if err = db.Ping(); err != nil { 
     log.Fatal("DB unreachable:", err) 
    } 
} 

func main() { 
    conn, err := amqp.Dial("amqp://Admin:[email protected]:50003/") 
    failOnError(err, "Failed to connect to RabbitMQ") 
    defer conn.Close() 

    ch, err := conn.Channel() 
    failOnError(err, "Failed to open a channel") 
    defer ch.Close() 

    q, err := ch.QueueDeclare(
     "HEARTBEAT", // name 
     false,  // durable 
     false,  // delete when unused 
     false,  // exclusive 
     false,  // no-wait 
     nil,   // arguments 
    ) 
    failOnError(err, "Failed to declare a queue") 

    msgs, err := ch.Consume(
     q.Name, // queue 
     "",  // consumer 
     false, // auto-ack 
     false, // exclusive 
     false, // no-local 
     false, // no-wait 
     nil, // args 
    ) 
    failOnError(err, "Failed to register a consumer") 

    forever := make(chan bool) 

    go func() { 

     for d := range msgs { 
      myString := string(d.Body[:]) 
      result := strings.Split(myString, ",") 
      Legend := result[0] 
      Status := result[1] 
      TimeStamp := result[2] 

      sqlStatement := ` 
    INSERT INTO heartbeat ("Legend", "Status", "TimeStamp") 
    VALUES ($1, $2, $3) 
    ` 
      // 
      db.QueryRow(sqlStatement, Legend, Status, TimeStamp) 
     } 
    }() 

    <-forever 
} 
+1

提供されたコードでは、クエリごとに接続を作成しないでください。しかし、あなたは一度だけステートメントを作成することができます。 – zerkms

+0

@zerkmsここの夜遅く、私はあなたが言ったことをよく理解していませんが、私の脳が揚げられていると、私は一度眠りにつきます。 init関数???私の新しさを許してお返事ありがとうございます。 –

+0

あなたは*する必要はありません*、それは単なる改善です。私の主なポイントは、私があなたが提供しているコードの抜粋があなたが観察している症状を引き起こすかもしれないとは思わないということです。 – zerkms

答えて

6

まず、*sql.DBは接続ではなく接続のプールです。必要なだけ多くの接続を開き、postgresサーバーが許可する数だけ接続します。プール内にアイドル状態のものがなくても使用できる状態になったときにのみ新しい接続を開きます。


したがって、DBが開く接続が解放されていないというのはなぜですか?返された*Rowの値にScanを呼び出すことなくQueryRowを使用しているためです。

フードの下で*Rowには、自分の接続にアクセスできる*Rowsインスタンスがあり、その接続はScanが呼び出されると自動的に解放されます。 Scanが呼び出されていない場合、接続は解放されず、の次の呼び出しでDBプールが新しい接続を開くようになります。接続を解放していないので、DBは、postgres設定で指定された制限に達するまで新しいものを開き続け、接続がアイドルになるのを待つので、QueryRowへの次の呼び出しがハングします。

出力を気にしない場合は、Execを使用するか、*RowScanを呼び出す必要があります。

+0

明確で詳細な説明をいただきありがとうございます。これは意味があり、私の理解に本当に役立ちます! –

+0

「スキャン」を使用して問題を解決できました。コードは期待通りに機能します。ありがとうございます! –

+0

問題はありません、私はあなたが働いてうれしいです。 – mkopriva

関連する問題