'use strict';
|
|
const debug = require('debug')('egg:util:messenger:local');
|
const is = require('is-type-of');
|
const EventEmitter = require('events');
|
|
/**
|
* Communication between app worker and agent worker with EventEmitter
|
*/
|
class Messenger extends EventEmitter {
|
|
constructor(egg) {
|
super();
|
this.egg = egg;
|
}
|
|
/**
|
* Send message to all agent and app
|
* @param {String} action - message key
|
* @param {Object} data - message value
|
* @return {Messenger} this
|
*/
|
broadcast(action, data) {
|
debug('[%s] broadcast %s with %j', this.pid, action, data);
|
this.send(action, data, 'both');
|
return this;
|
}
|
|
/**
|
* send message to the specified process
|
* Notice: in single process mode, it only can send to self process,
|
* and it will send to both agent and app's messengers.
|
* @param {String} pid - the process id of the receiver
|
* @param {String} action - message key
|
* @param {Object} data - message value
|
* @return {Messenger} this
|
*/
|
sendTo(pid, action, data) {
|
debug('[%s] send %s with %j to %s', this.pid, action, data, pid);
|
if (pid !== process.pid) return this;
|
this.send(action, data, 'both');
|
return this;
|
}
|
|
/**
|
* send message to one worker by random
|
* Notice: in single process mode, we only start one agent worker and one app worker
|
* - if it's running in agent, it will send to one of app workers
|
* - if it's running in app, it will send to agent
|
* @param {String} action - message key
|
* @param {Object} data - message value
|
* @return {Messenger} this
|
*/
|
sendRandom(action, data) {
|
debug('[%s] send %s with %j to opposite', this.pid, action, data);
|
this.send(action, data, 'opposite');
|
return this;
|
}
|
|
/**
|
* send message to app
|
* @param {String} action - message key
|
* @param {Object} data - message value
|
* @return {Messenger} this
|
*/
|
sendToApp(action, data) {
|
debug('[%s] send %s with %j to all app', this.pid, action, data);
|
this.send(action, data, 'application');
|
return this;
|
}
|
|
/**
|
* send message to agent
|
* @param {String} action - message key
|
* @param {Object} data - message value
|
* @return {Messenger} this
|
*/
|
sendToAgent(action, data) {
|
debug('[%s] send %s with %j to all agent', this.pid, action, data);
|
this.send(action, data, 'agent');
|
return this;
|
}
|
|
/**
|
* @param {String} action - message key
|
* @param {Object} data - message value
|
* @param {String} to - let master know how to send message
|
* @return {Messenger} this
|
*/
|
send(action, data, to) {
|
// use nextTick to keep it async as IPC messenger
|
process.nextTick(() => {
|
const { egg } = this;
|
let application;
|
let agent;
|
let opposite;
|
|
if (egg.type === 'application') {
|
application = egg;
|
agent = egg.agent;
|
opposite = agent;
|
} else {
|
agent = egg;
|
application = egg.application;
|
opposite = application;
|
}
|
if (!to) to = egg.type === 'application' ? 'agent' : 'application';
|
|
if (application && application.messenger && (to === 'application' || to === 'both')) {
|
application.messenger._onMessage({ action, data });
|
}
|
if (agent && agent.messenger && (to === 'agent' || to === 'both')) {
|
agent.messenger._onMessage({ action, data });
|
}
|
if (opposite && opposite.messenger && to === 'opposite') {
|
opposite.messenger._onMessage({ action, data });
|
}
|
});
|
|
return this;
|
}
|
|
_onMessage(message) {
|
if (message && is.string(message.action)) {
|
debug('[%s] got message %s with %j', this.pid, message.action, message.data);
|
this.emit(message.action, message.data);
|
}
|
}
|
|
close() {
|
this.removeAllListeners();
|
}
|
|
/**
|
* @method Messenger#on
|
* @param {String} action - message key
|
* @param {Object} data - message value
|
*/
|
}
|
|
module.exports = Messenger;
|