| 'use strict'; | 
|   | 
| const co = require('co'); | 
| const Base = require('./base'); | 
| const is = require('is-type-of'); | 
| const utils = require('../utils'); | 
| const SdkBase = require('sdk-base'); | 
| const random = require('utility').random; | 
|   | 
| // Symbol | 
| const { | 
|   logger, | 
|   createClient, | 
|   singleMode, | 
| } = require('../symbol'); | 
| const _instances = new Map(); | 
|   | 
| class InnerClient extends SdkBase { | 
|   constructor(options = {}) { | 
|     super(options); | 
|   | 
|     this._subData = new Map(); // <key, data> | 
|     this._subSet = new Set(); | 
|     this._subListeners = new Map(); // <key, Array<Function>> | 
|     this._transcode = options.transcode; | 
|     this._realClient = options.createRealClient(); | 
|     this._closeMethodName = utils.findMethodName(options.descriptors, 'close'); | 
|     this._subscribeMethodName = utils.findMethodName(options.descriptors, 'subscribe'); | 
|     this._publishMethodName = utils.findMethodName(options.descriptors, 'publish'); | 
|     this._isReady = false; | 
|     this._closeByUser = false; | 
|     this._refCount = 1; | 
|   | 
|     // event delegate | 
|     utils.delegateEvents(this._realClient, this); | 
|   | 
|     if (is.function(this._realClient.ready)) { | 
|       this._realClient.ready(err => { | 
|         if (err) { | 
|           this.ready(err); | 
|         } else { | 
|           this._isReady = true; | 
|           this.ready(true); | 
|         } | 
|       }); | 
|     } else { | 
|       this._isReady = true; | 
|       this.ready(true); | 
|     } | 
|   } | 
|   | 
|   ref() { | 
|     this._refCount++; | 
|   } | 
|   | 
|   get isLeader() { | 
|     return true; | 
|   } | 
|   | 
|   formatKey(reg) { | 
|     return '$$inner$$__' + this.options.formatKey(reg); | 
|   } | 
|   | 
|   subscribe(reg, listener) { | 
|     const key = this.formatKey(reg); | 
|     const transcode = this._transcode; | 
|     const isBroadcast = this.options.isBroadcast; | 
|   | 
|     const listeners = this._subListeners.get(key) || []; | 
|     listeners.push(listener); | 
|     this._subListeners.set(key, listeners); | 
|   | 
|     if (!this._subSet.has(key)) { | 
|       this._subSet.add(key); | 
|       this._realClient[this._subscribeMethodName](reg, result => { | 
|         const data = transcode.encode(result); | 
|         this._subData.set(key, data); | 
|   | 
|         let fns = this._subListeners.get(key); | 
|         if (!fns) { | 
|           return; | 
|         } | 
|   | 
|         const len = fns.length; | 
|         // if isBroadcast equal to false, random pick one to notify | 
|         if (!isBroadcast) { | 
|           fns = [ fns[random(len)] ]; | 
|         } | 
|   | 
|         for (const fn of fns) { | 
|           fn(transcode.decode(data)); | 
|         } | 
|       }); | 
|     } else if (this._subData.has(key) && isBroadcast) { | 
|       process.nextTick(() => { | 
|         const data = this._subData.get(key); | 
|         listener(transcode.decode(data)); | 
|       }); | 
|     } | 
|   } | 
|   | 
|   unSubscribe(reg, listener) { | 
|     const key = this.formatKey(reg); | 
|   | 
|     if (!listener) { | 
|       this._subListeners.delete(key); | 
|     } else { | 
|       const listeners = this._subListeners.get(key) || []; | 
|       const newListeners = []; | 
|   | 
|       for (const fn of listeners) { | 
|         if (fn === listener) { | 
|           continue; | 
|         } | 
|         newListeners.push(fn); | 
|       } | 
|       this._subListeners.set(key, newListeners); | 
|     } | 
|   } | 
|   | 
|   publish(reg) { | 
|     this._realClient[this._publishMethodName](reg); | 
|   } | 
|   | 
|   invoke(methodName, args, callback) { | 
|     let method = this._realClient[methodName]; | 
|     // compatible with generatorFunction | 
|     if (is.generatorFunction(method)) { | 
|       method = co.wrap(method); | 
|     } | 
|     args.push(callback); | 
|     const ret = method.apply(this._realClient, args); | 
|     if (callback && is.promise(ret)) { | 
|       ret.then(result => callback(null, result), err => callback(err)) | 
|         // to avoid uncaught exception in callback function, then cause unhandledRejection | 
|         .catch(err => { this._errorHandler(err); }); | 
|     } | 
|   } | 
|   | 
|   // emit error asynchronously | 
|   _errorHandler(err) { | 
|     setImmediate(() => { | 
|       if (!this._closeByUser) { | 
|         this.emit('error', err); | 
|       } | 
|     }); | 
|   } | 
|   | 
|   async close() { | 
|     if (this._refCount > 0) { | 
|       this._refCount--; | 
|     } | 
|     if (this._refCount > 0) return; | 
|   | 
|     this._closeByUser = true; | 
|   | 
|     if (this._realClient) { | 
|       if (this._closeMethodName) { | 
|         // support common function, generatorFunction, and function returning a promise | 
|         await utils.callFn(this._realClient[this._closeMethodName].bind(this._realClient)); | 
|       } | 
|     } | 
|     this.emit('close'); | 
|   } | 
| } | 
|   | 
|   | 
| class SingleClient extends Base { | 
|   get [singleMode]() { | 
|     return true; | 
|   } | 
|   | 
|   async [createClient]() { | 
|     const options = this.options; | 
|     let client; | 
|     if (_instances.has(options.name)) { | 
|       client = _instances.get(options.name); | 
|       client.ref(); | 
|       return client; | 
|     } | 
|     client = new InnerClient(options); | 
|     client.once('close', () => { | 
|       _instances.delete(options.name); | 
|       this[logger].info('[cluster#SingleClient] %s is closed.', options.name); | 
|     }); | 
|     _instances.set(options.name, client); | 
|     return client; | 
|   } | 
| } | 
|   | 
| module.exports = SingleClient; |