333
schangxiang@126.com
2025-09-19 18966e02fb573c7e2bb0c6426ed792b38b910940
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const events_1 = require("events");
const utils_1 = require("../utils");
const util_1 = require("./util");
const redis_1 = require("../redis");
const debug = utils_1.Debug("cluster:connectionPool");
class ConnectionPool extends events_1.EventEmitter {
    constructor(redisOptions) {
        super();
        this.redisOptions = redisOptions;
        // master + slave = all
        this.nodes = {
            all: {},
            master: {},
            slave: {}
        };
        this.specifiedOptions = {};
    }
    getNodes(role = "all") {
        const nodes = this.nodes[role];
        return Object.keys(nodes).map(key => nodes[key]);
    }
    getInstanceByKey(key) {
        return this.nodes.all[key];
    }
    getSampleInstance(role) {
        const keys = Object.keys(this.nodes[role]);
        const sampleKey = utils_1.sample(keys);
        return this.nodes[role][sampleKey];
    }
    /**
     * Find or create a connection to the node
     *
     * @param {IRedisOptions} node
     * @param {boolean} [readOnly=false]
     * @returns {*}
     * @memberof ConnectionPool
     */
    findOrCreate(node, readOnly = false) {
        const key = util_1.getNodeKey(node);
        readOnly = Boolean(readOnly);
        if (this.specifiedOptions[key]) {
            Object.assign(node, this.specifiedOptions[key]);
        }
        else {
            this.specifiedOptions[key] = node;
        }
        let redis;
        if (this.nodes.all[key]) {
            redis = this.nodes.all[key];
            if (redis.options.readOnly !== readOnly) {
                redis.options.readOnly = readOnly;
                debug("Change role of %s to %s", key, readOnly ? "slave" : "master");
                redis[readOnly ? "readonly" : "readwrite"]().catch(utils_1.noop);
                if (readOnly) {
                    delete this.nodes.master[key];
                    this.nodes.slave[key] = redis;
                }
                else {
                    delete this.nodes.slave[key];
                    this.nodes.master[key] = redis;
                }
            }
        }
        else {
            debug("Connecting to %s as %s", key, readOnly ? "slave" : "master");
            redis = new redis_1.default(utils_1.defaults({
                // Never try to reconnect when a node is lose,
                // instead, waiting for a `MOVED` error and
                // fetch the slots again.
                retryStrategy: null,
                // Offline queue should be enabled so that
                // we don't need to wait for the `ready` event
                // before sending commands to the node.
                enableOfflineQueue: true,
                readOnly: readOnly
            }, node, this.redisOptions, { lazyConnect: true }));
            this.nodes.all[key] = redis;
            this.nodes[readOnly ? "slave" : "master"][key] = redis;
            redis.once("end", () => {
                this.removeNode(key);
                this.emit("-node", redis, key);
                if (!Object.keys(this.nodes.all).length) {
                    this.emit("drain");
                }
            });
            this.emit("+node", redis, key);
            redis.on("error", function (error) {
                this.emit("nodeError", error, key);
            });
        }
        return redis;
    }
    /**
     * Remove a node from the pool.
     */
    removeNode(key) {
        const { nodes } = this;
        if (nodes.all[key]) {
            debug("Remove %s from the pool", key);
            delete nodes.all[key];
        }
        delete nodes.master[key];
        delete nodes.slave[key];
    }
    /**
     * Reset the pool with a set of nodes.
     * The old node will be removed.
     *
     * @param {(Array<string | number | object>)} nodes
     * @memberof ConnectionPool
     */
    reset(nodes) {
        debug("Reset with %O", nodes);
        const newNodes = {};
        nodes.forEach(node => {
            const key = util_1.getNodeKey(node);
            // Don't override the existing (master) node
            // when the current one is slave.
            if (!(node.readOnly && newNodes[key])) {
                newNodes[key] = node;
            }
        });
        Object.keys(this.nodes.all).forEach(key => {
            if (!newNodes[key]) {
                debug("Disconnect %s because the node does not hold any slot", key);
                this.nodes.all[key].disconnect();
                this.removeNode(key);
            }
        });
        Object.keys(newNodes).forEach(key => {
            const node = newNodes[key];
            this.findOrCreate(node, node.readOnly);
        });
    }
}
exports.default = ConnectionPool;