schangxiang@126.com
2025-09-19 9be9c3784b2881a3fa25e93ae2033dc2803c0ed0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
"use strict";
 
const BufferList = require('bl');
 
const _require = require('readable-stream'),
      Duplex = _require.Duplex;
 
const _require2 = require('./packet'),
      Packet = _require2.Packet,
      HEADER_LENGTH = _require2.HEADER_LENGTH;
 
class OutgoingMessageStream extends Duplex {
  constructor(debug, {
    packetSize
  }) {
    super({
      writableObjectMode: true
    });
    this.packetSize = packetSize;
    this.debug = debug;
    this.bl = new BufferList(); // When the writable side is ended, push `null`
    // to also end the readable side.
 
    this.on('finish', () => {
      this.push(null);
    });
  }
 
  _write(message, encoding, callback) {
    const length = this.packetSize - HEADER_LENGTH;
    let packetNumber = 0;
    this.currentMessage = message;
    this.currentMessage.on('data', data => {
      this.bl.append(data);
 
      while (this.bl.length > length) {
        const data = this.bl.slice(0, length);
        this.bl.consume(length); // TODO: Get rid of creating `Packet` instances here.
 
        const packet = new Packet(message.type);
        packet.packetId(packetNumber += 1);
        packet.resetConnection(message.resetConnection);
        packet.addData(data);
        this.debug.packet('Sent', packet);
        this.debug.data(packet);
 
        if (this.push(packet.buffer) === false) {
          this.currentMessage.pause();
        }
      }
    });
    this.currentMessage.on('end', () => {
      const data = this.bl.slice();
      this.bl.consume(data.length); // TODO: Get rid of creating `Packet` instances here.
 
      const packet = new Packet(message.type);
      packet.packetId(packetNumber += 1);
      packet.resetConnection(message.resetConnection);
      packet.last(true);
      packet.addData(data);
      this.debug.packet('Sent', packet);
      this.debug.data(packet);
      this.push(packet.buffer);
      this.currentMessage = undefined;
      callback();
    });
  }
 
  _read(size) {
    // If we do have a message, resume it and get data flowing.
    // Otherwise, there is nothing to do.
    if (this.currentMessage) {
      this.currentMessage.resume();
    }
  }
 
}
 
module.exports = OutgoingMessageStream;