333
schangxiang@126.com
2025-09-19 18966e02fb573c7e2bb0c6426ed792b38b910940
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
"use strict";
 
const BufferList = require('bl');
 
const _require = require('readable-stream'),
      Transform = _require.Transform;
 
const Message = require('./message');
 
const _require2 = require('./packet'),
      Packet = _require2.Packet,
      HEADER_LENGTH = _require2.HEADER_LENGTH;
 
/**
  IncomingMessageStream
  Transform received TDS data into individual IncomingMessage streams.
*/
class IncomingMessageStream extends Transform {
  constructor(debug) {
    super({
      readableObjectMode: true
    });
    this.debug = debug;
    this.currentMessage = undefined;
    this.bl = new BufferList();
  }
 
  processBufferedData(callback) {
    // The packet header is always 8 bytes of length.
    while (this.bl.length >= HEADER_LENGTH) {
      // Get the full packet length
      const length = this.bl.readUInt16BE(2);
 
      if (this.bl.length >= length) {
        const data = this.bl.slice(0, length);
        this.bl.consume(length); // TODO: Get rid of creating `Packet` instances here.
 
        const packet = new Packet(data);
        this.debug.packet('Received', packet);
        this.debug.data(packet);
        let message = this.currentMessage;
 
        if (message === undefined) {
          message = new Message({
            type: packet.type(),
            resetConnection: false
          });
          this.push(message);
        }
 
        if (packet.isLast()) {
          this.currentMessage = undefined; // Wait until the current message was fully processed before we
          // continue processing any remaining messages.
 
          message.once('end', () => {
            this.processBufferedData(callback);
          });
          message.end(packet.data());
          return;
        } else {
          this.currentMessage = message; // If too much data is buffering up in the
          // current message, wait for it to drain.
 
          if (!message.write(packet.data())) {
            message.once('drain', () => {
              this.processBufferedData(callback);
            });
            return;
          }
        }
      } else {
        break;
      }
    } // Not enough data to read the next packet. Stop here and wait for
    // the next call to `_transform`.
 
 
    callback();
  }
 
  _transform(chunk, encoding, callback) {
    this.bl.append(chunk);
    this.processBufferedData(callback);
  }
 
}
 
module.exports = IncomingMessageStream;