Skip to content

Commit

Permalink
refactor: Improve memory pressure (#5613)
Browse files Browse the repository at this point in the history
this is the duplicate of #5610, it is refactoring to ensure outer values are not retained when they do not have to be. It needs to be done in a separate PR because the branches diverge just enough to require it. This PR also has some mild, internal type fixes.
  • Loading branch information
benlesh committed Jul 31, 2020
1 parent 075af28 commit 10cc8a6
Show file tree
Hide file tree
Showing 30 changed files with 403 additions and 396 deletions.
114 changes: 114 additions & 0 deletions src/internal/innerSubscribe.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/** @prettier */
import { Subscription } from './Subscription';
import { Subscriber } from './Subscriber';
import { Observable } from './Observable';
import { subscribeTo } from './util/subscribeTo';

interface SimpleOuterSubscriberLike<T> {
/**
* A handler for inner next notifications from the inner subscription
* @param innerValue the value nexted by the inner producer
*/
notifyNext(innerValue: T): void;
/**
* A handler for inner error notifications from the inner subscription
* @param err the error from the inner producer
*/
notifyError(err: any): void;
/**
* A handler for inner complete notifications from the inner subscription.
*/
notifyComplete(): void;
}

export class SimpleInnerSubscriber<T> extends Subscriber<T> {
constructor(private parent: SimpleOuterSubscriberLike<any>) {
super();
}

protected _next(value: T): void {
this.parent.notifyNext(value);
}

protected _error(error: any): void {
this.parent.notifyError(error);
this.unsubscribe();
}

protected _complete(): void {
this.parent.notifyComplete();
this.unsubscribe();
}
}

export class ComplexInnerSubscriber<T, R> extends Subscriber<R> {
constructor(private parent: ComplexOuterSubscriber<T, R>, public outerValue: T, public outerIndex: number) {
super();
}

protected _next(value: R): void {
this.parent.notifyNext(this.outerValue, value, this.outerIndex, this);
}

protected _error(error: any): void {
this.parent.notifyError(error);
this.unsubscribe();
}

protected _complete(): void {
this.parent.notifyComplete(this);
this.unsubscribe();
}
}

export class SimpleOuterSubscriber<T, R> extends Subscriber<T> implements SimpleOuterSubscriberLike<R> {
notifyNext(innerValue: R): void {
this.destination.next(innerValue);
}

notifyError(err: any): void {
this.destination.error(err);
}

notifyComplete(): void {
this.destination.complete();
}
}

/**
* DO NOT USE (formerly "OuterSubscriber")
* TODO: We want to refactor this and remove it. It is retaining values it shouldn't for long
* periods of time.
*/
export class ComplexOuterSubscriber<T, R> extends Subscriber<T> {
/**
* @param _outerValue Used by: bufferToggle, delayWhen, windowToggle
* @param innerValue Used by: subclass default, combineLatest, race, bufferToggle, windowToggle, withLatestFrom
* @param _outerIndex Used by: combineLatest, race, withLatestFrom
* @param _innerSub Used by: delayWhen
*/
notifyNext(_outerValue: T, innerValue: R, _outerIndex: number, _innerSub: ComplexInnerSubscriber<T, R>): void {
this.destination.next(innerValue);
}

notifyError(error: any): void {
this.destination.error(error);
}

/**
* @param _innerSub Used by: race, bufferToggle, delayWhen, windowToggle, windowWhen
*/
notifyComplete(_innerSub: ComplexInnerSubscriber<T, R>): void {
this.destination.complete();
}
}

export function innerSubscribe(result: any, innerSubscriber: Subscriber<any>): Subscription | undefined {
if (innerSubscriber.closed) {
return undefined;
}
if (result instanceof Observable) {
return result.subscribe(innerSubscriber);
}
return subscribeTo(result)(innerSubscriber) as Subscription;
}
27 changes: 13 additions & 14 deletions src/internal/observable/combineLatest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,8 @@ export function combineLatest<R>(...observables: Array<ObservableInput<any> | ((
export function combineLatest<O extends ObservableInput<any>, R>(
...observables: (O | ((...values: ObservedValueOf<O>[]) => R) | SchedulerLike)[]
): Observable<R> {
let resultSelector: (...values: Array<any>) => R = null;
let scheduler: SchedulerLike = null;
let resultSelector: ((...values: Array<any>) => R) | undefined = undefined;
let scheduler: SchedulerLike|undefined = undefined;

if (isScheduler(observables[observables.length - 1])) {
scheduler = observables.pop() as SchedulerLike;
Expand All @@ -243,7 +243,7 @@ export function combineLatest<O extends ObservableInput<any>, R>(
observables = observables[0] as any;
}

return fromArray(observables, scheduler).lift(new CombineLatestOperator<ObservedValueOf<O>, R>(resultSelector));
return fromArray(observables, scheduler).lift(new CombineLatestOperator(resultSelector));
}

export class CombineLatestOperator<T, R> implements Operator<T, R> {
Expand All @@ -264,7 +264,7 @@ export class CombineLatestSubscriber<T, R> extends OuterSubscriber<T, R> {
private active: number = 0;
private values: any[] = [];
private observables: any[] = [];
private toRespond: number;
private toRespond?: number;

constructor(destination: Subscriber<R>, private resultSelector?: (...values: Array<any>) => R) {
super(destination);
Expand All @@ -279,26 +279,25 @@ export class CombineLatestSubscriber<T, R> extends OuterSubscriber<T, R> {
const observables = this.observables;
const len = observables.length;
if (len === 0) {
this.destination.complete();
this.destination.complete!();
} else {
this.active = len;
this.toRespond = len;
for (let i = 0; i < len; i++) {
const observable = observables[i];
this.add(subscribeToResult(this, observable, observable, i));
this.add(subscribeToResult(this, observable, undefined, i));
}
}
}

notifyComplete(unused: Subscriber<R>): void {
if ((this.active -= 1) === 0) {
this.destination.complete();
this.destination.complete!();
}
}

notifyNext(outerValue: T, innerValue: R,
outerIndex: number, innerIndex: number,
innerSub: InnerSubscriber<T, R>): void {
notifyNext(_outerValue: T, innerValue: R,
outerIndex: number): void {
const values = this.values;
const oldVal = values[outerIndex];
const toRespond = !this.toRespond
Expand All @@ -310,19 +309,19 @@ export class CombineLatestSubscriber<T, R> extends OuterSubscriber<T, R> {
if (this.resultSelector) {
this._tryResultSelector(values);
} else {
this.destination.next(values.slice());
this.destination.next!(values.slice());
}
}
}

private _tryResultSelector(values: any[]) {
let result: any;
try {
result = this.resultSelector.apply(this, values);
result = this.resultSelector!.apply(this, values);
} catch (err) {
this.destination.error(err);
this.destination.error!(err);
return;
}
this.destination.next(result);
this.destination.next!(result);
}
}
17 changes: 8 additions & 9 deletions src/internal/observable/race.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,24 +102,23 @@ export class RaceSubscriber<T> extends OuterSubscriber<T, T> {
const len = observables.length;

if (len === 0) {
this.destination.complete();
this.destination.complete!();
} else {
for (let i = 0; i < len && !this.hasFirst; i++) {
let observable = observables[i];
let subscription = subscribeToResult(this, observable, observable as any, i);
const observable = observables[i];
const subscription = subscribeToResult(this, observable, undefined, i)!;

if (this.subscriptions) {
this.subscriptions.push(subscription);
}
this.add(subscription);
}
this.observables = null;
this.observables = null!;
}
}

notifyNext(outerValue: T, innerValue: T,
outerIndex: number, innerIndex: number,
innerSub: InnerSubscriber<T, T>): void {
notifyNext(_outerValue: T, innerValue: T,
outerIndex: number): void {
if (!this.hasFirst) {
this.hasFirst = true;

Expand All @@ -132,9 +131,9 @@ export class RaceSubscriber<T> extends OuterSubscriber<T, T> {
}
}

this.subscriptions = null;
this.subscriptions = null!;
}

this.destination.next(innerValue);
this.destination.next!(innerValue);
}
}
Loading

0 comments on commit 10cc8a6

Please sign in to comment.