222
schangxiang@126.com
2025-06-13 6a8393408d8cefcea02b7a598967de8dc1e565c2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
'use strict';
 
const debug = require('debug')('egg:util:messenger:local');
const is = require('is-type-of');
const EventEmitter = require('events');
 
/**
 * Communication between app worker and agent worker with EventEmitter
 */
class Messenger extends EventEmitter {
 
  constructor(egg) {
    super();
    this.egg = egg;
  }
 
  /**
   * Send message to all agent and app
   * @param {String} action - message key
   * @param {Object} data - message value
   * @return {Messenger} this
   */
  broadcast(action, data) {
    debug('[%s] broadcast %s with %j', this.pid, action, data);
    this.send(action, data, 'both');
    return this;
  }
 
  /**
   * send message to the specified process
   * Notice: in single process mode, it only can send to self process,
   * and it will send to both agent and app's messengers.
   * @param {String} pid - the process id of the receiver
   * @param {String} action - message key
   * @param {Object} data - message value
   * @return {Messenger} this
   */
  sendTo(pid, action, data) {
    debug('[%s] send %s with %j to %s', this.pid, action, data, pid);
    if (pid !== process.pid) return this;
    this.send(action, data, 'both');
    return this;
  }
 
  /**
   * send message to one worker by random
   * Notice: in single process mode, we only start one agent worker and one app worker
   * - if it's running in agent, it will send to one of app workers
   * - if it's running in app, it will send to agent
   * @param {String} action - message key
   * @param {Object} data - message value
   * @return {Messenger} this
   */
  sendRandom(action, data) {
    debug('[%s] send %s with %j to opposite', this.pid, action, data);
    this.send(action, data, 'opposite');
    return this;
  }
 
  /**
   * send message to app
   * @param {String} action - message key
   * @param {Object} data - message value
   * @return {Messenger} this
   */
  sendToApp(action, data) {
    debug('[%s] send %s with %j to all app', this.pid, action, data);
    this.send(action, data, 'application');
    return this;
  }
 
  /**
   * send message to agent
   * @param {String} action - message key
   * @param {Object} data - message value
   * @return {Messenger} this
   */
  sendToAgent(action, data) {
    debug('[%s] send %s with %j to all agent', this.pid, action, data);
    this.send(action, data, 'agent');
    return this;
  }
 
  /**
   * @param {String} action - message key
   * @param {Object} data - message value
   * @param {String} to - let master know how to send message
   * @return {Messenger} this
   */
  send(action, data, to) {
    // use nextTick to keep it async as IPC messenger
    process.nextTick(() => {
      const { egg } = this;
      let application;
      let agent;
      let opposite;
 
      if (egg.type === 'application') {
        application = egg;
        agent = egg.agent;
        opposite = agent;
      } else {
        agent = egg;
        application = egg.application;
        opposite = application;
      }
      if (!to) to = egg.type === 'application' ? 'agent' : 'application';
 
      if (application && application.messenger && (to === 'application' || to === 'both')) {
        application.messenger._onMessage({ action, data });
      }
      if (agent && agent.messenger && (to === 'agent' || to === 'both')) {
        agent.messenger._onMessage({ action, data });
      }
      if (opposite && opposite.messenger && to === 'opposite') {
        opposite.messenger._onMessage({ action, data });
      }
    });
 
    return this;
  }
 
  _onMessage(message) {
    if (message && is.string(message.action)) {
      debug('[%s] got message %s with %j', this.pid, message.action, message.data);
      this.emit(message.action, message.data);
    }
  }
 
  close() {
    this.removeAllListeners();
  }
 
  /**
   * @method Messenger#on
   * @param {String} action - message key
   * @param {Object} data - message value
   */
}
 
module.exports = Messenger;