From 10cc8a68850e1da5e7af30fc34e84f52f65b2140 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Thu, 30 Jul 2020 19:36:47 -0500 Subject: [PATCH] refactor: Improve memory pressure (#5613) this is the duplicate of #5610, it is refactoring to ensure outer values are not retained when they do not have to be. It needs to be done in a separate PR because the branches diverge just enough to require it. This PR also has some mild, internal type fixes. --- src/internal/innerSubscribe.ts | 114 ++++++++++++++++++++ src/internal/observable/combineLatest.ts | 27 +++-- src/internal/observable/race.ts | 17 ++- src/internal/observable/zip.ts | 49 ++++----- src/internal/operators/audit.ts | 22 ++-- src/internal/operators/buffer.ts | 14 +-- src/internal/operators/bufferToggle.ts | 30 +++--- src/internal/operators/bufferWhen.ts | 24 ++--- src/internal/operators/catchError.ts | 10 +- src/internal/operators/debounce.ts | 29 +++-- src/internal/operators/delayWhen.ts | 12 +-- src/internal/operators/distinct.ts | 24 ++--- src/internal/operators/exhaust.ts | 15 ++- src/internal/operators/exhaustMap.ts | 33 +++--- src/internal/operators/expand.ts | 36 +++---- src/internal/operators/mergeMap.ts | 31 +++--- src/internal/operators/mergeScan.ts | 34 +++--- src/internal/operators/onErrorResumeNext.ts | 23 ++-- src/internal/operators/repeatWhen.ts | 31 +++--- src/internal/operators/retryWhen.ts | 33 +++--- src/internal/operators/sample.ts | 16 ++- src/internal/operators/skipUntil.ts | 16 ++- src/internal/operators/switchMap.ts | 34 +++--- src/internal/operators/switchMapTo.ts | 7 -- src/internal/operators/takeUntil.ts | 13 +-- src/internal/operators/throttle.ts | 27 ++--- src/internal/operators/timeoutWith.ts | 21 ++-- src/internal/operators/window.ts | 24 ++--- src/internal/operators/windowWhen.ts | 18 ++-- src/internal/operators/withLatestFrom.ts | 15 ++- 30 files changed, 403 insertions(+), 396 deletions(-) create mode 100644 src/internal/innerSubscribe.ts diff --git a/src/internal/innerSubscribe.ts b/src/internal/innerSubscribe.ts new file mode 100644 index 0000000000..53311c5241 --- /dev/null +++ b/src/internal/innerSubscribe.ts @@ -0,0 +1,114 @@ +/** @prettier */ +import { Subscription } from './Subscription'; +import { Subscriber } from './Subscriber'; +import { Observable } from './Observable'; +import { subscribeTo } from './util/subscribeTo'; + +interface SimpleOuterSubscriberLike { + /** + * A handler for inner next notifications from the inner subscription + * @param innerValue the value nexted by the inner producer + */ + notifyNext(innerValue: T): void; + /** + * A handler for inner error notifications from the inner subscription + * @param err the error from the inner producer + */ + notifyError(err: any): void; + /** + * A handler for inner complete notifications from the inner subscription. + */ + notifyComplete(): void; +} + +export class SimpleInnerSubscriber extends Subscriber { + constructor(private parent: SimpleOuterSubscriberLike) { + super(); + } + + protected _next(value: T): void { + this.parent.notifyNext(value); + } + + protected _error(error: any): void { + this.parent.notifyError(error); + this.unsubscribe(); + } + + protected _complete(): void { + this.parent.notifyComplete(); + this.unsubscribe(); + } +} + +export class ComplexInnerSubscriber extends Subscriber { + constructor(private parent: ComplexOuterSubscriber, public outerValue: T, public outerIndex: number) { + super(); + } + + protected _next(value: R): void { + this.parent.notifyNext(this.outerValue, value, this.outerIndex, this); + } + + protected _error(error: any): void { + this.parent.notifyError(error); + this.unsubscribe(); + } + + protected _complete(): void { + this.parent.notifyComplete(this); + this.unsubscribe(); + } +} + +export class SimpleOuterSubscriber extends Subscriber implements SimpleOuterSubscriberLike { + notifyNext(innerValue: R): void { + this.destination.next(innerValue); + } + + notifyError(err: any): void { + this.destination.error(err); + } + + notifyComplete(): void { + this.destination.complete(); + } +} + +/** + * DO NOT USE (formerly "OuterSubscriber") + * TODO: We want to refactor this and remove it. It is retaining values it shouldn't for long + * periods of time. + */ +export class ComplexOuterSubscriber extends Subscriber { + /** + * @param _outerValue Used by: bufferToggle, delayWhen, windowToggle + * @param innerValue Used by: subclass default, combineLatest, race, bufferToggle, windowToggle, withLatestFrom + * @param _outerIndex Used by: combineLatest, race, withLatestFrom + * @param _innerSub Used by: delayWhen + */ + notifyNext(_outerValue: T, innerValue: R, _outerIndex: number, _innerSub: ComplexInnerSubscriber): void { + this.destination.next(innerValue); + } + + notifyError(error: any): void { + this.destination.error(error); + } + + /** + * @param _innerSub Used by: race, bufferToggle, delayWhen, windowToggle, windowWhen + */ + notifyComplete(_innerSub: ComplexInnerSubscriber): void { + this.destination.complete(); + } +} + +export function innerSubscribe(result: any, innerSubscriber: Subscriber): Subscription | undefined { + if (innerSubscriber.closed) { + return undefined; + } + if (result instanceof Observable) { + return result.subscribe(innerSubscriber); + } + return subscribeTo(result)(innerSubscriber) as Subscription; +} diff --git a/src/internal/observable/combineLatest.ts b/src/internal/observable/combineLatest.ts index ce093c9c20..ac5e6ea08d 100644 --- a/src/internal/observable/combineLatest.ts +++ b/src/internal/observable/combineLatest.ts @@ -226,8 +226,8 @@ export function combineLatest(...observables: Array | (( export function combineLatest, R>( ...observables: (O | ((...values: ObservedValueOf[]) => R) | SchedulerLike)[] ): Observable { - let resultSelector: (...values: Array) => R = null; - let scheduler: SchedulerLike = null; + let resultSelector: ((...values: Array) => R) | undefined = undefined; + let scheduler: SchedulerLike|undefined = undefined; if (isScheduler(observables[observables.length - 1])) { scheduler = observables.pop() as SchedulerLike; @@ -243,7 +243,7 @@ export function combineLatest, R>( observables = observables[0] as any; } - return fromArray(observables, scheduler).lift(new CombineLatestOperator, R>(resultSelector)); + return fromArray(observables, scheduler).lift(new CombineLatestOperator(resultSelector)); } export class CombineLatestOperator implements Operator { @@ -264,7 +264,7 @@ export class CombineLatestSubscriber extends OuterSubscriber { private active: number = 0; private values: any[] = []; private observables: any[] = []; - private toRespond: number; + private toRespond?: number; constructor(destination: Subscriber, private resultSelector?: (...values: Array) => R) { super(destination); @@ -279,26 +279,25 @@ export class CombineLatestSubscriber extends OuterSubscriber { const observables = this.observables; const len = observables.length; if (len === 0) { - this.destination.complete(); + this.destination.complete!(); } else { this.active = len; this.toRespond = len; for (let i = 0; i < len; i++) { const observable = observables[i]; - this.add(subscribeToResult(this, observable, observable, i)); + this.add(subscribeToResult(this, observable, undefined, i)); } } } notifyComplete(unused: Subscriber): void { if ((this.active -= 1) === 0) { - this.destination.complete(); + this.destination.complete!(); } } - notifyNext(outerValue: T, innerValue: R, - outerIndex: number, innerIndex: number, - innerSub: InnerSubscriber): void { + notifyNext(_outerValue: T, innerValue: R, + outerIndex: number): void { const values = this.values; const oldVal = values[outerIndex]; const toRespond = !this.toRespond @@ -310,7 +309,7 @@ export class CombineLatestSubscriber extends OuterSubscriber { if (this.resultSelector) { this._tryResultSelector(values); } else { - this.destination.next(values.slice()); + this.destination.next!(values.slice()); } } } @@ -318,11 +317,11 @@ export class CombineLatestSubscriber extends OuterSubscriber { private _tryResultSelector(values: any[]) { let result: any; try { - result = this.resultSelector.apply(this, values); + result = this.resultSelector!.apply(this, values); } catch (err) { - this.destination.error(err); + this.destination.error!(err); return; } - this.destination.next(result); + this.destination.next!(result); } } diff --git a/src/internal/observable/race.ts b/src/internal/observable/race.ts index e6e13ded30..06b587c2db 100644 --- a/src/internal/observable/race.ts +++ b/src/internal/observable/race.ts @@ -102,24 +102,23 @@ export class RaceSubscriber extends OuterSubscriber { const len = observables.length; if (len === 0) { - this.destination.complete(); + this.destination.complete!(); } else { for (let i = 0; i < len && !this.hasFirst; i++) { - let observable = observables[i]; - let subscription = subscribeToResult(this, observable, observable as any, i); + const observable = observables[i]; + const subscription = subscribeToResult(this, observable, undefined, i)!; if (this.subscriptions) { this.subscriptions.push(subscription); } this.add(subscription); } - this.observables = null; + this.observables = null!; } } - notifyNext(outerValue: T, innerValue: T, - outerIndex: number, innerIndex: number, - innerSub: InnerSubscriber): void { + notifyNext(_outerValue: T, innerValue: T, + outerIndex: number): void { if (!this.hasFirst) { this.hasFirst = true; @@ -132,9 +131,9 @@ export class RaceSubscriber extends OuterSubscriber { } } - this.subscriptions = null; + this.subscriptions = null!; } - this.destination.next(innerValue); + this.destination.next!(innerValue); } } diff --git a/src/internal/observable/zip.ts b/src/internal/observable/zip.ts index fbe0756573..1f03caa30c 100644 --- a/src/internal/observable/zip.ts +++ b/src/internal/observable/zip.ts @@ -5,10 +5,8 @@ import { Operator } from '../Operator'; import { ObservableInput, PartialObserver, ObservedValueOf } from '../types'; import { Subscriber } from '../Subscriber'; import { Subscription } from '../Subscription'; -import { OuterSubscriber } from '../OuterSubscriber'; -import { InnerSubscriber } from '../InnerSubscriber'; -import { subscribeToResult } from '../util/subscribeToResult'; import { iterator as Symbol_iterator } from '../../internal/symbol/iterator'; +import { SimpleOuterSubscriber, SimpleInnerSubscriber, innerSubscribe } from '../innerSubscribe'; /* tslint:disable:max-line-length */ /** @deprecated resultSelector is no longer supported, pipe to map instead */ @@ -87,7 +85,7 @@ export function zip, R>( export class ZipOperator implements Operator { - resultSelector: (...values: Array) => R; + resultSelector?: (...values: Array) => R; constructor(resultSelector?: (...values: Array) => R) { this.resultSelector = resultSelector; @@ -104,17 +102,14 @@ export class ZipOperator implements Operator { * @extends {Ignored} */ export class ZipSubscriber extends Subscriber { - private values: any; - private resultSelector: (...values: Array) => R; private iterators: LookAheadIterator[] = []; private active = 0; constructor(destination: Subscriber, - resultSelector?: (...values: Array) => R, + private resultSelector?: (...values: Array) => R, values: any = Object.create(null)) { super(destination); - this.resultSelector = (typeof resultSelector === 'function') ? resultSelector : null; - this.values = values; + this.resultSelector = (typeof resultSelector === 'function') ? resultSelector : undefined; } protected _next(value: any) { @@ -135,7 +130,7 @@ export class ZipSubscriber extends Subscriber { this.unsubscribe(); if (len === 0) { - this.destination.complete(); + this.destination.complete!(); return; } @@ -144,7 +139,7 @@ export class ZipSubscriber extends Subscriber { let iterator: ZipBufferIterator = iterators[i]; if (iterator.stillUnsubscribed) { const destination = this.destination as Subscription; - destination.add(iterator.subscribe(iterator, i)); + destination.add(iterator.subscribe()); } else { this.active--; // not an observable } @@ -154,7 +149,7 @@ export class ZipSubscriber extends Subscriber { notifyInactive() { this.active--; if (this.active === 0) { - this.destination.complete(); + this.destination.complete!(); } } @@ -184,7 +179,7 @@ export class ZipSubscriber extends Subscriber { } if (result.done) { - destination.complete(); + destination.complete!(); return; } @@ -194,23 +189,23 @@ export class ZipSubscriber extends Subscriber { if (this.resultSelector) { this._tryresultSelector(args); } else { - destination.next(args); + destination.next!(args); } if (shouldComplete) { - destination.complete(); + destination.complete!(); } } protected _tryresultSelector(args: any[]) { let result: any; try { - result = this.resultSelector.apply(this, args); + result = this.resultSelector!.apply(this, args); } catch (err) { - this.destination.error(err); + this.destination.error!(err); return; } - this.destination.next(result); + this.destination.next!(result); } } @@ -236,9 +231,9 @@ class StaticIterator implements LookAheadIterator { return result; } - hasCompleted() { + hasCompleted(): boolean { const nextResult = this.nextResult; - return nextResult && nextResult.done; + return Boolean(nextResult && nextResult.done); } } @@ -274,7 +269,7 @@ class StaticArrayIterator implements LookAheadIterator { * @ignore * @extends {Ignored} */ -class ZipBufferIterator extends OuterSubscriber implements LookAheadIterator { +class ZipBufferIterator extends SimpleOuterSubscriber implements LookAheadIterator { stillUnsubscribed = true; buffer: T[] = []; isComplete = false; @@ -296,7 +291,7 @@ class ZipBufferIterator extends OuterSubscriber implements LookAhead if (buffer.length === 0 && this.isComplete) { return { value: null, done: true }; } else { - return { value: buffer.shift(), done: false }; + return { value: buffer.shift()!, done: false }; } } @@ -313,18 +308,16 @@ class ZipBufferIterator extends OuterSubscriber implements LookAhead this.isComplete = true; this.parent.notifyInactive(); } else { - this.destination.complete(); + this.destination.complete!(); } } - notifyNext(outerValue: T, innerValue: any, - outerIndex: number, innerIndex: number, - innerSub: InnerSubscriber): void { + notifyNext(innerValue: any): void { this.buffer.push(innerValue); this.parent.checkIterators(); } - subscribe(value: any, index: number) { - return subscribeToResult(this, this.observable, this, index); + subscribe() { + return innerSubscribe(this.observable, new SimpleInnerSubscriber(this)); } } diff --git a/src/internal/operators/audit.ts b/src/internal/operators/audit.ts index 9028abdb6d..9418ff992e 100644 --- a/src/internal/operators/audit.ts +++ b/src/internal/operators/audit.ts @@ -3,9 +3,7 @@ import { Subscriber } from '../Subscriber'; import { Observable } from '../Observable'; import { Subscription } from '../Subscription'; import { MonoTypeOperatorFunction, SubscribableOrPromise, TeardownLogic } from '../types'; - -import { OuterSubscriber } from '../OuterSubscriber'; -import { subscribeToResult } from '../util/subscribeToResult'; +import { SimpleOuterSubscriber, innerSubscribe, SimpleInnerSubscriber } from '../innerSubscribe'; /** * Ignores source values for a duration determined by another Observable, then @@ -73,11 +71,11 @@ class AuditOperator implements Operator { * @ignore * @extends {Ignored} */ -class AuditSubscriber extends OuterSubscriber { +class AuditSubscriber extends SimpleOuterSubscriber { - private value: T; + private value?: T; private hasValue: boolean = false; - private throttled: Subscription; + private throttled?: Subscription; constructor(destination: Subscriber, private durationSelector: (value: T) => SubscribableOrPromise) { @@ -93,9 +91,9 @@ class AuditSubscriber extends OuterSubscriber { const { durationSelector } = this; duration = durationSelector(value); } catch (err) { - return this.destination.error(err); + return this.destination.error!(err); } - const innerSubscription = subscribeToResult(this, duration); + const innerSubscription = innerSubscribe(duration, new SimpleInnerSubscriber(this)); if (!innerSubscription || innerSubscription.closed) { this.clearThrottle(); } else { @@ -108,17 +106,17 @@ class AuditSubscriber extends OuterSubscriber { const { value, hasValue, throttled } = this; if (throttled) { this.remove(throttled); - this.throttled = null; + this.throttled = undefined; throttled.unsubscribe(); } if (hasValue) { - this.value = null; + this.value = undefined; this.hasValue = false; - this.destination.next(value); + this.destination.next!(value); } } - notifyNext(outerValue: T, innerValue: R, outerIndex: number, innerIndex: number): void { + notifyNext(): void { this.clearThrottle(); } diff --git a/src/internal/operators/buffer.ts b/src/internal/operators/buffer.ts index a1abae420e..d3bb274e4e 100644 --- a/src/internal/operators/buffer.ts +++ b/src/internal/operators/buffer.ts @@ -1,10 +1,8 @@ import { Operator } from '../Operator'; import { Subscriber } from '../Subscriber'; import { Observable } from '../Observable'; -import { OuterSubscriber } from '../OuterSubscriber'; -import { InnerSubscriber } from '../InnerSubscriber'; -import { subscribeToResult } from '../util/subscribeToResult'; import { OperatorFunction } from '../types'; +import { SimpleOuterSubscriber, innerSubscribe, SimpleInnerSubscriber } from '../innerSubscribe'; /** * Buffers the source Observable values until `closingNotifier` emits. @@ -67,23 +65,21 @@ class BufferOperator implements Operator { * @ignore * @extends {Ignored} */ -class BufferSubscriber extends OuterSubscriber { +class BufferSubscriber extends SimpleOuterSubscriber { private buffer: T[] = []; constructor(destination: Subscriber, closingNotifier: Observable) { super(destination); - this.add(subscribeToResult(this, closingNotifier)); + this.add(innerSubscribe(closingNotifier, new SimpleInnerSubscriber(this))); } protected _next(value: T) { this.buffer.push(value); } - notifyNext(outerValue: T, innerValue: any, - outerIndex: number, innerIndex: number, - innerSub: InnerSubscriber): void { + notifyNext(): void { const buffer = this.buffer; this.buffer = []; - this.destination.next(buffer); + this.destination.next!(buffer); } } diff --git a/src/internal/operators/bufferToggle.ts b/src/internal/operators/bufferToggle.ts index 8676fa7495..19c5c8940e 100644 --- a/src/internal/operators/bufferToggle.ts +++ b/src/internal/operators/bufferToggle.ts @@ -87,7 +87,7 @@ class BufferToggleSubscriber extends OuterSubscriber { private contexts: Array> = []; constructor(destination: Subscriber, - private openings: SubscribableOrPromise, + openings: SubscribableOrPromise, private closingSelector: (value: O) => SubscribableOrPromise | void) { super(destination); this.add(subscribeToResult(this, openings)); @@ -104,31 +104,29 @@ class BufferToggleSubscriber extends OuterSubscriber { protected _error(err: any): void { const contexts = this.contexts; while (contexts.length > 0) { - const context = contexts.shift(); + const context = contexts.shift()!; context.subscription.unsubscribe(); - context.buffer = null; - context.subscription = null; + context.buffer = null!; + context.subscription = null!; } - this.contexts = null; + this.contexts = null!; super._error(err); } protected _complete(): void { const contexts = this.contexts; while (contexts.length > 0) { - const context = contexts.shift(); - this.destination.next(context.buffer); + const context = contexts.shift()!; + this.destination.next!(context.buffer); context.subscription.unsubscribe(); - context.buffer = null; - context.subscription = null; + context.buffer = null!; + context.subscription = null!; } - this.contexts = null; + this.contexts = null!; super._complete(); } - notifyNext(outerValue: any, innerValue: O, - outerIndex: number, innerIndex: number, - innerSub: InnerSubscriber): void { + notifyNext(outerValue: any, innerValue: O): void { outerValue ? this.closeBuffer(outerValue) : this.openBuffer(innerValue); } @@ -153,7 +151,7 @@ class BufferToggleSubscriber extends OuterSubscriber { if (contexts && context) { const { buffer, subscription } = context; - this.destination.next(buffer); + this.destination.next!(buffer); contexts.splice(contexts.indexOf(context), 1); this.remove(subscription); subscription.unsubscribe(); @@ -168,12 +166,12 @@ class BufferToggleSubscriber extends OuterSubscriber { const context = { buffer, subscription }; contexts.push(context); - const innerSubscription = subscribeToResult(this, closingNotifier, context); + const innerSubscription = subscribeToResult(this, closingNotifier, context as any); if (!innerSubscription || innerSubscription.closed) { this.closeBuffer(context); } else { - ( innerSubscription).context = context; + (innerSubscription as any).context = context; this.add(innerSubscription); subscription.add(innerSubscription); diff --git a/src/internal/operators/bufferWhen.ts b/src/internal/operators/bufferWhen.ts index 2c3e96bc2e..3ea0169292 100644 --- a/src/internal/operators/bufferWhen.ts +++ b/src/internal/operators/bufferWhen.ts @@ -2,10 +2,8 @@ import { Operator } from '../Operator'; import { Subscriber } from '../Subscriber'; import { Observable } from '../Observable'; import { Subscription } from '../Subscription'; -import { OuterSubscriber } from '../OuterSubscriber'; -import { InnerSubscriber } from '../InnerSubscriber'; -import { subscribeToResult } from '../util/subscribeToResult'; import { OperatorFunction } from '../types'; +import { SimpleOuterSubscriber, innerSubscribe, SimpleInnerSubscriber } from '../innerSubscribe'; /** * Buffers the source Observable values, using a factory function of closing @@ -70,10 +68,10 @@ class BufferWhenOperator implements Operator { * @ignore * @extends {Ignored} */ -class BufferWhenSubscriber extends OuterSubscriber { - private buffer: T[]; +class BufferWhenSubscriber extends SimpleOuterSubscriber { + private buffer?: T[]; private subscribing: boolean = false; - private closingSubscription: Subscription; + private closingSubscription?: Subscription; constructor(destination: Subscriber, private closingSelector: () => Observable) { super(destination); @@ -81,26 +79,24 @@ class BufferWhenSubscriber extends OuterSubscriber { } protected _next(value: T) { - this.buffer.push(value); + this.buffer!.push(value); } protected _complete() { const buffer = this.buffer; if (buffer) { - this.destination.next(buffer); + this.destination.next!(buffer); } super._complete(); } /** @deprecated This is an internal implementation detail, do not use. */ _unsubscribe() { - this.buffer = null; + this.buffer = undefined; this.subscribing = false; } - notifyNext(outerValue: T, innerValue: any, - outerIndex: number, innerIndex: number, - innerSub: InnerSubscriber): void { + notifyNext(): void { this.openBuffer(); } @@ -122,7 +118,7 @@ class BufferWhenSubscriber extends OuterSubscriber { const buffer = this.buffer; if (this.buffer) { - this.destination.next(buffer); + this.destination.next!(buffer); } this.buffer = []; @@ -138,7 +134,7 @@ class BufferWhenSubscriber extends OuterSubscriber { this.closingSubscription = closingSubscription; this.add(closingSubscription); this.subscribing = true; - closingSubscription.add(subscribeToResult(this, closingNotifier)); + closingSubscription.add(innerSubscribe(closingNotifier, new SimpleInnerSubscriber(this))); this.subscribing = false; } } diff --git a/src/internal/operators/catchError.ts b/src/internal/operators/catchError.ts index 5aeb76fdad..af36ed7f3b 100644 --- a/src/internal/operators/catchError.ts +++ b/src/internal/operators/catchError.ts @@ -2,10 +2,8 @@ import { Operator } from '../Operator'; import { Subscriber } from '../Subscriber'; import { Observable } from '../Observable'; -import { OuterSubscriber } from '../OuterSubscriber'; -import { InnerSubscriber } from '../InnerSubscriber'; -import { subscribeToResult } from '../util/subscribeToResult'; import { ObservableInput, OperatorFunction, ObservedValueOf } from '../types'; +import { SimpleOuterSubscriber, SimpleInnerSubscriber, innerSubscribe } from '../innerSubscribe'; /* tslint:disable:max-line-length */ export function catchError>(selector: (err: any, caught: Observable) => O): OperatorFunction>; @@ -113,7 +111,7 @@ class CatchOperator implements Operator { * @ignore * @extends {Ignored} */ -class CatchSubscriber extends OuterSubscriber { +class CatchSubscriber extends SimpleOuterSubscriber { constructor(destination: Subscriber, private selector: (err: any, caught: Observable) => ObservableInput, private caught: Observable) { @@ -135,9 +133,9 @@ class CatchSubscriber extends OuterSubscriber { return; } this._unsubscribeAndRecycle(); - const innerSubscriber = new InnerSubscriber(this, undefined, undefined); + const innerSubscriber = new SimpleInnerSubscriber(this); this.add(innerSubscriber); - const innerSubscription = subscribeToResult(this, result, undefined, undefined, innerSubscriber); + const innerSubscription = innerSubscribe(result, innerSubscriber); // The returned subscription will usually be the subscriber that was // passed. However, interop subscribers will be wrapped and for // unsubscriptions to chain correctly, the wrapper needs to be added, too. diff --git a/src/internal/operators/debounce.ts b/src/internal/operators/debounce.ts index be2167d9bf..78fad34c68 100644 --- a/src/internal/operators/debounce.ts +++ b/src/internal/operators/debounce.ts @@ -3,10 +3,7 @@ import { Observable } from '../Observable'; import { Subscriber } from '../Subscriber'; import { Subscription } from '../Subscription'; import { MonoTypeOperatorFunction, SubscribableOrPromise, TeardownLogic } from '../types'; - -import { OuterSubscriber } from '../OuterSubscriber'; -import { InnerSubscriber } from '../InnerSubscriber'; -import { subscribeToResult } from '../util/subscribeToResult'; +import { SimpleOuterSubscriber, innerSubscribe, SimpleInnerSubscriber } from '../innerSubscribe'; /** * Emits a value from the source Observable only after a particular time span @@ -74,10 +71,10 @@ class DebounceOperator implements Operator { * @ignore * @extends {Ignored} */ -class DebounceSubscriber extends OuterSubscriber { - private value: T; - private hasValue: boolean = false; - private durationSubscription: Subscription = null; +class DebounceSubscriber extends SimpleOuterSubscriber { + private value?: T; + private hasValue = false; + private durationSubscription?: Subscription; constructor(destination: Subscriber, private durationSelector: (value: T) => SubscribableOrPromise) { @@ -92,13 +89,13 @@ class DebounceSubscriber extends OuterSubscriber { this._tryNext(value, result); } } catch (err) { - this.destination.error(err); + this.destination.error!(err); } } protected _complete(): void { this.emitValue(); - this.destination.complete(); + this.destination.complete!(); } private _tryNext(value: T, duration: SubscribableOrPromise): void { @@ -110,15 +107,13 @@ class DebounceSubscriber extends OuterSubscriber { this.remove(subscription); } - subscription = subscribeToResult(this, duration); + subscription = innerSubscribe(duration, new SimpleInnerSubscriber(this)); if (subscription && !subscription.closed) { this.add(this.durationSubscription = subscription); } } - notifyNext(outerValue: T, innerValue: R, - outerIndex: number, innerIndex: number, - innerSub: InnerSubscriber): void { + notifyNext(): void { this.emitValue(); } @@ -131,7 +126,7 @@ class DebounceSubscriber extends OuterSubscriber { const value = this.value; const subscription = this.durationSubscription; if (subscription) { - this.durationSubscription = null; + this.durationSubscription = undefined; subscription.unsubscribe(); this.remove(subscription); } @@ -140,9 +135,9 @@ class DebounceSubscriber extends OuterSubscriber { // the value to synchronously re-enter this operator // recursively if the duration selector Observable // emits synchronously - this.value = null; + this.value = undefined; this.hasValue = false; - super._next(value); + super._next(value!); } } } diff --git a/src/internal/operators/delayWhen.ts b/src/internal/operators/delayWhen.ts index ce41541ce3..33cc44882b 100644 --- a/src/internal/operators/delayWhen.ts +++ b/src/internal/operators/delayWhen.ts @@ -107,10 +107,10 @@ class DelayWhenSubscriber extends OuterSubscriber { super(destination); } - notifyNext(outerValue: T, innerValue: any, - outerIndex: number, innerIndex: number, + notifyNext(outerValue: T, _innerValue: any, + _outerIndex: number, _innerIndex: number, innerSub: InnerSubscriber): void { - this.destination.next(outerValue); + this.destination.next!(outerValue); this.removeSubscription(innerSub); this.tryComplete(); } @@ -122,7 +122,7 @@ class DelayWhenSubscriber extends OuterSubscriber { notifyComplete(innerSub: InnerSubscriber): void { const value = this.removeSubscription(innerSub); if (value) { - this.destination.next(value); + this.destination.next!(value); } this.tryComplete(); } @@ -135,7 +135,7 @@ class DelayWhenSubscriber extends OuterSubscriber { this.tryDelay(delayNotifier, value); } } catch (err) { - this.destination.error(err); + this.destination.error!(err); } } @@ -168,7 +168,7 @@ class DelayWhenSubscriber extends OuterSubscriber { private tryComplete(): void { if (this.completed && this.delayNotifierSubscriptions.length === 0) { - this.destination.complete(); + this.destination.complete!(); } } } diff --git a/src/internal/operators/distinct.ts b/src/internal/operators/distinct.ts index b464420fca..7a9f9bee63 100644 --- a/src/internal/operators/distinct.ts +++ b/src/internal/operators/distinct.ts @@ -1,10 +1,8 @@ import { Observable } from '../Observable'; import { Operator } from '../Operator'; import { Subscriber } from '../Subscriber'; -import { OuterSubscriber } from '../OuterSubscriber'; -import { InnerSubscriber } from '../InnerSubscriber'; -import { subscribeToResult } from '../util/subscribeToResult'; import { MonoTypeOperatorFunction, TeardownLogic } from '../types'; +import { SimpleOuterSubscriber, innerSubscribe, SimpleInnerSubscriber } from '../innerSubscribe'; /** * Returns an Observable that emits all items emitted by the source Observable that are distinct by comparison from previous items. @@ -70,7 +68,7 @@ export function distinct(keySelector?: (value: T) => K, } class DistinctOperator implements Operator { - constructor(private keySelector: (value: T) => K, private flushes: Observable) { + constructor(private keySelector?: (value: T) => K, private flushes?: Observable) { } call(subscriber: Subscriber, source: any): TeardownLogic { @@ -83,24 +81,22 @@ class DistinctOperator implements Operator { * @ignore * @extends {Ignored} */ -export class DistinctSubscriber extends OuterSubscriber { +export class DistinctSubscriber extends SimpleOuterSubscriber { private values = new Set(); - constructor(destination: Subscriber, private keySelector: (value: T) => K, flushes: Observable) { + constructor(destination: Subscriber, private keySelector?: (value: T) => K, flushes?: Observable) { super(destination); if (flushes) { - this.add(subscribeToResult(this, flushes)); + this.add(innerSubscribe(flushes, new SimpleInnerSubscriber(this))); } } - notifyNext(outerValue: T, innerValue: T, - outerIndex: number, innerIndex: number, - innerSub: InnerSubscriber): void { + notifyNext(): void { this.values.clear(); } - notifyError(error: any, innerSub: InnerSubscriber): void { + notifyError(error: any): void { this._error(error); } @@ -116,9 +112,9 @@ export class DistinctSubscriber extends OuterSubscriber { let key: K; const { destination } = this; try { - key = this.keySelector(value); + key = this.keySelector!(value); } catch (err) { - destination.error(err); + destination.error!(err); return; } this._finalizeNext(key, value); @@ -128,7 +124,7 @@ export class DistinctSubscriber extends OuterSubscriber { const { values } = this; if (!values.has(key)) { values.add(key); - this.destination.next(value); + this.destination.next!(value); } } diff --git a/src/internal/operators/exhaust.ts b/src/internal/operators/exhaust.ts index ad14b760d6..54c5002a30 100644 --- a/src/internal/operators/exhaust.ts +++ b/src/internal/operators/exhaust.ts @@ -1,10 +1,8 @@ import { Operator } from '../Operator'; import { Observable } from '../Observable'; import { Subscriber } from '../Subscriber'; -import { Subscription } from '../Subscription'; -import { OuterSubscriber } from '../OuterSubscriber'; -import { subscribeToResult } from '../util/subscribeToResult'; import { ObservableInput, OperatorFunction, TeardownLogic } from '../types'; +import { SimpleOuterSubscriber, innerSubscribe, SimpleInnerSubscriber } from '../innerSubscribe'; export function exhaust(): OperatorFunction, T>; export function exhaust(): OperatorFunction; @@ -68,7 +66,7 @@ class SwitchFirstOperator implements Operator { * @ignore * @extends {Ignored} */ -class SwitchFirstSubscriber extends OuterSubscriber { +class SwitchFirstSubscriber extends SimpleOuterSubscriber { private hasCompleted: boolean = false; private hasSubscription: boolean = false; @@ -79,22 +77,21 @@ class SwitchFirstSubscriber extends OuterSubscriber { protected _next(value: T): void { if (!this.hasSubscription) { this.hasSubscription = true; - this.add(subscribeToResult(this, value)); + this.add(innerSubscribe(value, new SimpleInnerSubscriber(this))); } } protected _complete(): void { this.hasCompleted = true; if (!this.hasSubscription) { - this.destination.complete(); + this.destination.complete!(); } } - notifyComplete(innerSub: Subscription): void { - this.remove(innerSub); + notifyComplete(): void { this.hasSubscription = false; if (this.hasCompleted) { - this.destination.complete(); + this.destination.complete!(); } } } diff --git a/src/internal/operators/exhaustMap.ts b/src/internal/operators/exhaustMap.ts index 3bf6ff4a41..9a50669c22 100644 --- a/src/internal/operators/exhaustMap.ts +++ b/src/internal/operators/exhaustMap.ts @@ -2,12 +2,10 @@ import { Operator } from '../Operator'; import { Observable } from '../Observable'; import { Subscriber } from '../Subscriber'; import { Subscription } from '../Subscription'; -import { OuterSubscriber } from '../OuterSubscriber'; -import { InnerSubscriber } from '../InnerSubscriber'; -import { subscribeToResult } from '../util/subscribeToResult'; import { ObservableInput, OperatorFunction, ObservedValueOf } from '../types'; import { map } from './map'; import { from } from '../observable/from'; +import { SimpleOuterSubscriber, SimpleInnerSubscriber, innerSubscribe } from '../innerSubscribe'; /* tslint:disable:max-line-length */ export function exhaustMap>(project: (value: T, index: number) => O): OperatorFunction>; @@ -92,7 +90,7 @@ class ExhaustMapOperator implements Operator { * @ignore * @extends {Ignored} */ -class ExhaustMapSubscriber extends OuterSubscriber { +class ExhaustMapSubscriber extends SimpleOuterSubscriber { private hasSubscription = false; private hasCompleted = false; private index = 0; @@ -114,18 +112,18 @@ class ExhaustMapSubscriber extends OuterSubscriber { try { result = this.project(value, index); } catch (err) { - this.destination.error(err); + this.destination.error!(err); return; } this.hasSubscription = true; - this._innerSub(result, value, index); + this._innerSub(result); } - private _innerSub(result: ObservableInput, value: T, index: number): void { - const innerSubscriber = new InnerSubscriber(this, value, index); + private _innerSub(result: ObservableInput): void { + const innerSubscriber = new SimpleInnerSubscriber(this); const destination = this.destination as Subscription; destination.add(innerSubscriber); - const innerSubscription = subscribeToResult(this, result, undefined, undefined, innerSubscriber); + const innerSubscription = innerSubscribe(result, innerSubscriber); // The returned subscription will usually be the subscriber that was // passed. However, interop subscribers will be wrapped and for // unsubscriptions to chain correctly, the wrapper needs to be added, too. @@ -137,28 +135,23 @@ class ExhaustMapSubscriber extends OuterSubscriber { protected _complete(): void { this.hasCompleted = true; if (!this.hasSubscription) { - this.destination.complete(); + this.destination.complete!(); } this.unsubscribe(); } - notifyNext(outerValue: T, innerValue: R, - outerIndex: number, innerIndex: number, - innerSub: InnerSubscriber): void { - this.destination.next(innerValue); + notifyNext(innerValue: R): void { + this.destination.next!(innerValue); } notifyError(err: any): void { - this.destination.error(err); + this.destination.error!(err); } - notifyComplete(innerSub: Subscription): void { - const destination = this.destination as Subscription; - destination.remove(innerSub); - + notifyComplete(): void { this.hasSubscription = false; if (this.hasCompleted) { - this.destination.complete(); + this.destination.complete!(); } } } diff --git a/src/internal/operators/expand.ts b/src/internal/operators/expand.ts index b686d61573..2047a78078 100644 --- a/src/internal/operators/expand.ts +++ b/src/internal/operators/expand.ts @@ -2,10 +2,8 @@ import { Observable } from '../Observable'; import { Operator } from '../Operator'; import { Subscriber } from '../Subscriber'; import { Subscription } from '../Subscription'; -import { OuterSubscriber } from '../OuterSubscriber'; -import { InnerSubscriber } from '../InnerSubscriber'; -import { subscribeToResult } from '../util/subscribeToResult'; import { MonoTypeOperatorFunction, OperatorFunction, ObservableInput, SchedulerLike } from '../types'; +import { SimpleOuterSubscriber, innerSubscribe, SimpleInnerSubscriber } from '../innerSubscribe'; /* tslint:disable:max-line-length */ export function expand(project: (value: T, index: number) => ObservableInput, concurrent?: number, scheduler?: SchedulerLike): OperatorFunction; @@ -66,7 +64,7 @@ export function expand(project: (value: T, index: number) => ObservableInput< */ export function expand(project: (value: T, index: number) => ObservableInput, concurrent: number = Number.POSITIVE_INFINITY, - scheduler: SchedulerLike = undefined): OperatorFunction { + scheduler?: SchedulerLike): OperatorFunction { concurrent = (concurrent || 0) < 1 ? Number.POSITIVE_INFINITY : concurrent; return (source: Observable) => source.lift(new ExpandOperator(project, concurrent, scheduler)); @@ -75,7 +73,7 @@ export function expand(project: (value: T, index: number) => ObservableInp export class ExpandOperator implements Operator { constructor(private project: (value: T, index: number) => ObservableInput, private concurrent: number, - private scheduler: SchedulerLike) { + private scheduler?: SchedulerLike) { } call(subscriber: Subscriber, source: any): any { @@ -95,16 +93,16 @@ interface DispatchArg { * @ignore * @extends {Ignored} */ -export class ExpandSubscriber extends OuterSubscriber { +export class ExpandSubscriber extends SimpleOuterSubscriber { private index: number = 0; private active: number = 0; private hasCompleted: boolean = false; - private buffer: any[]; + private buffer?: any[]; constructor(destination: Subscriber, private project: (value: T, index: number) => ObservableInput, private concurrent: number, - private scheduler: SchedulerLike) { + private scheduler?: SchedulerLike) { super(destination); if (concurrent < Number.POSITIVE_INFINITY) { this.buffer = []; @@ -126,7 +124,7 @@ export class ExpandSubscriber extends OuterSubscriber { const index = this.index++; if (this.active < this.concurrent) { - destination.next(value); + destination.next!(value); try { const { project } = this; const result = project(value, index); @@ -135,46 +133,42 @@ export class ExpandSubscriber extends OuterSubscriber { } else { const state: DispatchArg = { subscriber: this, result, value, index }; const destination = this.destination as Subscription; - destination.add(this.scheduler.schedule>(ExpandSubscriber.dispatch, 0, state)); + destination.add(this.scheduler.schedule>(ExpandSubscriber.dispatch as any, 0, state)); } } catch (e) { - destination.error(e); + destination.error!(e); } } else { - this.buffer.push(value); + this.buffer!.push(value); } } private subscribeToProjection(result: any, value: T, index: number): void { this.active++; const destination = this.destination as Subscription; - destination.add(subscribeToResult(this, result, value, index)); + destination.add(innerSubscribe(result, new SimpleInnerSubscriber(this))); } protected _complete(): void { this.hasCompleted = true; if (this.hasCompleted && this.active === 0) { - this.destination.complete(); + this.destination.complete!(); } this.unsubscribe(); } - notifyNext(outerValue: T, innerValue: R, - outerIndex: number, innerIndex: number, - innerSub: InnerSubscriber): void { + notifyNext(innerValue: R): void { this._next(innerValue); } - notifyComplete(innerSub: Subscription): void { + notifyComplete(): void { const buffer = this.buffer; - const destination = this.destination as Subscription; - destination.remove(innerSub); this.active--; if (buffer && buffer.length > 0) { this._next(buffer.shift()); } if (this.hasCompleted && this.active === 0) { - this.destination.complete(); + this.destination.complete!(); } } } diff --git a/src/internal/operators/mergeMap.ts b/src/internal/operators/mergeMap.ts index 884e11271b..d8858462f1 100644 --- a/src/internal/operators/mergeMap.ts +++ b/src/internal/operators/mergeMap.ts @@ -2,12 +2,10 @@ import { Observable } from '../Observable'; import { Operator } from '../Operator'; import { Subscriber } from '../Subscriber'; import { Subscription } from '../Subscription'; -import { subscribeToResult } from '../util/subscribeToResult'; -import { OuterSubscriber } from '../OuterSubscriber'; -import { InnerSubscriber } from '../InnerSubscriber'; import { ObservableInput, OperatorFunction, ObservedValueOf } from '../types'; import { map } from './map'; import { from } from '../observable/from'; +import { SimpleOuterSubscriber, SimpleInnerSubscriber, innerSubscribe } from '../innerSubscribe'; /* tslint:disable:max-line-length */ export function mergeMap>(project: (value: T, index: number) => O, concurrent?: number): OperatorFunction>; @@ -106,7 +104,7 @@ export class MergeMapOperator implements Operator { * @ignore * @extends {Ignored} */ -export class MergeMapSubscriber extends OuterSubscriber { +export class MergeMapSubscriber extends SimpleOuterSubscriber { private hasCompleted: boolean = false; private buffer: T[] = []; private active: number = 0; @@ -132,18 +130,18 @@ export class MergeMapSubscriber extends OuterSubscriber { try { result = this.project(value, index); } catch (err) { - this.destination.error(err); + this.destination.error!(err); return; } this.active++; - this._innerSub(result, value, index); + this._innerSub(result); } - private _innerSub(ish: ObservableInput, value: T, index: number): void { - const innerSubscriber = new InnerSubscriber(this, value, index); + private _innerSub(ish: ObservableInput): void { + const innerSubscriber = new SimpleInnerSubscriber(this); const destination = this.destination as Subscription; destination.add(innerSubscriber); - const innerSubscription = subscribeToResult(this, ish, undefined, undefined, innerSubscriber); + const innerSubscription = innerSubscribe(ish, innerSubscriber); // The returned subscription will usually be the subscriber that was // passed. However, interop subscribers will be wrapped and for // unsubscriptions to chain correctly, the wrapper needs to be added, too. @@ -155,25 +153,22 @@ export class MergeMapSubscriber extends OuterSubscriber { protected _complete(): void { this.hasCompleted = true; if (this.active === 0 && this.buffer.length === 0) { - this.destination.complete(); + this.destination.complete!(); } this.unsubscribe(); } - notifyNext(outerValue: T, innerValue: R, - outerIndex: number, innerIndex: number, - innerSub: InnerSubscriber): void { - this.destination.next(innerValue); + notifyNext(innerValue: R): void { + this.destination.next!(innerValue); } - notifyComplete(innerSub: Subscription): void { + notifyComplete(): void { const buffer = this.buffer; - this.remove(innerSub); this.active--; if (buffer.length > 0) { - this._next(buffer.shift()); + this._next(buffer.shift()!); } else if (this.active === 0 && this.hasCompleted) { - this.destination.complete(); + this.destination.complete!(); } } } diff --git a/src/internal/operators/mergeScan.ts b/src/internal/operators/mergeScan.ts index 60ff2882f9..c734852191 100644 --- a/src/internal/operators/mergeScan.ts +++ b/src/internal/operators/mergeScan.ts @@ -2,10 +2,8 @@ import { Operator } from '../Operator'; import { Observable } from '../Observable'; import { Subscriber } from '../Subscriber'; import { Subscription } from '../Subscription'; -import { subscribeToResult } from '../util/subscribeToResult'; -import { OuterSubscriber } from '../OuterSubscriber'; -import { InnerSubscriber } from '../InnerSubscriber'; import { ObservableInput, OperatorFunction } from '../types'; +import { SimpleOuterSubscriber, SimpleInnerSubscriber, innerSubscribe } from '../innerSubscribe'; /** * Applies an accumulator function over the source Observable where the @@ -70,7 +68,7 @@ export class MergeScanOperator implements Operator { * @ignore * @extends {Ignored} */ -export class MergeScanSubscriber extends OuterSubscriber { +export class MergeScanSubscriber extends SimpleOuterSubscriber { private hasValue: boolean = false; private hasCompleted: boolean = false; private buffer: Observable[] = []; @@ -93,20 +91,20 @@ export class MergeScanSubscriber extends OuterSubscriber { const { accumulator } = this; ish = accumulator(this.acc, value, index); } catch (e) { - return destination.error(e); + return destination.error!(e); } this.active++; - this._innerSub(ish, value, index); + this._innerSub(ish); } else { this.buffer.push(value); } } - private _innerSub(ish: any, value: T, index: number): void { - const innerSubscriber = new InnerSubscriber(this, value, index); + private _innerSub(ish: any): void { + const innerSubscriber = new SimpleInnerSubscriber(this); const destination = this.destination as Subscription; destination.add(innerSubscriber); - const innerSubscription = subscribeToResult(this, ish, undefined, undefined, innerSubscriber); + const innerSubscription = innerSubscribe(ish, innerSubscriber); // The returned subscription will usually be the subscriber that was // passed. However, interop subscribers will be wrapped and for // unsubscriptions to chain correctly, the wrapper needs to be added, too. @@ -119,34 +117,30 @@ export class MergeScanSubscriber extends OuterSubscriber { this.hasCompleted = true; if (this.active === 0 && this.buffer.length === 0) { if (this.hasValue === false) { - this.destination.next(this.acc); + this.destination.next!(this.acc); } - this.destination.complete(); + this.destination.complete!(); } this.unsubscribe(); } - notifyNext(outerValue: T, innerValue: R, - outerIndex: number, innerIndex: number, - innerSub: InnerSubscriber): void { + notifyNext(innerValue: R): void { const { destination } = this; this.acc = innerValue; this.hasValue = true; - destination.next(innerValue); + destination.next!(innerValue); } - notifyComplete(innerSub: Subscription): void { + notifyComplete(): void { const buffer = this.buffer; - const destination = this.destination as Subscription; - destination.remove(innerSub); this.active--; if (buffer.length > 0) { this._next(buffer.shift()); } else if (this.active === 0 && this.hasCompleted) { if (this.hasValue === false) { - this.destination.next(this.acc); + this.destination.next!(this.acc); } - this.destination.complete(); + this.destination.complete!(); } } } diff --git a/src/internal/operators/onErrorResumeNext.ts b/src/internal/operators/onErrorResumeNext.ts index 058ca8ec0c..104ed9f5a4 100644 --- a/src/internal/operators/onErrorResumeNext.ts +++ b/src/internal/operators/onErrorResumeNext.ts @@ -4,10 +4,8 @@ import { Operator } from '../Operator'; import { Subscriber } from '../Subscriber'; import { Subscription } from '../Subscription'; import { isArray } from '../util/isArray'; -import { OuterSubscriber } from '../OuterSubscriber'; -import { InnerSubscriber } from '../InnerSubscriber'; -import { subscribeToResult } from '../util/subscribeToResult'; import { ObservableInput, OperatorFunction } from '../types'; +import { SimpleOuterSubscriber, SimpleInnerSubscriber, innerSubscribe } from '../innerSubscribe'; /* tslint:disable:max-line-length */ export function onErrorResumeNext(): OperatorFunction; @@ -113,14 +111,15 @@ export function onErrorResumeNextStatic(array: ObservableInput[]): Obser export function onErrorResumeNextStatic(...nextSources: Array | Array> | ((...values: Array) => R)>): Observable { - let source: ObservableInput = null; + let source: ObservableInput|undefined = undefined; if (nextSources.length === 1 && isArray(nextSources[0])) { - nextSources = >>nextSources[0]; + nextSources = nextSources[0] as ObservableInput[]; } - source = nextSources.shift(); + // TODO: resolve issue with passing no arguments. + source = nextSources.shift()!; - return from(source, null).lift(new OnErrorResumeNextOperator(nextSources)); + return from(source).lift(new OnErrorResumeNextOperator(nextSources)); } class OnErrorResumeNextOperator implements Operator { @@ -132,17 +131,17 @@ class OnErrorResumeNextOperator implements Operator { } } -class OnErrorResumeNextSubscriber extends OuterSubscriber { +class OnErrorResumeNextSubscriber extends SimpleOuterSubscriber { constructor(protected destination: Subscriber, private nextSources: Array>) { super(destination); } - notifyError(error: any, innerSub: InnerSubscriber): void { + notifyError(): void { this.subscribeToNextSource(); } - notifyComplete(innerSub: InnerSubscriber): void { + notifyComplete(): void { this.subscribeToNextSource(); } @@ -159,10 +158,10 @@ class OnErrorResumeNextSubscriber extends OuterSubscriber { private subscribeToNextSource(): void { const next = this.nextSources.shift(); if (!!next) { - const innerSubscriber = new InnerSubscriber(this, undefined, undefined); + const innerSubscriber = new SimpleInnerSubscriber(this); const destination = this.destination as Subscription; destination.add(innerSubscriber); - const innerSubscription = subscribeToResult(this, next, undefined, undefined, innerSubscriber); + const innerSubscription = innerSubscribe(next, innerSubscriber); // The returned subscription will usually be the subscriber that was // passed. However, interop subscribers will be wrapped and for // unsubscriptions to chain correctly, the wrapper needs to be added, too. diff --git a/src/internal/operators/repeatWhen.ts b/src/internal/operators/repeatWhen.ts index ecdbe3112e..744ec4b29b 100644 --- a/src/internal/operators/repeatWhen.ts +++ b/src/internal/operators/repeatWhen.ts @@ -4,11 +4,8 @@ import { Observable } from '../Observable'; import { Subject } from '../Subject'; import { Subscription } from '../Subscription'; -import { OuterSubscriber } from '../OuterSubscriber'; -import { InnerSubscriber } from '../InnerSubscriber'; -import { subscribeToResult } from '../util/subscribeToResult'; - import { MonoTypeOperatorFunction, TeardownLogic } from '../types'; +import { SimpleOuterSubscriber, innerSubscribe, SimpleInnerSubscriber } from '../innerSubscribe'; /** * Returns an Observable that mirrors the source Observable with the exception of a `complete`. If the source @@ -58,11 +55,11 @@ class RepeatWhenOperator implements Operator { * @ignore * @extends {Ignored} */ -class RepeatWhenSubscriber extends OuterSubscriber { +class RepeatWhenSubscriber extends SimpleOuterSubscriber { - private notifications: Subject; - private retries: Observable; - private retriesSubscription: Subscription; + private notifications?: Subject; + private retries?: Observable; + private retriesSubscription?: Subscription; private sourceIsBeingSubscribedTo: boolean = true; constructor(destination: Subscriber, @@ -71,14 +68,12 @@ class RepeatWhenSubscriber extends OuterSubscriber { super(destination); } - notifyNext(outerValue: T, innerValue: R, - outerIndex: number, innerIndex: number, - innerSub: InnerSubscriber): void { + notifyNext(): void { this.sourceIsBeingSubscribedTo = true; this.source.subscribe(this); } - notifyComplete(innerSub: InnerSubscriber): void { + notifyComplete(): void { if (this.sourceIsBeingSubscribedTo === false) { return super.complete(); } @@ -96,7 +91,7 @@ class RepeatWhenSubscriber extends OuterSubscriber { } this._unsubscribeAndRecycle(); - this.notifications.next(); + this.notifications!.next(undefined); } } @@ -105,20 +100,20 @@ class RepeatWhenSubscriber extends OuterSubscriber { const { notifications, retriesSubscription } = this; if (notifications) { notifications.unsubscribe(); - this.notifications = null; + this.notifications = undefined; } if (retriesSubscription) { retriesSubscription.unsubscribe(); - this.retriesSubscription = null; + this.retriesSubscription = undefined; } - this.retries = null; + this.retries = undefined; } /** @deprecated This is an internal implementation detail, do not use. */ _unsubscribeAndRecycle(): Subscriber { const { _unsubscribe } = this; - this._unsubscribe = null; + this._unsubscribe = null!; super._unsubscribeAndRecycle(); this._unsubscribe = _unsubscribe; @@ -135,6 +130,6 @@ class RepeatWhenSubscriber extends OuterSubscriber { return super.complete(); } this.retries = retries; - this.retriesSubscription = subscribeToResult(this, retries); + this.retriesSubscription = innerSubscribe(retries, new SimpleInnerSubscriber(this)); } } diff --git a/src/internal/operators/retryWhen.ts b/src/internal/operators/retryWhen.ts index 325742dcb6..3be9fed715 100644 --- a/src/internal/operators/retryWhen.ts +++ b/src/internal/operators/retryWhen.ts @@ -4,11 +4,8 @@ import { Observable } from '../Observable'; import { Subject } from '../Subject'; import { Subscription } from '../Subscription'; -import { OuterSubscriber } from '../OuterSubscriber'; -import { InnerSubscriber } from '../InnerSubscriber'; -import { subscribeToResult } from '../util/subscribeToResult'; - import { MonoTypeOperatorFunction, TeardownLogic } from '../types'; +import { SimpleOuterSubscriber, innerSubscribe, SimpleInnerSubscriber } from '../innerSubscribe'; /** * Returns an Observable that mirrors the source Observable with the exception of an `error`. If the source Observable @@ -43,11 +40,11 @@ class RetryWhenOperator implements Operator { * @ignore * @extends {Ignored} */ -class RetryWhenSubscriber extends OuterSubscriber { +class RetryWhenSubscriber extends SimpleOuterSubscriber { - private errors: Subject; - private retries: Observable; - private retriesSubscription: Subscription; + private errors?: Subject; + private retries?: Observable; + private retriesSubscription?: Subscription; constructor(destination: Subscriber, private notifier: (errors: Observable) => Observable, @@ -70,10 +67,10 @@ class RetryWhenSubscriber extends OuterSubscriber { } catch (e) { return super.error(e); } - retriesSubscription = subscribeToResult(this, retries); + retriesSubscription = innerSubscribe(retries, new SimpleInnerSubscriber(this)); } else { - this.errors = null; - this.retriesSubscription = null; + this.errors = undefined; + this.retriesSubscription = undefined; } this._unsubscribeAndRecycle(); @@ -82,7 +79,7 @@ class RetryWhenSubscriber extends OuterSubscriber { this.retries = retries; this.retriesSubscription = retriesSubscription; - errors.next(err); + errors!.next(err); } } @@ -91,21 +88,19 @@ class RetryWhenSubscriber extends OuterSubscriber { const { errors, retriesSubscription } = this; if (errors) { errors.unsubscribe(); - this.errors = null; + this.errors = undefined; } if (retriesSubscription) { retriesSubscription.unsubscribe(); - this.retriesSubscription = null; + this.retriesSubscription = undefined; } - this.retries = null; + this.retries = undefined; } - notifyNext(outerValue: T, innerValue: R, - outerIndex: number, innerIndex: number, - innerSub: InnerSubscriber): void { + notifyNext(): void { const { _unsubscribe } = this; - this._unsubscribe = null; + this._unsubscribe = null!; this._unsubscribeAndRecycle(); this._unsubscribe = _unsubscribe; diff --git a/src/internal/operators/sample.ts b/src/internal/operators/sample.ts index 5665dec1ba..1a26baff32 100644 --- a/src/internal/operators/sample.ts +++ b/src/internal/operators/sample.ts @@ -1,11 +1,9 @@ import { Operator } from '../Operator'; import { Observable } from '../Observable'; import { Subscriber } from '../Subscriber'; -import { OuterSubscriber } from '../OuterSubscriber'; -import { InnerSubscriber } from '../InnerSubscriber'; -import { subscribeToResult } from '../util/subscribeToResult'; import { MonoTypeOperatorFunction, TeardownLogic } from '../types'; +import { SimpleOuterSubscriber, innerSubscribe, SimpleInnerSubscriber } from '../innerSubscribe'; /** * Emits the most recently emitted value from the source Observable whenever @@ -58,7 +56,7 @@ class SampleOperator implements Operator { call(subscriber: Subscriber, source: any): TeardownLogic { const sampleSubscriber = new SampleSubscriber(subscriber); const subscription = source.subscribe(sampleSubscriber); - subscription.add(subscribeToResult(sampleSubscriber, this.notifier)); + subscription.add(innerSubscribe(this.notifier, new SimpleInnerSubscriber(sampleSubscriber))); return subscription; } } @@ -68,8 +66,8 @@ class SampleOperator implements Operator { * @ignore * @extends {Ignored} */ -class SampleSubscriber extends OuterSubscriber { - private value: T; +class SampleSubscriber extends SimpleOuterSubscriber { + private value?: T; private hasValue: boolean = false; protected _next(value: T) { @@ -77,9 +75,7 @@ class SampleSubscriber extends OuterSubscriber { this.hasValue = true; } - notifyNext(outerValue: T, innerValue: R, - outerIndex: number, innerIndex: number, - innerSub: InnerSubscriber): void { + notifyNext(): void { this.emitValue(); } @@ -90,7 +86,7 @@ class SampleSubscriber extends OuterSubscriber { emitValue() { if (this.hasValue) { this.hasValue = false; - this.destination.next(this.value); + this.destination.next!(this.value!); } } } diff --git a/src/internal/operators/skipUntil.ts b/src/internal/operators/skipUntil.ts index 8310adca58..66e2531842 100644 --- a/src/internal/operators/skipUntil.ts +++ b/src/internal/operators/skipUntil.ts @@ -1,11 +1,9 @@ import { Operator } from '../Operator'; import { Subscriber } from '../Subscriber'; import { Observable } from '../Observable'; -import { OuterSubscriber } from '../OuterSubscriber'; -import { InnerSubscriber } from '../InnerSubscriber'; -import { subscribeToResult } from '../util/subscribeToResult'; import { MonoTypeOperatorFunction, TeardownLogic, ObservableInput } from '../types'; import { Subscription } from '../Subscription'; +import { SimpleOuterSubscriber, SimpleInnerSubscriber, innerSubscribe } from '../innerSubscribe'; /** * Returns an Observable that skips items emitted by the source Observable until a second Observable emits an item. @@ -64,17 +62,17 @@ class SkipUntilOperator implements Operator { * @ignore * @extends {Ignored} */ -class SkipUntilSubscriber extends OuterSubscriber { +class SkipUntilSubscriber extends SimpleOuterSubscriber { private hasValue: boolean = false; - private innerSubscription: Subscription; + private innerSubscription?: Subscription; constructor(destination: Subscriber, notifier: ObservableInput) { super(destination); - const innerSubscriber = new InnerSubscriber(this, undefined, undefined); + const innerSubscriber = new SimpleInnerSubscriber(this); this.add(innerSubscriber); this.innerSubscription = innerSubscriber; - const innerSubscription = subscribeToResult(this, notifier, undefined, undefined, innerSubscriber); + const innerSubscription = innerSubscribe(notifier, innerSubscriber); // The returned subscription will usually be the subscriber that was // passed. However, interop subscribers will be wrapped and for // unsubscriptions to chain correctly, the wrapper needs to be added, too. @@ -90,9 +88,7 @@ class SkipUntilSubscriber extends OuterSubscriber { } } - notifyNext(outerValue: T, innerValue: R, - outerIndex: number, innerIndex: number, - innerSub: InnerSubscriber): void { + notifyNext(): void { this.hasValue = true; if (this.innerSubscription) { this.innerSubscription.unsubscribe(); diff --git a/src/internal/operators/switchMap.ts b/src/internal/operators/switchMap.ts index c3c831ad88..cbedac438d 100644 --- a/src/internal/operators/switchMap.ts +++ b/src/internal/operators/switchMap.ts @@ -2,12 +2,10 @@ import { Operator } from '../Operator'; import { Observable } from '../Observable'; import { Subscriber } from '../Subscriber'; import { Subscription } from '../Subscription'; -import { OuterSubscriber } from '../OuterSubscriber'; -import { InnerSubscriber } from '../InnerSubscriber'; -import { subscribeToResult } from '../util/subscribeToResult'; import { ObservableInput, OperatorFunction, ObservedValueOf } from '../types'; import { map } from './map'; import { from } from '../observable/from'; +import { SimpleOuterSubscriber, SimpleInnerSubscriber, innerSubscribe } from '../innerSubscribe'; /* tslint:disable:max-line-length */ export function switchMap>(project: (value: T, index: number) => O): OperatorFunction>; @@ -107,9 +105,9 @@ class SwitchMapOperator implements Operator { * @ignore * @extends {Ignored} */ -class SwitchMapSubscriber extends OuterSubscriber { - private index: number = 0; - private innerSubscription: Subscription; +class SwitchMapSubscriber extends SimpleOuterSubscriber { + private index = 0; + private innerSubscription?: Subscription; constructor(destination: Subscriber, private project: (value: T, index: number) => ObservableInput) { @@ -122,21 +120,21 @@ class SwitchMapSubscriber extends OuterSubscriber { try { result = this.project(value, index); } catch (error) { - this.destination.error(error); + this.destination.error!(error); return; } - this._innerSub(result, value, index); + this._innerSub(result); } - private _innerSub(result: ObservableInput, value: T, index: number) { + private _innerSub(result: ObservableInput) { const innerSubscription = this.innerSubscription; if (innerSubscription) { innerSubscription.unsubscribe(); } - const innerSubscriber = new InnerSubscriber(this, value, index); + const innerSubscriber = new SimpleInnerSubscriber(this); const destination = this.destination as Subscription; destination.add(innerSubscriber); - this.innerSubscription = subscribeToResult(this, result, undefined, undefined, innerSubscriber); + this.innerSubscription = innerSubscribe(result, innerSubscriber); // The returned subscription will usually be the subscriber that was // passed. However, interop subscribers will be wrapped and for // unsubscriptions to chain correctly, the wrapper needs to be added, too. @@ -154,21 +152,17 @@ class SwitchMapSubscriber extends OuterSubscriber { } protected _unsubscribe() { - this.innerSubscription = null; + this.innerSubscription = undefined; } - notifyComplete(innerSub: Subscription): void { - const destination = this.destination as Subscription; - destination.remove(innerSub); - this.innerSubscription = null; + notifyComplete(): void { + this.innerSubscription = undefined; if (this.isStopped) { super._complete(); } } - notifyNext(outerValue: T, innerValue: R, - outerIndex: number, innerIndex: number, - innerSub: InnerSubscriber): void { - this.destination.next(innerValue); + notifyNext(innerValue: R): void { + this.destination.next!(innerValue); } } diff --git a/src/internal/operators/switchMapTo.ts b/src/internal/operators/switchMapTo.ts index 84424744d4..dc71e4408a 100644 --- a/src/internal/operators/switchMapTo.ts +++ b/src/internal/operators/switchMapTo.ts @@ -1,10 +1,3 @@ -import { Operator } from '../Operator'; -import { Observable } from '../Observable'; -import { Subscriber } from '../Subscriber'; -import { Subscription } from '../Subscription'; -import { OuterSubscriber } from '../OuterSubscriber'; -import { InnerSubscriber } from '../InnerSubscriber'; -import { subscribeToResult } from '../util/subscribeToResult'; import { ObservableInput, OperatorFunction } from '../types'; import { switchMap } from './switchMap'; diff --git a/src/internal/operators/takeUntil.ts b/src/internal/operators/takeUntil.ts index 84ea1c7fbf..898dc9aef8 100644 --- a/src/internal/operators/takeUntil.ts +++ b/src/internal/operators/takeUntil.ts @@ -2,11 +2,8 @@ import { Operator } from '../Operator'; import { Observable } from '../Observable'; import { Subscriber } from '../Subscriber'; -import { OuterSubscriber } from '../OuterSubscriber'; -import { InnerSubscriber } from '../InnerSubscriber'; -import { subscribeToResult } from '../util/subscribeToResult'; - import { MonoTypeOperatorFunction, TeardownLogic } from '../types'; +import { innerSubscribe, SimpleInnerSubscriber, SimpleOuterSubscriber } from '../innerSubscribe'; /** * Emits the values emitted by the source Observable until a `notifier` @@ -58,7 +55,7 @@ class TakeUntilOperator implements Operator { call(subscriber: Subscriber, source: any): TeardownLogic { const takeUntilSubscriber = new TakeUntilSubscriber(subscriber); - const notifierSubscription = subscribeToResult(takeUntilSubscriber, this.notifier); + const notifierSubscription = innerSubscribe(this.notifier, new SimpleInnerSubscriber(takeUntilSubscriber)); if (notifierSubscription && !takeUntilSubscriber.seenValue) { takeUntilSubscriber.add(notifierSubscription); return source.subscribe(takeUntilSubscriber); @@ -72,16 +69,14 @@ class TakeUntilOperator implements Operator { * @ignore * @extends {Ignored} */ -class TakeUntilSubscriber extends OuterSubscriber { +class TakeUntilSubscriber extends SimpleOuterSubscriber { seenValue = false; constructor(destination: Subscriber, ) { super(destination); } - notifyNext(outerValue: T, innerValue: R, - outerIndex: number, innerIndex: number, - innerSub: InnerSubscriber): void { + notifyNext(): void { this.seenValue = true; this.complete(); } diff --git a/src/internal/operators/throttle.ts b/src/internal/operators/throttle.ts index e855ff0a8a..380e72e63b 100644 --- a/src/internal/operators/throttle.ts +++ b/src/internal/operators/throttle.ts @@ -3,11 +3,8 @@ import { Observable } from '../Observable'; import { Subscriber } from '../Subscriber'; import { Subscription } from '../Subscription'; -import { OuterSubscriber } from '../OuterSubscriber'; -import { InnerSubscriber } from '../InnerSubscriber'; -import { subscribeToResult } from '../util/subscribeToResult'; - import { MonoTypeOperatorFunction, SubscribableOrPromise, TeardownLogic } from '../types'; +import { SimpleOuterSubscriber, innerSubscribe, SimpleInnerSubscriber } from '../innerSubscribe'; export interface ThrottleConfig { leading?: boolean; @@ -67,7 +64,7 @@ export const defaultThrottleConfig: ThrottleConfig = { */ export function throttle(durationSelector: (value: T) => SubscribableOrPromise, config: ThrottleConfig = defaultThrottleConfig): MonoTypeOperatorFunction { - return (source: Observable) => source.lift(new ThrottleOperator(durationSelector, config.leading, config.trailing)); + return (source: Observable) => source.lift(new ThrottleOperator(durationSelector, !!config.leading, !!config.trailing)); } class ThrottleOperator implements Operator { @@ -88,9 +85,9 @@ class ThrottleOperator implements Operator { * @ignore * @extends {Ignored} */ -class ThrottleSubscriber extends OuterSubscriber { - private _throttled: Subscription; - private _sendValue: T; +class ThrottleSubscriber extends SimpleOuterSubscriber { + private _throttled?: Subscription; + private _sendValue?: T; private _hasValue = false; constructor(protected destination: Subscriber, @@ -117,20 +114,20 @@ class ThrottleSubscriber extends OuterSubscriber { const { _hasValue, _sendValue } = this; if (_hasValue) { this.destination.next(_sendValue); - this.throttle(_sendValue); + this.throttle(_sendValue!); } this._hasValue = false; - this._sendValue = null; + this._sendValue = undefined; } private throttle(value: T): void { const duration = this.tryDurationSelector(value); if (!!duration) { - this.add(this._throttled = subscribeToResult(this, duration)); + this.add(this._throttled = innerSubscribe(duration, new SimpleInnerSubscriber(this))); } } - private tryDurationSelector(value: T): SubscribableOrPromise { + private tryDurationSelector(value: T): SubscribableOrPromise | null { try { return this.durationSelector(value); } catch (err) { @@ -144,16 +141,14 @@ class ThrottleSubscriber extends OuterSubscriber { if (_throttled) { _throttled.unsubscribe(); } - this._throttled = null; + this._throttled = undefined; if (_trailing) { this.send(); } } - notifyNext(outerValue: T, innerValue: R, - outerIndex: number, innerIndex: number, - innerSub: InnerSubscriber): void { + notifyNext(): void { this.throttlingDone(); } diff --git a/src/internal/operators/timeoutWith.ts b/src/internal/operators/timeoutWith.ts index f984e82658..9578851720 100644 --- a/src/internal/operators/timeoutWith.ts +++ b/src/internal/operators/timeoutWith.ts @@ -3,9 +3,8 @@ import { Subscriber } from '../Subscriber'; import { async } from '../scheduler/async'; import { Observable } from '../Observable'; import { isDate } from '../util/isDate'; -import { OuterSubscriber } from '../OuterSubscriber'; -import { subscribeToResult } from '../util/subscribeToResult'; -import { ObservableInput, OperatorFunction, MonoTypeOperatorFunction, SchedulerAction, SchedulerLike, TeardownLogic } from '../types'; +import { ObservableInput, OperatorFunction, SchedulerAction, SchedulerLike, TeardownLogic } from '../types'; +import { SimpleOuterSubscriber, innerSubscribe, SimpleInnerSubscriber } from '../innerSubscribe'; /* tslint:disable:max-line-length */ export function timeoutWith(due: number | Date, withObservable: ObservableInput, scheduler?: SchedulerLike): OperatorFunction; @@ -93,9 +92,9 @@ class TimeoutWithOperator implements Operator { * @ignore * @extends {Ignored} */ -class TimeoutWithSubscriber extends OuterSubscriber { +class TimeoutWithSubscriber extends SimpleOuterSubscriber { - private action: SchedulerAction> = null; + private action?: SchedulerAction>; constructor(destination: Subscriber, private absoluteTimeout: boolean, @@ -108,8 +107,8 @@ class TimeoutWithSubscriber extends OuterSubscriber { private static dispatchTimeout(subscriber: TimeoutWithSubscriber): void { const { withObservable } = subscriber; - ( subscriber)._unsubscribeAndRecycle(); - subscriber.add(subscribeToResult(subscriber, withObservable)); + subscriber._unsubscribeAndRecycle(); + subscriber.add(innerSubscribe(withObservable, new SimpleInnerSubscriber(subscriber))); } private scheduleTimeout(): void { @@ -123,7 +122,7 @@ class TimeoutWithSubscriber extends OuterSubscriber { this.action = (>> action.schedule(this, this.waitFor)); } else { this.add(this.action = (>> this.scheduler.schedule>( - TimeoutWithSubscriber.dispatchTimeout, this.waitFor, this + TimeoutWithSubscriber.dispatchTimeout as any, this.waitFor, this ))); } } @@ -137,8 +136,8 @@ class TimeoutWithSubscriber extends OuterSubscriber { /** @deprecated This is an internal implementation detail, do not use. */ _unsubscribe() { - this.action = null; - this.scheduler = null; - this.withObservable = null; + this.action = undefined; + this.scheduler = null!; + this.withObservable = null!; } } diff --git a/src/internal/operators/window.ts b/src/internal/operators/window.ts index f373dbc399..48390a3218 100644 --- a/src/internal/operators/window.ts +++ b/src/internal/operators/window.ts @@ -2,10 +2,8 @@ import { Observable } from '../Observable'; import { OperatorFunction } from '../types'; import { Subject } from '../Subject'; import { Subscriber } from '../Subscriber'; -import { OuterSubscriber } from '../OuterSubscriber'; -import { InnerSubscriber } from '../InnerSubscriber'; -import { subscribeToResult } from '../util/subscribeToResult'; import { Operator } from '../Operator'; +import { SimpleOuterSubscriber, innerSubscribe, SimpleInnerSubscriber } from '../innerSubscribe'; /** * Branch out the source Observable values as a nested Observable whenever @@ -65,7 +63,7 @@ class WindowOperator implements Operator> { const windowSubscriber = new WindowSubscriber(subscriber); const sourceSubscription = source.subscribe(windowSubscriber); if (!sourceSubscription.closed) { - windowSubscriber.add(subscribeToResult(windowSubscriber, this.windowBoundaries)); + windowSubscriber.add(innerSubscribe(this.windowBoundaries, new SimpleInnerSubscriber(windowSubscriber))); } return sourceSubscription; } @@ -76,7 +74,7 @@ class WindowOperator implements Operator> { * @ignore * @extends {Ignored} */ -class WindowSubscriber extends OuterSubscriber { +class WindowSubscriber extends SimpleOuterSubscriber { private window: Subject = new Subject(); @@ -85,17 +83,15 @@ class WindowSubscriber extends OuterSubscriber { destination.next(this.window); } - notifyNext(outerValue: T, innerValue: any, - outerIndex: number, innerIndex: number, - innerSub: InnerSubscriber): void { + notifyNext(): void { this.openWindow(); } - notifyError(error: any, innerSub: InnerSubscriber): void { + notifyError(error: any): void { this._error(error); } - notifyComplete(innerSub: InnerSubscriber): void { + notifyComplete(): void { this._complete(); } @@ -105,17 +101,17 @@ class WindowSubscriber extends OuterSubscriber { protected _error(err: any): void { this.window.error(err); - this.destination.error(err); + this.destination.error!(err); } protected _complete(): void { this.window.complete(); - this.destination.complete(); + this.destination.complete!(); } /** @deprecated This is an internal implementation detail, do not use. */ _unsubscribe() { - this.window = null; + this.window = null!; } private openWindow(): void { @@ -125,6 +121,6 @@ class WindowSubscriber extends OuterSubscriber { } const destination = this.destination; const newWindow = this.window = new Subject(); - destination.next(newWindow); + destination.next!(newWindow); } } diff --git a/src/internal/operators/windowWhen.ts b/src/internal/operators/windowWhen.ts index aa0cf2cf67..67e0b16b1c 100644 --- a/src/internal/operators/windowWhen.ts +++ b/src/internal/operators/windowWhen.ts @@ -74,8 +74,8 @@ class WindowOperator implements Operator> { * @extends {Ignored} */ class WindowSubscriber extends OuterSubscriber { - private window: Subject; - private closingNotification: Subscription; + private window?: Subject; + private closingNotification?: Subscription; constructor(protected destination: Subscriber>, private closingSelector: () => Observable) { @@ -83,13 +83,13 @@ class WindowSubscriber extends OuterSubscriber { this.openWindow(); } - notifyNext(outerValue: T, innerValue: any, - outerIndex: number, innerIndex: number, + notifyNext(_outerValue: T, _innerValue: any, + _outerIndex: number, _innerIndex: number, innerSub: InnerSubscriber): void { this.openWindow(innerSub); } - notifyError(error: any, innerSub: InnerSubscriber): void { + notifyError(error: any): void { this._error(error); } @@ -98,17 +98,17 @@ class WindowSubscriber extends OuterSubscriber { } protected _next(value: T): void { - this.window.next(value); + this.window!.next(value); } protected _error(err: any): void { - this.window.error(err); + this.window!.error(err); this.destination.error(err); this.unsubscribeClosingNotification(); } protected _complete(): void { - this.window.complete(); + this.window!.complete(); this.destination.complete(); this.unsubscribeClosingNotification(); } @@ -119,7 +119,7 @@ class WindowSubscriber extends OuterSubscriber { } } - private openWindow(innerSub: InnerSubscriber = null): void { + private openWindow(innerSub: InnerSubscriber | null = null): void { if (innerSub) { this.remove(innerSub); innerSub.unsubscribe(); diff --git a/src/internal/operators/withLatestFrom.ts b/src/internal/operators/withLatestFrom.ts index 85586b34d1..007b19419e 100644 --- a/src/internal/operators/withLatestFrom.ts +++ b/src/internal/operators/withLatestFrom.ts @@ -111,13 +111,12 @@ class WithLatestFromSubscriber extends OuterSubscriber { for (let i = 0; i < len; i++) { let observable = observables[i]; - this.add(subscribeToResult(this, observable, observable, i)); + this.add(subscribeToResult(this, observable, undefined, i)); } } - notifyNext(outerValue: T, innerValue: R, - outerIndex: number, innerIndex: number, - innerSub: InnerSubscriber): void { + notifyNext(_outerValue: T, innerValue: R, + outerIndex: number): void { this.values[outerIndex] = innerValue; const toRespond = this.toRespond; if (toRespond.length > 0) { @@ -138,7 +137,7 @@ class WithLatestFromSubscriber extends OuterSubscriber { if (this.project) { this._tryProject(args); } else { - this.destination.next(args); + this.destination.next!(args); } } } @@ -146,11 +145,11 @@ class WithLatestFromSubscriber extends OuterSubscriber { private _tryProject(args: any[]) { let result: any; try { - result = this.project.apply(this, args); + result = this.project!.apply(this, args); } catch (err) { - this.destination.error(err); + this.destination.error!(err); return; } - this.destination.next(result); + this.destination.next!(result); } }