333
schangxiang@126.com
2025-09-19 18966e02fb573c7e2bb0c6426ed792b38b910940
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
# EventStream
 
<img src=https://secure.travis-ci.org/dominictarr/event-stream.png?branch=master>
 
[![browser status](http://ci.testling.com/dominictarr/event-stream.png)]
(http://ci.testling.com/dominictarr/event-stream)
 
[Streams](http://nodejs.org/api/stream.html "Stream") are node's best and most misunderstood idea, and 
_<em>EventStream</em>_ is a toolkit to make creating and working with streams <em>easy</em>.  
 
Normally, streams are only used for IO,  
but in event stream we send all kinds of objects down the pipe.  
If your application's <em>input</em> and <em>output</em> are streams,  
shouldn't the <em>throughput</em> be a stream too?  
 
The *EventStream* functions resemble the array functions,  
because Streams are like Arrays, but laid out in time, rather than in memory.  
 
<em>All the `event-stream` functions return instances of `Stream`</em>.
 
`event-stream` creates 
[0.8 streams](https://github.com/joyent/node/blob/v0.8/doc/api/stream.markdown)
, which are compatible with [0.10 streams](http://nodejs.org/api/stream.html "Stream").
 
>NOTE: I shall use the term <em>"through stream"</em> to refer to a stream that is writable <em>and</em> readable.  
 
### [simple example](https://github.com/dominictarr/event-stream/blob/master/examples/pretty.js):
 
``` js
 
//pretty.js
 
if(!module.parent) {
  var es = require('event-stream')
  var inspect = require('util').inspect
 
  process.stdin                        //connect streams together with `pipe`
    .pipe(es.split())                  //split stream to break on newlines
    .pipe(es.map(function (data, cb) { //turn this async function into a stream
      cb(null
        , inspect(JSON.parse(data)))   //render it nicely
    }))
    .pipe(process.stdout)              // pipe it to stdout !
}
```
run it ...
 
``` bash  
curl -sS registry.npmjs.org/event-stream | node pretty.js
```
 
[node Stream documentation](http://nodejs.org/api/stream.html)
 
## through (write?, end?)
 
Re-emits data synchronously. Easy way to create synchronous through streams.
Pass in optional `write` and `end` methods. They will be called in the 
context of the stream. Use `this.pause()` and `this.resume()` to manage flow.
Check `this.paused` to see current flow state. (write always returns `!this.paused`)
 
this function is the basis for most of the synchronous streams in `event-stream`.
 
``` js
 
es.through(function write(data) {
    this.emit('data', data)
    //this.pause() 
  },
  function end () { //optional
    this.emit('end')
  })
 
```
 
## map (asyncFunction)
 
Create a through stream from an asynchronous function.  
 
``` js
var es = require('event-stream')
 
es.map(function (data, callback) {
  //transform data
  // ...
  callback(null, data)
})
 
```
 
Each map MUST call the callback. It may callback with data, with an error or with no arguments, 
 
  * `callback()` drop this data.  
    this makes the map work like `filter`,  
    note:`callback(null,null)` is not the same, and will emit `null`
 
  * `callback(null, newData)` turn data into newData
    
  * `callback(error)` emit an error for this item.
 
>Note: if a callback is not called, `map` will think that it is still being processed,   
>every call must be answered or the stream will not know when to end.  
>
>Also, if the callback is called more than once, every call but the first will be ignored.
 
## mapSync (syncFunction)
 
Same as `map`, but the callback is called synchronously. Based on `es.through`
 
## split (matcher)
 
Break up a stream and reassemble it so that each line is a chunk. matcher may be a `String`, or a `RegExp` 
 
Example, read every line in a file ...
 
``` js
fs.createReadStream(file, {flags: 'r'})
  .pipe(es.split())
  .pipe(es.map(function (line, cb) {
    //do something with the line 
    cb(null, line)
  }))
```
 
`split` takes the same arguments as `string.split` except it defaults to '\n' instead of ',', and the optional `limit` parameter is ignored.
[String#split](https://developer.mozilla.org/en/JavaScript/Reference/Global_Objects/String/split)
 
## join (separator)
 
Create a through stream that emits `separator` between each chunk, just like Array#join.
 
(for legacy reasons, if you pass a callback instead of a string, join is a synonym for `es.wait`)
 
## merge (stream1,...,streamN) or merge (streamArray) 
> concat → merge
 
Merges streams into one and returns it.
Incoming data will be emitted as soon it comes into - no ordering will be applied (for example: `data1 data1 data2 data1 data2` - where `data1` and `data2` is data from two streams).
Counts how many streams were passed to it and emits end only when all streams emitted end.
 
```js
es.merge(
  process.stdout,
  process.stderr
).pipe(fs.createWriteStream('output.log'));
```
 
It can also take an Array of streams as input like this: 
```js
es.merge([
  fs.createReadStream('input1.txt'),
  fs.createReadStream('input2.txt')
]).pipe(fs.createWriteStream('output.log'));
```
 
## replace (from, to)
 
Replace all occurrences of `from` with `to`. `from` may be a `String` or a `RegExp`.  
Works just like `string.split(from).join(to)`, but streaming.
 
 
## parse
 
Convenience function for parsing JSON chunks. For newline separated JSON,
use with `es.split`.  By default it logs parsing errors by `console.error`;
for another behaviour, transforms created by `es.parse({error: true})` will
emit error events for exceptions thrown from `JSON.parse`, unmodified.
 
``` js
fs.createReadStream(filename)
  .pipe(es.split()) //defaults to lines.
  .pipe(es.parse())
```
 
## stringify
 
convert javascript objects into lines of text. The text will have whitespace escaped and have a `\n` appended, so it will be compatible with `es.parse`
 
``` js
objectStream
  .pipe(es.stringify())
  .pipe(fs.createWriteStream(filename))
```
 
## readable (asyncFunction) 
 
create a readable stream (that respects pause) from an async function.  
while the stream is not paused,  
the function will be polled with `(count, callback)`,  
and `this`  will be the readable stream.
 
``` js
 
es.readable(function (count, callback) {
  if(streamHasEnded)
    return this.emit('end')
  
  //...
  
  this.emit('data', data) //use this way to emit multiple chunks per call.
      
  callback() // you MUST always call the callback eventually.
             // the function will not be called again until you do this.
})
```
you can also pass the data and the error to the callback.  
you may only call the callback once.  
calling the same callback more than once will have no effect.  
 
## readArray (array)
 
Create a readable stream from an Array.
 
Just emit each item as a data event, respecting `pause` and `resume`.
 
``` js
  var es = require('event-stream')
    , reader = es.readArray([1,2,3])
 
  reader.pipe(...)
```
 
If you want the stream behave like a 0.10 stream you will need to wrap it using [`Readable.wrap()`](http://nodejs.org/api/stream.html#stream_readable_wrap_stream) function. Example:
 
``` js
    var s = new stream.Readable({objectMode: true}).wrap(es.readArray([1,2,3]));
```
 
## writeArray (callback)
 
create a writeable stream from a callback,  
all `data` events are stored in an array, which is passed to the callback when the stream ends.
 
``` js
  var es = require('event-stream')
    , reader = es.readArray([1, 2, 3])
    , writer = es.writeArray(function (err, array){
      //array deepEqual [1, 2, 3]
    })
 
  reader.pipe(writer)
```
 
## pause  () 
 
A stream that buffers all chunks when paused.
 
 
``` js
  var ps = es.pause()
  ps.pause() //buffer the stream, also do not allow 'end' 
  ps.resume() //allow chunks through
```
 
## duplex (writeStream, readStream)
 
Takes a writable stream and a readable stream and makes them appear as a readable writable stream.
 
It is assumed that the two streams are connected to each other in some way.  
 
(This is used by `pipeline` and `child`.)
 
``` js
  var grep = cp.exec('grep Stream')
 
  es.duplex(grep.stdin, grep.stdout)
```
 
## child (child_process)
 
Create a through stream from a child process ...
 
``` js
  var cp = require('child_process')
 
  es.child(cp.exec('grep Stream')) // a through stream
 
```
 
## wait (callback)
 
waits for stream to emit 'end'.
joins chunks of a stream into a single string or buffer. 
takes an optional callback, which will be passed the 
complete string/buffer when it receives the 'end' event.
 
also, emits a single 'data' event.
 
``` js
 
readStream.pipe(es.wait(function (err, body) {
  // have complete text here.
}))
 
```
 
# Other Stream Modules
 
These modules are not included as a part of *EventStream* but may be
useful when working with streams.
 
## [reduce (syncFunction, initial)](https://github.com/parshap/node-stream-reduce)
 
Like `Array.prototype.reduce` but for streams. Given a sync reduce
function and an initial value it will return a through stream that emits
a single data event with the reduced value once the input stream ends.
 
``` js
var reduce = require("stream-reduce");
process.stdin.pipe(reduce(function(acc, data) {
  return acc + data.length;
}, 0)).on("data", function(length) {
  console.log("stdin size:", length);
});
```