'use strict';
|
|
const debug = require('debug')('cluster-client:server');
|
const net = require('net');
|
const Base = require('sdk-base');
|
const sleep = require('mz-modules/sleep');
|
const Packet = require('./protocol/packet');
|
|
// share memory in current process
|
let serverMap;
|
if (global.serverMap) {
|
serverMap = global.serverMap;
|
} else {
|
global.serverMap = serverMap = new Map();
|
}
|
let typeSet;
|
if (global.typeSet) {
|
typeSet = global.typeSet;
|
} else {
|
global.typeSet = typeSet = new Set();
|
}
|
|
function claimServer(port) {
|
return new Promise((resolve, reject) => {
|
const server = net.createServer();
|
server.listen({
|
port,
|
host: '127.0.0.1',
|
// When exclusive is true, the handle is not shared, and attempted port sharing results in an error.
|
exclusive: true,
|
});
|
|
function onError(err) {
|
debug('listen %s error: %s', port, err);
|
reject(err);
|
}
|
|
server.on('error', onError);
|
server.on('listening', () => {
|
server.removeListener('error', onError);
|
debug('listen %s success', port);
|
resolve(server);
|
});
|
});
|
}
|
|
function tryToConnect(port) {
|
return new Promise(resolve => {
|
const socket = net.connect(port, '127.0.0.1');
|
debug('try to connecting %s', port);
|
let success = false;
|
socket.on('connect', () => {
|
success = true;
|
resolve(true);
|
// disconnect
|
socket.end();
|
debug('test connected %s success, end now', port);
|
});
|
socket.on('error', err => {
|
debug('test connect %s error: %s, success: %s', port, err, success);
|
// if success before, ignore it
|
if (success) return;
|
resolve(false);
|
});
|
});
|
}
|
|
class ClusterServer extends Base {
|
/**
|
* Manage all TCP Connections,assign them to proper channel
|
*
|
* @constructor
|
* @param {Object} options
|
* - {net.Server} server - the server
|
* - {Number} port - the port
|
*/
|
constructor(options) {
|
super();
|
|
this._sockets = new Map();
|
this._server = options.server;
|
this._port = options.port;
|
this._isClosed = false;
|
this._server.on('connection', socket => this._handleSocket(socket));
|
this._server.once('close', () => {
|
this._isClosed = true;
|
serverMap.delete(this._port);
|
this.emit('close');
|
});
|
this._server.once('error', err => {
|
this.emit('error', err);
|
});
|
}
|
|
get isClosed() {
|
return this._isClosed;
|
}
|
|
close() {
|
return new Promise((resolve, reject) => {
|
if (this.isClosed) return resolve();
|
|
this._server.close(err => {
|
if (err) return reject(err);
|
resolve();
|
});
|
|
// sockets must be closed manually, otherwise server.close callback will never be called
|
for (const socket of this._sockets.values()) {
|
socket.destroy();
|
}
|
});
|
}
|
|
_handleSocket(socket) {
|
let header;
|
let bodyLength;
|
let body;
|
const server = this;
|
const key = socket.remotePort;
|
this._sockets.set(key, socket);
|
|
function onReadable() {
|
if (!header) {
|
header = socket.read(24);
|
if (!header) {
|
return;
|
}
|
}
|
if (!bodyLength) {
|
bodyLength = header.readInt32BE(16) + header.readInt32BE(20);
|
}
|
body = socket.read(bodyLength);
|
if (!body) {
|
return;
|
}
|
// first packet to register to channel
|
const packet = Packet.decode(Buffer.concat([ header, body ], 24 + bodyLength));
|
header = null;
|
bodyLength = null;
|
body = null;
|
if (packet.connObj && packet.connObj.type === 'register_channel') {
|
const channelName = packet.connObj.channelName;
|
const eventKey = `${channelName}_connection`;
|
|
// that means leader already there
|
if (server.listenerCount(eventKey)) {
|
socket.removeListener('readable', onReadable);
|
// assign to proper channel
|
debug('new %s_connection %s connected', channelName, socket.remotePort);
|
server.emit(`${channelName}_connection`, socket, packet);
|
}
|
}
|
}
|
|
socket.on('readable', onReadable);
|
socket.once('close', () => {
|
debug('socket %s close', key);
|
this._sockets.delete(key);
|
});
|
debug('new socket %s from follower', socket.remotePort);
|
}
|
|
/**
|
* Occupy the port
|
*
|
* @param {String} name - the client name
|
* @param {Number} port - the port
|
* @return {ClusterServer} server
|
*/
|
static async create(name, port) {
|
const key = `${name}@${port}`;
|
let instance = serverMap.get(port);
|
if (instance && !instance.isClosed) {
|
if (typeSet.has(key)) {
|
return null;
|
}
|
typeSet.add(key);
|
return instance;
|
}
|
// compete for the local port, if got => leader, otherwise follower
|
try {
|
const server = await claimServer(port);
|
instance = new ClusterServer({ server, port });
|
typeSet.add(key);
|
serverMap.set(port, instance);
|
return instance;
|
} catch (err) {
|
// if exception, that mean compete for port failed, then double check
|
instance = serverMap.get(port);
|
if (instance && !instance.isClosed) {
|
if (typeSet.has(key)) {
|
return null;
|
}
|
typeSet.add(key);
|
return instance;
|
}
|
return null;
|
}
|
}
|
|
static async close(name, server) {
|
const port = server._port;
|
|
// remove from typeSet, so other client can occupy
|
typeSet.delete(`${name}@${port}`);
|
|
let listening = false;
|
for (const key of typeSet.values()) {
|
if (key.endsWith(`@${port}`)) {
|
listening = true;
|
break;
|
}
|
}
|
|
// close server if no one is listening on this port any more
|
if (!listening) {
|
const server = serverMap.get(port);
|
if (server) await server.close();
|
}
|
}
|
|
/**
|
* Wait for Leader Startup
|
*
|
* @param {Number} port - the port
|
* @param {Number} timeout - the max wait time
|
* @return {void}
|
*/
|
static async waitFor(port, timeout) {
|
const start = Date.now();
|
let connect = false;
|
while (!connect) {
|
connect = await tryToConnect(port);
|
|
// if timeout, throw error
|
if (Date.now() - start > timeout) {
|
throw new Error(`[ClusterClient] leader does not be active in ${timeout}ms on port:${port}`);
|
}
|
if (!connect) {
|
await sleep(3000);
|
}
|
}
|
}
|
}
|
|
module.exports = ClusterServer;
|