2017-06-09 6 views
0

documentationはプールの接続イベントエミッタを登録する例を示しますノードMySQLのプールClusterのイベント・エミッターが動作しない

pool.on('connection', function (connection) { 
    connection.query('SET SESSION auto_increment_increment=1'); 
}); 

マニュアルは、クラスタからプールを取得する方法を示しています。しかし

var pool = poolCluster.of('SLAVE*', 'RANDOM'); 
pool.getConnection(function (err, connection) {}); 
pool.getConnection(function (err, connection) {}); 
pool.query(function (error, results, fields) {}); 

var pool = poolCluster.of('SLAVE*', 'RANDOM');に続いてpool.on(...)でエラーが発生するpool.on is not a function

を登録しようとしています。クラスタを経由するはエラーなしで実行されますが、効果はありません。

TypeError: pool.on is not a function 
    at Object.<anonymous> (E:\scratch\scratch_server.js:14:7) 
    at Module._compile (module.js:570:32) 
    at Object.Module._extensions..js (module.js:579:10) 
    at Module.load (module.js:487:32) 
    at tryModuleLoad (module.js:446:12) 
    at Function.Module._load (module.js:438:3) 
    at Module.runMain (module.js:604:10) 
    at run (bootstrap_node.js:394:7) 
    at startup (bootstrap_node.js:149:9) 
    at bootstrap_node.js:509:3 

hello from new connection 

あなたが見ることができるように、pool.on('connection')の実行に失敗し、初めて.getConnection()のために実行しているときcluster.on('connection')は出さない:上記のコードから

var mysql = require('mysql'); 

var mysql_pool_cluster = mysql.createPoolCluster(); 
mysql_pool_cluster.add('myPool', {user: 'root', password: 'password'}); 
mysql_pool_cluster.on('connection', function(new_conn) { 
    console.log('hello from cluster event emitter'); 
    new_conn.release(); 
}); 

var pool = mysql_pool_cluster.of('myPool', 'ORDER'); 
try { 
    pool.on('connection', function(new_conn) { 
     console.log('hello from pool event emitter'); 
     new_conn.release(); 
    }); 
} catch(err) { 
    console.error(err); 
} 

console.log(''); 

pool.getConnection(function(err, conn) { 
    if (err) { 
     console.error(err); 
    } else { 
     console.log('hello from new connection'); 
     conn.release(); 

     mysql_pool_cluster.end(function(err) { 
      if (err) { 
       console.error(err); 
      } 
     }); 
    } 
}); 

出力を:

コードを再現します。

答えて

0

Iクラスタライブラリが仕様まではなかったことを決めたので、私は私自身のクラスタクラスをコード化:

var mysql = require('mysql'); 
var Promise = require('promise'); 

var deepCopy = function(obj) { 
    // https://stackoverflow.com/a/15040626 
    return JSON.parse(JSON.stringify(obj)); 
}; 
var logger = {log: console.log, error: console.error}; 

class mysqlPoolCluster { 
    // custom class to work around event emitter bug in mysql.createPoolCluster() 
    // https://stackoverflow.com/q/44466894 

    constructor() { 
     this._pool_dict = {}; 
     this._future_pools_on_events_dict = {}; 
    } 

    mergeAndSplitConfs(segregated_confs_dict) { 
     /* 
      Converts this object -- 
      { 
       pools: { 
        admin: {connectionLimit: 1, user: 'my_admin', password: 'password'}, 
        read: {user: 'my_reader', password: 'password'}, 
        write: {user: 'my_writer', password: 'password'}, 
        read_write: {user: 'my_reader_writer', password: 'password'} 
       }, 
       host: 'localhost', 
       database: 'my_db', 
       connectionLimit: 2 
      } 

      to this object -- 
      { 
       admin: { 
        connectionLimit: 1, 
        user: 'my_admin', 
        password: 'password', 
        host: 'localhost', 
        database: 'my_db' 
       }, 
       read: { 
        connectionLimit: 2, 
        user: 'my_reader', 
        password: 'password', 
        host: 'localhost', 
        database: 'my_db' 
       }, 
       write: { 
        connectionLimit: 2, 
        user: 'my_writer', 
        password: 'password', 
        host: 'localhost', 
        database: 'my_db' 
       }, 
       read_write: { 
        connectionLimit: 2, 
        user: 'my_reader_writer', 
        password: 'password', 
        host: 'localhost', 
        database: 'my_db' 
       } 
      } 
     */ 

     var pools_dict = deepCopy(segregated_confs_dict.pools); 
     if (!pools_dict) { 
      throw new Error("arg does not have property 'pools'"); 
     } 

     var base_conf = deepCopy(segregated_confs_dict); 
     delete base_conf.pools; 

     var base_keys = Object.keys(base_conf); 
     var pool_names = Object.keys(pools_dict); 
     for(var i_pool_name=0; i_pool_name < pool_names.length; i_pool_name++) { 
      var pool_conf = pools_dict[pool_names[i_pool_name]]; 
      for(var i_base_key=0; i_base_key < base_keys.length; i_base_key++) { 
       var base_key = base_keys[i_base_key]; 
       if (!pool_conf.hasOwnProperty(base_key)) { 
        pool_conf[base_key] = deepCopy(base_conf[base_key]); 
       } 
      } 
     } 

     return pools_dict; 
    } 

    populatePools(confs_dict) { 
     // 'confs_dict' is the return from this.mergeAndSplitConfs() 
     var names = Object.keys(confs_dict); 
     try { 
      for(var i_name=0; i_name < names.length; i_name++) { 
       var name = names[i_name]; 
       this.createAndAddPool(name, confs_dict[name]); 
      } 
     } catch(err) { 
      this.endClusterAndRemovePoolsPromiser() 
      .catch(logger.error); 

      throw err; 
     } 
    } 

    createAndAddPool(name, conf) { 
     if (this._pool_dict.hasOwnProperty(name)) { 
      throw new Error("pool '" + name + "' already exists"); 
     } 

     this._pool_dict[name] = mysql.createPool(conf); 

     try { 
      this.getPool(name).on('connection', function(conn) { 
       conn.queryPromiser = function(sql, args) { 
        return new Promise(function(resolve, reject) { 
         conn.query(
          sql, 
          args, 
          function(err, results, fields) { 
           if (err) { 
            reject(err); 
           } else { 
            resolve({"results": results, "fields": fields}); 
           } 
          } 
         ); 
        }); 
       }; 
      }); 

      var that = this; 

      this.getPool(name).queryPromiser = function(sql, args) { 
       return new Promise(function(resolve, reject) { 
        that.getPool(name).query(
         sql, 
         args, 
         function(err, results, fields) { 
          if (err) { 
           reject(err); 
          } else { 
           resolve({"results": results, "fields": fields}); 
          } 
         } 
        ); 
       }); 
      }; 

      this.getPool(name).getConnectionPromiser = function() { 
       return new Promise(function(resolve, reject) { 
        that.getPool(name).getConnection(
         function(err, conn) { 
          if (err) { 
           reject(err); 
          } else { 
           resolve(conn); 
           // remember to call conn.release() when you're finished with the conn 
          } 
         } 
        ); 
       }); 
      }; 

      var events = Object.keys(this._future_pools_on_events_dict); 
      for(var i_event=0; i_event < events.length; i_event++) { 
       var event = events[i_event]; 
       for(var i_cb=0; i_cb < this._future_pools_on_events_dict[event].length; i_cb++) { 
        this.getPool(name).on(event, this._future_pools_on_events_dict[event][i_cb]); 
       } 
      } 

      return this.getPool(name); 
     } catch(err) { 
      this.endAndRemovePoolPromiser(name) 
      .catch(logger.error); 

      throw err; 
     } 
    } 

    getPool(name) { 
     if (this._pool_dict.hasOwnProperty(name)) { 
      return this._pool_dict[name]; 
     } else { 
      throw new Error("pool '" + name + "' does not exist"); 
     } 
    } 

    endAndRemovePoolPromiser(name) { 
     var that = this; 
     return new Promise(function(resolve, reject) { 
      that.getPool(name).end(function(err) { 
       delete that._pool_dict[name]; 

       if (err) { 
        reject(err); 
       } else { 
        resolve(); 
       } 
      }); 
     }); 
    } 

    endClusterAndRemovePoolsPromiser() { 
     var end_promises = []; 
     var err_list = []; 
     var names = Object.keys(this._pool_dict); 
     for(var i_name=0; i_name < names.length; i_name++) { 
      end_promises.push(
       this.endAndRemovePoolPromiser(names[i_name]) 
       .catch(function(err) { 
        err_list.push(err); 
       }) 
      ); 
     } 

     return Promise.all(end_promises) 
     .then(function() { 
      if (err_list.length) { 
       return Promise.reject(err_list); 
      } 
     }); 
    } 

    on(event, cb) { 
     var names = Object.keys(this._pool_dict); 
     for(var i_name=0; i_name < names.length; i_name++) { 
      this.getPool(names[i_name]).on(event, cb); 
     } 

     if (!this._future_pools_on_events_dict.hasOwnProperty(event)) { 
      this._future_pools_on_events_dict[event] = []; 
     } 
     this._future_pools_on_events_dict[event].push(cb); 
    } 
} 
関連する問題