schangxiang@126.com
2025-09-09 3d8966ba2c81e7e0365c8b123e861d18ee4f94f5
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
"use strict";
 
/* globals $Values */
const tls = require('tls');
 
const DuplexPair = require('native-duplexpair');
 
const _require = require('events'),
      EventEmitter = _require.EventEmitter;
 
const _require2 = require('./packet'),
      TYPE = _require2.TYPE;
 
const Message = require('./message');
 
const IncomingMessageStream = require('./incoming-message-stream');
 
const OutgoingMessageStream = require('./outgoing-message-stream');
 
module.exports = class MessageIO extends EventEmitter {
  constructor(socket, packetSize, debug) {
    super();
    this.socket = socket;
    this.debug = debug;
    this.tlsNegotiationComplete = false;
    this.incomingMessageStream = new IncomingMessageStream(this.debug);
    this.incomingMessageStream.on('data', message => {
      message.on('data', chunk => {
        this.emit('data', chunk);
      });
      message.on('end', () => {
        this.emit('message');
      });
    });
    this.outgoingMessageStream = new OutgoingMessageStream(this.debug, {
      packetSize: packetSize
    });
    this.socket.pipe(this.incomingMessageStream);
    this.outgoingMessageStream.pipe(this.socket);
  }
 
  packetSize(...args) {
    if (args.length > 0) {
      const packetSize = args[0];
      this.debug.log('Packet size changed from ' + this.outgoingMessageStream.packetSize + ' to ' + packetSize);
      this.outgoingMessageStream.packetSize = packetSize;
    }
 
    return this.outgoingMessageStream.packetSize;
  }
 
  startTls(secureContext, hostname, trustServerCertificate) {
    const duplexpair = new DuplexPair();
    const securePair = this.securePair = {
      cleartext: tls.connect({
        socket: duplexpair.socket1,
        servername: hostname,
        secureContext: secureContext,
        rejectUnauthorized: !trustServerCertificate
      }),
      encrypted: duplexpair.socket2
    }; // If an error happens in the TLS layer, there is nothing we can do about it.
    // Forward the error to the socket so the connection gets properly cleaned up.
 
    securePair.cleartext.on('error', err => {
      // Streams in node.js versions before 8.0.0 don't support `.destroy`
      if (typeof securePair.encrypted.destroy === 'function') {
        securePair.encrypted.destroy();
      }
 
      this.socket.destroy(err);
    });
    securePair.cleartext.on('secureConnect', () => {
      const cipher = securePair.cleartext.getCipher();
 
      if (cipher) {
        this.debug.log('TLS negotiated (' + cipher.name + ', ' + cipher.version + ')');
      }
 
      this.emit('secure', securePair.cleartext);
      this.encryptAllFutureTraffic();
    });
    securePair.encrypted.on('data', data => {
      this.sendMessage(TYPE.PRELOGIN, data, false);
    });
  }
 
  encryptAllFutureTraffic() {
    this.securePair.encrypted.removeAllListeners('data');
    this.outgoingMessageStream.unpipe(this.socket);
    this.socket.unpipe(this.incomingMessageStream);
    this.socket.pipe(this.securePair.encrypted);
    this.securePair.encrypted.pipe(this.socket);
    this.securePair.cleartext.pipe(this.incomingMessageStream);
    this.outgoingMessageStream.pipe(this.securePair.cleartext);
    this.tlsNegotiationComplete = true;
  }
 
  tlsHandshakeData(data) {
    this.securePair.encrypted.write(data);
  } // TODO listen for 'drain' event when socket.write returns false.
  // TODO implement incomplete request cancelation (2.2.1.6)
 
 
  sendMessage(packetType, data, resetConnection) {
    const message = new Message({
      type: packetType,
      resetConnection: resetConnection
    });
    message.end(data);
    this.outgoingMessageStream.write(message);
  } // Temporarily suspends the flow of incoming packets.
 
 
  pause() {
    this.incomingMessageStream.pause();
  } // Resumes the flow of incoming packets.
 
 
  resume() {
    this.incomingMessageStream.resume();
  }
 
};