'use strict';
|
|
const net = require('net');
|
const is = require('is-type-of');
|
const assert = require('assert');
|
const Base = require('sdk-base');
|
|
const addressKey = Symbol('address');
|
const defaultOptions = {
|
noDelay: true,
|
connectTimeout: 3000,
|
responseTimeout: 3000,
|
heartbeatInterval: 5000,
|
needHeartbeat: true,
|
concurrent: 0,
|
logger: console,
|
};
|
const noop = () => {};
|
let seed = 0;
|
|
class TCPBase extends Base {
|
/**
|
* A base class for tcp client with basic functions
|
*
|
* @param {Object} options
|
* - {String} host - server host
|
* - {Number} port - server port
|
* - {Number} headerLength - length of the packet header, this field is optional,
|
* but if you not provider, you must override getHeader method
|
* - {Boolean} [noDelay] - whether use the Nagle algorithm or not,defaults to true
|
* - {Number} [concurrent] - the number of concurrent packet, defaults to zero, means no limit
|
* - {Number} [responseTimeout] - limit the maximum time for waiting a response
|
* - {Logger} [logger] - the logger client
|
* @constructor
|
*/
|
constructor(options) {
|
super();
|
|
this.options = Object.assign({}, defaultOptions, options);
|
if (!this.options.path) {
|
assert(this.options.host, 'options.host is required');
|
assert(this.options.port, 'options.port is required');
|
}
|
|
if (this.options.needHeartbeat) {
|
assert(this.heartBeatPacket, 'heartBeatPacket getter must be implemented if needHeartbeat');
|
}
|
|
this.clientId = ++seed;
|
this._heartbeatTimer = null;
|
this._socket = null;
|
this._header = null;
|
this._bodyLength = null;
|
this._lastError = null;
|
this._queue = [];
|
this._invokes = new Map();
|
this[addressKey] = this.options.host + ':' + this.options.port;
|
this._lastHeartbeatTime = 0;
|
this._lastReceiveDataTime = 0;
|
|
this._connect();
|
this.ready(err => {
|
if (!err && this.options.needHeartbeat) {
|
this._startHeartbeat();
|
}
|
});
|
}
|
|
/**
|
* get packet header
|
*
|
* @return {Buffer} header
|
*/
|
getHeader() {
|
return this.read(this.options.headerLength);
|
}
|
|
/* eslint-disable valid-jsdoc, no-unused-vars */
|
|
/**
|
* get body length from header
|
*
|
* @param {Buffer} header - header data
|
* @return {Number} bodyLength
|
*/
|
getBodyLength(header) {
|
throw new Error('not implement');
|
}
|
|
/**
|
* return a heartbeat packet
|
*
|
* @property {Buffer} TCPBase#heartBeatPacket
|
*/
|
get heartBeatPacket() {
|
throw new Error('not implement');
|
}
|
|
/**
|
* send heartbeat packet
|
*
|
* @return {void}
|
*/
|
sendHeartBeat() {
|
this._socket.write(this.heartBeatPacket);
|
}
|
|
/**
|
* deserialze method, leave it to implement by subclass
|
*
|
* @param {Buffer} buf - binary data
|
* @return {Object} packet object
|
*/
|
decode(buf) {
|
throw new Error('not implement');
|
}
|
|
/* eslint-enable valid-jsdoc, no-unused-vars */
|
|
/**
|
* if the connection is writable, also including flow control logic
|
*
|
* @property {Boolean} TCPBase#_writable
|
*/
|
get _writable() {
|
if (this.options.concurrent && this._invokes.size >= this.options.concurrent) {
|
return false;
|
}
|
|
return this.isOK;
|
}
|
|
/**
|
* if the connection is healthy or not
|
*
|
* @property {Boolean} TCPBase#isOK
|
*/
|
get isOK() {
|
return this._socket && this._socket.writable;
|
}
|
|
/**
|
* remote address
|
*
|
* @property {String} TCPBase#address
|
*/
|
get address() {
|
return this[addressKey];
|
}
|
|
/**
|
* logger
|
*
|
* @property {Logger} TCPBase#logger
|
*/
|
get logger() {
|
return this.options.logger;
|
}
|
|
/**
|
* Pulls some data out of the socket buffer and returns it.
|
* If no data available to be read, null is returned
|
*
|
* @param {Number} n - to specify how much data to read
|
* @return {Buffer} - data
|
*/
|
read(n) {
|
return this._socket.read(n);
|
}
|
|
/**
|
* send packet to server
|
*
|
* @param {Object} packet
|
* - {Number} id - packet id
|
* - {Buffer} data - binary data
|
* - {Boolean} [oneway] - oneway or not
|
* - {Number} [timeout] - the maximum time for waiting a response
|
* @param {Function} [callback] - Call this function,when processing is complete, optional.
|
* @return {void}
|
*/
|
send(packet, callback = noop) {
|
if (!this._socket) {
|
const err = new Error(`[TCPBase] The socket was closed. (address: ${this[addressKey]})`);
|
err.id = packet.id;
|
err.data = packet.data.toString('base64');
|
if (packet.oneway) {
|
err.oneway = true;
|
callback();
|
this.emit('error', err);
|
} else {
|
callback(err);
|
}
|
return;
|
}
|
if (packet.oneway) {
|
this._socket.write(packet.data);
|
callback();
|
return;
|
}
|
if (!this._writable) {
|
this._queue.push([ packet, callback ]);
|
return;
|
}
|
const meta = {
|
id: packet.id,
|
dataLength: packet.data.length,
|
bufferSize1: this._socket.bufferSize,
|
bufferSize2: -1,
|
startTime: Date.now(),
|
endTime: -1,
|
};
|
let endTime;
|
meta.writeSuccess = this._socket.write(packet.data, () => {
|
endTime = Date.now();
|
});
|
const timeout = packet.timeout || this.options.responseTimeout;
|
this._invokes.set(packet.id, {
|
meta,
|
packet,
|
timer: setTimeout(() => {
|
meta.bufferSize2 = this._socket.bufferSize;
|
meta.endTime = endTime;
|
this._finishInvoke(packet.id);
|
const err = new Error(`Server no response in ${timeout}ms, address#${this[addressKey]}`);
|
err.socketMeta = meta;
|
err.name = 'ResponseTimeoutError';
|
callback(err);
|
}, timeout),
|
callback,
|
});
|
}
|
|
/**
|
* thunk style api of send(packet, callback)
|
*
|
* @param {Object} packet
|
* - {Number} id - packet id
|
* - {Buffer} data - binary data
|
* - {Boolean} [oneway] - oneway or not
|
* - {Number} [timeout] - the maximum time for waiting a response
|
* @return {Function} thunk function
|
*/
|
sendThunk(packet) {
|
return callback => this.send(packet, callback);
|
}
|
|
_finishInvoke(id) {
|
this._invokes.delete(id);
|
if (this._writable) {
|
this._resume();
|
}
|
}
|
|
_errorCallback(callback, err) {
|
if (!err) {
|
err = new Error(`The socket was closed. (address: ${this[addressKey]})`);
|
err.name = 'SocketCloseError';
|
}
|
callback && callback(err);
|
}
|
|
// mark all invokes timeout
|
_cleanInvokes(err) {
|
for (const id of this._invokes.keys()) {
|
const req = this._invokes.get(id);
|
clearTimeout(req.timer);
|
this._errorCallback(req.callback, err);
|
}
|
this._invokes.clear();
|
}
|
|
// clean up the queue
|
_cleanQueue(err) {
|
let args = this._queue.pop();
|
while (args) {
|
// args[0] 是packet, args[1]是callback
|
this._errorCallback(args[1], err);
|
args = this._queue.pop();
|
}
|
}
|
|
_resume() {
|
const args = this._queue.shift();
|
if (args) {
|
this.send(args[0], args[1]);
|
}
|
}
|
|
// read data from socket,and decode it to packet object
|
_readPacket() {
|
if (is.nullOrUndefined(this._bodyLength)) {
|
this._header = this.getHeader();
|
if (!this._header) {
|
return false;
|
}
|
this._bodyLength = this.getBodyLength(this._header);
|
}
|
|
let body;
|
if (this._bodyLength > 0) {
|
body = this.read(this._bodyLength);
|
if (!body) {
|
return false;
|
}
|
}
|
this._bodyLength = null;
|
const entity = this.decode(body, this._header);
|
// the schema of entity
|
// {
|
// id: 'request id',
|
// isResponse: true,
|
// data: {} // deserialized object
|
// }
|
let type = 'request';
|
if (!entity.hasOwnProperty('isResponse')) {
|
entity.isResponse = this._invokes.has(entity.id);
|
}
|
if (entity.isResponse) {
|
type = 'response';
|
const invoke = this._invokes.get(entity.id);
|
if (invoke) {
|
this._finishInvoke(entity.id);
|
clearTimeout(invoke.timer);
|
process.nextTick(() => {
|
invoke.callback(entity.error, entity.data);
|
});
|
}
|
}
|
if (entity.data) {
|
process.nextTick(() => {
|
this.emit(type, entity, this[addressKey]);
|
});
|
}
|
return true;
|
}
|
|
/**
|
* close the socket
|
*
|
* @param {Error} err - the error which makes socket closed
|
* @return {void}
|
*/
|
close(err) {
|
if (!this._socket) {
|
return Promise.resolve();
|
}
|
this._socket.destroy(err);
|
return this.await('close');
|
}
|
|
_handleClose() {
|
if (!this._socket) {
|
return;
|
}
|
this._socket.removeAllListeners();
|
this._socket = null;
|
|
this._cleanInvokes(this._lastError);
|
// clean timer
|
if (this._heartbeatTimer) {
|
clearInterval(this._heartbeatTimer);
|
this._heartbeatTimer = null;
|
}
|
this._cleanQueue(this._lastError);
|
this.emit('close');
|
}
|
|
_handleReadable() {
|
this._lastReceiveDataTime = Date.now();
|
try {
|
let remaining = false;
|
do {
|
remaining = this._readPacket();
|
} while (remaining);
|
} catch (err) {
|
this.close(err);
|
}
|
}
|
|
_connect(done) {
|
if (!done) {
|
done = err => {
|
this.ready(err ? err : true);
|
};
|
}
|
const { port, host, localAddress, localPort, family, hints, lookup, path } = this.options;
|
const socket = this._socket = net.connect({
|
port, host, localAddress, localPort, family, hints, lookup, path,
|
});
|
socket.setNoDelay(this.options.noDelay);
|
socket.on('readable', () => { this._handleReadable(); });
|
socket.once('close', () => { this._handleClose(); });
|
socket.once('error', err => {
|
err.message += ' (address: ' + this[addressKey] + ')';
|
this._lastError = err;
|
if (err.code === 'ECONNRESET') {
|
this.logger.warn('[TCPBase] socket is closed by other side while there were still unhandled data in the socket buffer');
|
} else {
|
this.emit('error', err);
|
}
|
});
|
socket.setTimeout(this.options.connectTimeout, () => {
|
const err = new Error(`[TCPBase] socket connect timeout (${this.options.connectTimeout}ms)`);
|
err.name = 'TcpConnectionTimeoutError';
|
err.host = this.options.host;
|
err.port = this.options.port;
|
this.close(err);
|
});
|
|
socket.once('connect', () => {
|
// set timeout back to zero after connected
|
socket.setTimeout(0);
|
this.emit('connect');
|
});
|
|
Promise.race([
|
this.await('connect'),
|
this.await('error'),
|
]).then(done, done);
|
}
|
|
_startHeartbeat() {
|
this._heartbeatTimer = setInterval(() => {
|
const duration = this._lastHeartbeatTime - this._lastReceiveDataTime;
|
if (this._lastReceiveDataTime && duration > this.options.heartbeatInterval) {
|
const err = new Error(`server ${this[addressKey]} no response in ${duration}ms, maybe the socket is end on the other side.`);
|
err.name = 'ServerNoResponseError';
|
this.close(err);
|
return;
|
}
|
// flow control
|
if (this._invokes.size > 0 || !this.isOK) {
|
return;
|
}
|
this._lastHeartbeatTime = Date.now();
|
this.sendHeartBeat();
|
}, this.options.heartbeatInterval);
|
}
|
}
|
|
module.exports = TCPBase;
|