schangxiang@126.com
2025-09-19 9be9c3784b2881a3fa25e93ae2033dc2803c0ed0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
'use strict';
 
const util = require('util');
const mysql = require('mysql');
const wrap = require('co-wrap-all');
const Operator = require('./operator');
const RDSConnection = require('./connection');
const RDSTransaction = require('./transaction');
const promisify = require('pify');
 
module.exports = RDSClient;
module.exports.literals = require('./literals');
 
function RDSClient(options) {
  if (!(this instanceof RDSClient)) {
    return new RDSClient(options);
  }
  Operator.call(this);
 
  this.pool = mysql.createPool(options);
  [
    'query',
    'getConnection',
  ].forEach(method => {
    this.pool[method] = promisify(this.pool[method]);
  });
}
 
util.inherits(RDSClient, Operator);
 
const proto = RDSClient.prototype;
 
proto._query = function(sql) {
  return this.pool.query(sql);
};
 
proto.getConnection = function() {
  return this.pool.getConnection().then(onConnection, onError);
  function onConnection(conn) {
    return new RDSConnection(conn);
  }
  function onError(err) {
    if (err.name === 'Error') {
      err.name = 'RDSClientGetConnectionError';
    }
    throw err;
  }
};
 
/**
 * Begin a transaction
 *
 * @return {Transaction} transaction instance
 */
proto.beginTransaction = function* () {
  const conn = yield this.getConnection();
  try {
    yield conn.beginTransaction();
  } catch (err) {
    conn.release();
    throw err;
  }
 
  return new RDSTransaction(conn);
};
 
/**
 * Auto commit or rollback on a transaction scope
 *
 * @param {Function} scope - scope with code
 * @param {Object} [ctx] - transaction env context, like koa's ctx.
 *   To make sure only one active transaction on this ctx.
 * @return {Object} - scope return result
 */
proto.beginTransactionScope = function* (scope, ctx) {
  ctx = ctx || {};
  if (!ctx._transactionConnection) {
    ctx._transactionConnection = yield this.beginTransaction();
    ctx._transactionScopeCount = 1;
  } else {
    ctx._transactionScopeCount++;
  }
  const tran = ctx._transactionConnection;
  try {
    const result = yield scope(tran);
    ctx._transactionScopeCount--;
    if (ctx._transactionScopeCount === 0) {
      ctx._transactionConnection = null;
      yield tran.commit();
    }
    return result;
  } catch (err) {
    if (ctx._transactionConnection) {
      ctx._transactionConnection = null;
      yield tran.rollback();
    }
    throw err;
  }
};
 
/**
 * doomed to be rollbacked after transaction scope
 * useful on writing test that depend on database
 *
 * @param {Function} scope - scope with code
 * @param {Object} [ctx] - transaction env context, like koa's ctx.
 *   To make sure only one active transaction on this ctx.
 * @return {Object} - scope return result
 */
proto.beginDoomedTransactionScope = function* (scope, ctx) {
  ctx = ctx || {};
  if (!ctx._transactionConnection) {
    ctx._transactionConnection = yield this.beginTransaction();
    ctx._transactionScopeCount = 1;
  } else {
    ctx._transactionScopeCount++;
  }
  const tran = ctx._transactionConnection;
  try {
    const result = yield scope(tran);
    ctx._transactionScopeCount--;
    if (ctx._transactionScopeCount === 0) {
      ctx._transactionConnection = null;
    }
    return result;
  } catch (err) {
    if (ctx._transactionConnection) {
      ctx._transactionConnection = null;
    }
    throw err;
  } finally {
    yield tran.rollback();
  }
};
 
proto.end = function(callback) {
  // callback style
  if (callback) {
    return this.pool.end(callback);
  }
 
  // promise style
  const that = this;
  return new Promise(function(resolve, reject) {
    that.pool.end(function(err) {
      if (err) {
        return reject(err);
      }
      resolve();
    });
  });
};
 
wrap(proto);