'use strict';
|
|
const debug = require('debug')('cluster-client');
|
const is = require('is-type-of');
|
const Base = require('sdk-base');
|
const assert = require('assert');
|
const utils = require('../utils');
|
// Symbols
|
const {
|
init,
|
logger,
|
isReady,
|
innerClient,
|
subscribe,
|
unSubscribe,
|
publish,
|
invoke,
|
subInfo,
|
pubInfo,
|
closeHandler,
|
close,
|
singleMode,
|
createClient,
|
} = require('../symbol');
|
|
class WrapperBase extends Base {
|
/**
|
* Share Connection among Multi-Process Mode
|
*
|
* @param {Object} options
|
* - {Number} port - the port
|
* - {Transcode} transcode - serialze / deseriaze methods
|
* - {Boolean} isLeader - wehether is leader or follower
|
* - {Number} maxWaitTime - leader startup max time (ONLY effective on isLeader is true)
|
* - {Function} createRealClient - to create the real client instance
|
* @constructor
|
*/
|
constructor(options) {
|
super(options);
|
this[subInfo] = new Map();
|
this[pubInfo] = new Map();
|
this[init]().catch(err => { this.ready(err); });
|
}
|
|
get isClusterClientLeader() {
|
return this[innerClient] && this[innerClient].isLeader;
|
}
|
|
get [singleMode]() {
|
return false;
|
}
|
|
/**
|
* log instance
|
* @property {Logger} ClusterClient#[logger]
|
*/
|
get [logger]() {
|
return this.options.logger;
|
}
|
|
async [createClient]() {
|
throw new Error('not implement');
|
}
|
|
/**
|
* initialize, to leader or follower
|
*
|
* @return {void}
|
*/
|
async [init]() {
|
this[innerClient] = await this[createClient]();
|
|
// events delegate
|
utils.delegateEvents(this[innerClient], this);
|
|
// re init when connection is close
|
if (this[closeHandler]) {
|
this[innerClient].on('close', this[closeHandler]);
|
}
|
|
// wait leader/follower ready
|
await this[innerClient].ready();
|
|
// subscribe all
|
for (const registrations of this[subInfo].values()) {
|
for (const args of registrations) {
|
this[innerClient].subscribe(args[0], args[1]);
|
}
|
}
|
// publish all
|
for (const reg of this[pubInfo].values()) {
|
this[innerClient].publish(reg);
|
}
|
|
if (!this[isReady]) {
|
this[isReady] = true;
|
this.ready(true);
|
}
|
}
|
|
/**
|
* do subscribe
|
*
|
* @param {Object} reg - subscription info
|
* @param {Function} listener - callback function
|
* @return {void}
|
*/
|
[subscribe](reg, listener) {
|
assert(is.function(listener), `[ClusterClient:${this.options.name}] subscribe(reg, listener) listener should be a function`);
|
|
debug('[ClusterClient:%s] subscribe %j', this.options.name, reg);
|
const key = this.options.formatKey(reg);
|
const registrations = this[subInfo].get(key) || [];
|
registrations.push([ reg, listener ]);
|
this[subInfo].set(key, registrations);
|
|
if (this[isReady]) {
|
this[innerClient].subscribe(reg, listener);
|
}
|
}
|
|
/**
|
* do unSubscribe
|
*
|
* @param {Object} reg - subscription info
|
* @param {Function} listener - callback function
|
* @return {void}
|
*/
|
[unSubscribe](reg, listener) {
|
debug('[ClusterClient:%s] unSubscribe %j', this.options.name, reg);
|
const key = this.options.formatKey(reg);
|
const registrations = this[subInfo].get(key) || [];
|
const newRegistrations = [];
|
if (listener) {
|
for (const arr of registrations) {
|
if (arr[1] !== listener) {
|
newRegistrations.push(arr);
|
}
|
}
|
}
|
this[subInfo].set(key, newRegistrations);
|
|
if (this[isReady]) {
|
this[innerClient].unSubscribe(reg, listener);
|
}
|
}
|
|
/**
|
* do publish
|
*
|
* @param {Object} reg - publish info
|
* @return {void}
|
*/
|
[publish](reg) {
|
debug('[ClusterClient:%s] publish %j', this.options.name, reg);
|
const key = this.options.formatKey(reg);
|
this[pubInfo].set(key, reg);
|
|
if (this[isReady]) {
|
this[innerClient].publish(reg);
|
}
|
}
|
|
/**
|
* invoke a method asynchronously
|
*
|
* @param {String} method - the method name
|
* @param {Array} args - the arguments list
|
* @param {Function} callback - callback function
|
* @return {void}
|
*/
|
[invoke](method, args, callback) {
|
if (!this[isReady]) {
|
this.ready(err => {
|
if (err) {
|
callback && callback(err);
|
return;
|
}
|
this[innerClient].invoke(method, args, callback);
|
});
|
return;
|
}
|
|
debug('[ClusterClient:%s] invoke method: %s, args: %j', this.options.name, method, args);
|
this[innerClient].invoke(method, args, callback);
|
}
|
|
async [close]() {
|
try {
|
// close after ready, in case of innerClient is initializing
|
await this.ready();
|
} catch (err) {
|
// ignore
|
}
|
|
const client = this[innerClient];
|
if (client) {
|
// prevent re-initializing
|
if (this[closeHandler]) {
|
client.removeListener('close', this[closeHandler]);
|
}
|
if (client.close) {
|
await utils.callFn(client.close.bind(client));
|
}
|
}
|
}
|
}
|
|
module.exports = WrapperBase;
|