| import { Operator } from '../Operator'; | 
| import { Subscriber } from '../Subscriber'; | 
| import { Observable } from '../Observable'; | 
| import { Subject } from '../Subject'; | 
| import { Subscription } from '../Subscription'; | 
| import { OuterSubscriber } from '../OuterSubscriber'; | 
| import { InnerSubscriber } from '../InnerSubscriber'; | 
| import { subscribeToResult } from '../util/subscribeToResult'; | 
| import { OperatorFunction } from '../types'; | 
|   | 
| /** | 
|  * Branch out the source Observable values as a nested Observable using a | 
|  * factory function of closing Observables to determine when to start a new | 
|  * window. | 
|  * | 
|  * <span class="informal">It's like {@link bufferWhen}, but emits a nested | 
|  * Observable instead of an array.</span> | 
|  * | 
|  *  | 
|  * | 
|  * Returns an Observable that emits windows of items it collects from the source | 
|  * Observable. The output Observable emits connected, non-overlapping windows. | 
|  * It emits the current window and opens a new one whenever the Observable | 
|  * produced by the specified `closingSelector` function emits an item. The first | 
|  * window is opened immediately when subscribing to the output Observable. | 
|  * | 
|  * ## Example | 
|  * Emit only the first two clicks events in every window of [1-5] random seconds | 
|  * ```ts | 
|  * import { fromEvent, interval } from 'rxjs'; | 
|  * import { windowWhen, map, mergeAll, take } from 'rxjs/operators'; | 
|  * | 
|  * const clicks = fromEvent(document, 'click'); | 
|  * const result = clicks.pipe( | 
|  *   windowWhen(() => interval(1000 + Math.random() * 4000)), | 
|  *   map(win => win.pipe(take(2))),     // each window has at most 2 emissions | 
|  *   mergeAll()                         // flatten the Observable-of-Observables | 
|  * ); | 
|  * result.subscribe(x => console.log(x)); | 
|  * ``` | 
|  * | 
|  * @see {@link window} | 
|  * @see {@link windowCount} | 
|  * @see {@link windowTime} | 
|  * @see {@link windowToggle} | 
|  * @see {@link bufferWhen} | 
|  * | 
|  * @param {function(): Observable} closingSelector A function that takes no | 
|  * arguments and returns an Observable that signals (on either `next` or | 
|  * `complete`) when to close the previous window and start a new one. | 
|  * @return {Observable<Observable<T>>} An observable of windows, which in turn | 
|  * are Observables. | 
|  * @method windowWhen | 
|  * @owner Observable | 
|  */ | 
| export function windowWhen<T>(closingSelector: () => Observable<any>): OperatorFunction<T, Observable<T>> { | 
|   return function windowWhenOperatorFunction(source: Observable<T>) { | 
|     return source.lift(new WindowOperator<T>(closingSelector)); | 
|   }; | 
| } | 
|   | 
| class WindowOperator<T> implements Operator<T, Observable<T>> { | 
|   constructor(private closingSelector: () => Observable<any>) { | 
|   } | 
|   | 
|   call(subscriber: Subscriber<Observable<T>>, source: any): any { | 
|     return source.subscribe(new WindowSubscriber(subscriber, this.closingSelector)); | 
|   } | 
| } | 
|   | 
| /** | 
|  * We need this JSDoc comment for affecting ESDoc. | 
|  * @ignore | 
|  * @extends {Ignored} | 
|  */ | 
| class WindowSubscriber<T> extends OuterSubscriber<T, any> { | 
|   private window: Subject<T>; | 
|   private closingNotification: Subscription; | 
|   | 
|   constructor(protected destination: Subscriber<Observable<T>>, | 
|               private closingSelector: () => Observable<any>) { | 
|     super(destination); | 
|     this.openWindow(); | 
|   } | 
|   | 
|   notifyNext(outerValue: T, innerValue: any, | 
|              outerIndex: number, innerIndex: number, | 
|              innerSub: InnerSubscriber<T, any>): void { | 
|     this.openWindow(innerSub); | 
|   } | 
|   | 
|   notifyError(error: any, innerSub: InnerSubscriber<T, any>): void { | 
|     this._error(error); | 
|   } | 
|   | 
|   notifyComplete(innerSub: InnerSubscriber<T, any>): void { | 
|     this.openWindow(innerSub); | 
|   } | 
|   | 
|   protected _next(value: T): void { | 
|     this.window.next(value); | 
|   } | 
|   | 
|   protected _error(err: any): void { | 
|     this.window.error(err); | 
|     this.destination.error(err); | 
|     this.unsubscribeClosingNotification(); | 
|   } | 
|   | 
|   protected _complete(): void { | 
|     this.window.complete(); | 
|     this.destination.complete(); | 
|     this.unsubscribeClosingNotification(); | 
|   } | 
|   | 
|   private unsubscribeClosingNotification(): void { | 
|     if (this.closingNotification) { | 
|       this.closingNotification.unsubscribe(); | 
|     } | 
|   } | 
|   | 
|   private openWindow(innerSub: InnerSubscriber<T, any> = null): void { | 
|     if (innerSub) { | 
|       this.remove(innerSub); | 
|       innerSub.unsubscribe(); | 
|     } | 
|   | 
|     const prevWindow = this.window; | 
|     if (prevWindow) { | 
|       prevWindow.complete(); | 
|     } | 
|   | 
|     const window = this.window = new Subject<T>(); | 
|     this.destination.next(window); | 
|   | 
|     let closingNotifier; | 
|     try { | 
|       const { closingSelector } = this; | 
|       closingNotifier = closingSelector(); | 
|     } catch (e) { | 
|       this.destination.error(e); | 
|       this.window.error(e); | 
|       return; | 
|     } | 
|     this.add(this.closingNotification = subscribeToResult(this, closingNotifier)); | 
|   } | 
| } |