| var async = require('./async.js'); | 
|   | 
| // API | 
| module.exports = { | 
|   iterator: wrapIterator, | 
|   callback: wrapCallback | 
| }; | 
|   | 
| /** | 
|  * Wraps iterators with long signature | 
|  * | 
|  * @this    ReadableAsyncKit# | 
|  * @param   {function} iterator - function to wrap | 
|  * @returns {function} - wrapped function | 
|  */ | 
| function wrapIterator(iterator) | 
| { | 
|   var stream = this; | 
|   | 
|   return function(item, key, cb) | 
|   { | 
|     var aborter | 
|       , wrappedCb = async(wrapIteratorCallback.call(stream, cb, key)) | 
|       ; | 
|   | 
|     stream.jobs[key] = wrappedCb; | 
|   | 
|     // it's either shortcut (item, cb) | 
|     if (iterator.length == 2) | 
|     { | 
|       aborter = iterator(item, wrappedCb); | 
|     } | 
|     // or long format (item, key, cb) | 
|     else | 
|     { | 
|       aborter = iterator(item, key, wrappedCb); | 
|     } | 
|   | 
|     return aborter; | 
|   }; | 
| } | 
|   | 
| /** | 
|  * Wraps provided callback function | 
|  * allowing to execute snitch function before | 
|  * real callback | 
|  * | 
|  * @this    ReadableAsyncKit# | 
|  * @param   {function} callback - function to wrap | 
|  * @returns {function} - wrapped function | 
|  */ | 
| function wrapCallback(callback) | 
| { | 
|   var stream = this; | 
|   | 
|   var wrapped = function(error, result) | 
|   { | 
|     return finisher.call(stream, error, result, callback); | 
|   }; | 
|   | 
|   return wrapped; | 
| } | 
|   | 
| /** | 
|  * Wraps provided iterator callback function | 
|  * makes sure snitch only called once, | 
|  * but passes secondary calls to the original callback | 
|  * | 
|  * @this    ReadableAsyncKit# | 
|  * @param   {function} callback - callback to wrap | 
|  * @param   {number|string} key - iteration key | 
|  * @returns {function} wrapped callback | 
|  */ | 
| function wrapIteratorCallback(callback, key) | 
| { | 
|   var stream = this; | 
|   | 
|   return function(error, output) | 
|   { | 
|     // don't repeat yourself | 
|     if (!(key in stream.jobs)) | 
|     { | 
|       callback(error, output); | 
|       return; | 
|     } | 
|   | 
|     // clean up jobs | 
|     delete stream.jobs[key]; | 
|   | 
|     return streamer.call(stream, error, {key: key, value: output}, callback); | 
|   }; | 
| } | 
|   | 
| /** | 
|  * Stream wrapper for iterator callback | 
|  * | 
|  * @this  ReadableAsyncKit# | 
|  * @param {mixed} error - error response | 
|  * @param {mixed} output - iterator output | 
|  * @param {function} callback - callback that expects iterator results | 
|  */ | 
| function streamer(error, output, callback) | 
| { | 
|   if (error && !this.error) | 
|   { | 
|     this.error = error; | 
|     this.pause(); | 
|     this.emit('error', error); | 
|     // send back value only, as expected | 
|     callback(error, output && output.value); | 
|     return; | 
|   } | 
|   | 
|   // stream stuff | 
|   this.push(output); | 
|   | 
|   // back to original track | 
|   // send back value only, as expected | 
|   callback(error, output && output.value); | 
| } | 
|   | 
| /** | 
|  * Stream wrapper for finishing callback | 
|  * | 
|  * @this  ReadableAsyncKit# | 
|  * @param {mixed} error - error response | 
|  * @param {mixed} output - iterator output | 
|  * @param {function} callback - callback that expects final results | 
|  */ | 
| function finisher(error, output, callback) | 
| { | 
|   // signal end of the stream | 
|   // only for successfully finished streams | 
|   if (!error) | 
|   { | 
|     this.push(null); | 
|   } | 
|   | 
|   // back to original track | 
|   callback(error, output); | 
| } |