schangxiang@126.com
2025-09-18 49a51c068d62084bc4c3e77c4be94a20de556c4a
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
// Copyright (c) 2015, 2020, Oracle and/or its affiliates. All rights reserved
 
//-----------------------------------------------------------------------------
//
// You may not use the identified files except in compliance with the Apache
// License, Version 2.0 (the "License.")
//
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0.
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//
// See the License for the specific language governing permissions and
// limitations under the License.
//
//-----------------------------------------------------------------------------
 
'use strict';
 
const { Readable } = require('stream');
 
class QueryStream extends Readable {
 
  constructor(rs) {
    super({ objectMode: true });
    this._fetching = false;
    this._numRows = 0;
 
    // calling open via process.nextTick to allow event handlers to be
    // registered prior to the events being emitted
    if (rs) {
      process.nextTick(() => {
        this._open(rs);
      });
    }
  }
 
  // called by readable.destroy() and ensures that the result set is closed if
  // it has not already been closed (never called directly)
  async _destroy(err, cb) {
    if (this._resultSet) {
      const rs = this._resultSet;
      this._resultSet = null;
      if (this._fetching) {
        await new Promise(resolve =>
          this.once('_doneFetching', resolve));
      }
      try {
        await rs._close();
      } catch (closeErr) {
        cb(closeErr);
        return;
      }
    }
    cb(err);
  }
 
  // called when the query stream is to be associated with a result set; this
  // takes place when the query stream if constructed (if a result set is known
  // at that point) or by Connection.execute() when the result set is ready
  _open(rs) {
    this._resultSet = rs;
 
    // trigger the event listener that may have been added in _read() now that
    // the result set is ready
    this.emit('open');
 
    // emit a metadata event as a convenience to users
    this.emit('metadata', rs.metaData);
  }
 
  // called by readable.read() and pushes rows to the internal queue maintained
  // by the stream implementation (never called directly) appropriate
  async _read() {
 
    // still waiting on the result set to be added via _open() so add an event
    // listener to retry when ready
    if (!this._resultSet) {
      this.once('open', this._read);
      return;
    }
 
    // using the JS getRow() to leverage the JS row cache; the result set's
    // _allowGetRowCall is set to true to allow the call for query streams
    // created via ResultSet.toQueryStream()
    try {
      this._fetching = true;
      this._resultSet._allowGetRowCall = true;
      const row = await this._resultSet.getRow();
      this._fetching = false;
      if (!this._resultSet) {
        this.emit('_doneFetching');
        return;
      }
      if (row) {
        this.push(row);
      } else {
        this.push(null);
      }
    } catch (err) {
      this.destroy(err);
    }
  }
 
}
 
module.exports = QueryStream;