Skip to content

Commit

Permalink
fix(config): onStoppedNotification and onUnhandledError will now alwa…
Browse files Browse the repository at this point in the history
…ys async dispatch (#7344)

Resolves #7343
  • Loading branch information
benlesh committed Oct 27, 2023
1 parent 0bd47ea commit 8b9d0ef
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 94 deletions.
176 changes: 86 additions & 90 deletions packages/rxjs/spec/operators/share-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,12 @@ import {
} from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';
import { observableMatcher } from '../helpers/observableMatcher';
import { SinonSpy, spy } from 'sinon';
import { spy } from 'sinon';

const syncNotify = of(1);
const asapNotify = scheduled(syncNotify, asapScheduler);
const syncError = throwError(() => new Error());

function spyOnUnhandledError(fn: (spy: SinonSpy) => void): void {
const prevOnUnhandledError = config.onUnhandledError;

try {
const onUnhandledError = spy();
config.onUnhandledError = onUnhandledError;

fn(onUnhandledError);
} finally {
config.onUnhandledError = prevOnUnhandledError;
}
}

/** @test {share} */
describe('share', () => {
let rxTest: TestScheduler;
Expand Down Expand Up @@ -810,9 +797,22 @@ describe('share', () => {
});
});

it('should not reset on refCount 0 if reset notifier errors before emitting any value', () => {
spyOnUnhandledError((onUnhandledError) => {
describe('when config.onUnhandledError is set', () => {
afterEach(() => {
config.onUnhandledError = null;
});

it('should not reset on refCount 0 if reset notifier errors before emitting any value', (done) => {
const error = new Error();
let calls = 0;

config.onUnhandledError = spy((err) => {
calls++;
expect(err).to.equal(error);
if (calls === 2) {
done();
}
});

rxTest.run(({ hot, cold, expectObservable, expectSubscriptions }) => {
const source = hot(' ---1---2---3---4---(5 )---|');
Expand All @@ -830,17 +830,16 @@ describe('share', () => {
expectObservable(result, subscription).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});

expect(onUnhandledError).to.have.been.calledTwice;
expect(onUnhandledError.getCall(0)).to.have.been.calledWithExactly(error);
expect(onUnhandledError.getCall(1)).to.have.been.calledWithExactly(error);
});
});

it('should not reset on error if reset notifier errors before emitting any value', () => {
spyOnUnhandledError((onUnhandledError) => {
it('should not reset on error if reset notifier errors before emitting any value', (done) => {
const error = new Error();

config.onUnhandledError = spy((err) => {
expect(err).to.equal(error);
done();
});

rxTest.run(({ cold, expectObservable, expectSubscriptions }) => {
const source = cold(' ---1---2---# ');
const sourceSubs = ' ^----------! ';
Expand All @@ -856,89 +855,86 @@ describe('share', () => {
expectObservable(result, subscription).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});

expect(onUnhandledError).to.have.been.calledOnce;
expect(onUnhandledError.getCall(0)).to.have.been.calledWithExactly(error);
});
});

it('should not reset on complete if reset notifier errors before emitting any value', () => {
spyOnUnhandledError((onUnhandledError) => {
const error = new Error();

rxTest.run(({ cold, expectObservable, expectSubscriptions }) => {
const source = cold(' ---1---2---| ');
const sourceSubs = ' ^----------! ';
const expected = ' ---1---2------|';
const subscription = ' ^--------------';
const firstPause = cold(' -------|');
const reset = cold(' --# ', undefined, error);

const sharedSource = source.pipe(share({ resetOnComplete: () => reset, resetOnRefCountZero: false }), take(2));
it('should not reset on complete if reset notifier errors before emitting any value', (done) => {
const error = new Error();

const result = concat(sharedSource, firstPause, sharedSource);

expectObservable(result, subscription).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});

expect(onUnhandledError).to.have.been.calledOnce;
expect(onUnhandledError.getCall(0)).to.have.been.calledWithExactly(error);
config.onUnhandledError = spy((err) => {
expect(err).to.equal(error);
done();
});
});

it('should not call "resetOnRefCountZero" on error', () => {
rxTest.run(({ cold, expectObservable, expectSubscriptions }) => {
const resetOnRefCountZero = spy(() => EMPTY);

const source = cold(' ---1---(2#) ');
// source: ' ---1---(2#) '
const sourceSubs = [
' ^------(! ) ',
// break the line, please
' -------(- )---^------(! ) ',
];
const expected = ' ---1---(2 )------1---(2#) ';
const subscription = ' ^------(- )----------(- ) ';
const firstPause = cold(' (- )---| ');
const reset = cold(' (- )-r ');
// reset: ' (- )-r'
const source = cold(' ---1---2---| ');
const sourceSubs = ' ^----------! ';
const expected = ' ---1---2------|';
const subscription = ' ^--------------';
const firstPause = cold(' -------|');
const reset = cold(' --# ', undefined, error);

const sharedSource = source.pipe(share({ resetOnError: () => reset, resetOnRefCountZero }));
const sharedSource = source.pipe(share({ resetOnComplete: () => reset, resetOnRefCountZero: false }), take(2));

const result = concat(sharedSource.pipe(onErrorResumeNextWith(firstPause)), sharedSource);
const result = concat(sharedSource, firstPause, sharedSource);

expectObservable(result, subscription).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
expect(resetOnRefCountZero).to.not.have.been.called;
});
});
});

it('should not call "resetOnRefCountZero" on complete', () => {
rxTest.run(({ cold, expectObservable, expectSubscriptions }) => {
const resetOnRefCountZero = spy(() => EMPTY);

const source = cold(' ---1---(2|) ');
// source: ' ---1---(2|) '
const sourceSubs = [
' ^------(! ) ',
// break the line, please
' -------(- )---^------(! ) ',
];
const expected = ' ---1---(2 )------1---(2|) ';
const subscription = ' ^------(- )----------(- ) ';
const firstPause = cold(' (- )---| ');
const reset = cold(' (- )-r ');
// reset: ' (- )-r'

const sharedSource = source.pipe(share({ resetOnComplete: () => reset, resetOnRefCountZero }));

const result = concat(sharedSource, firstPause, sharedSource);
it('should not call "resetOnRefCountZero" on error', () => {
rxTest.run(({ cold, expectObservable, expectSubscriptions }) => {
const resetOnRefCountZero = spy(() => EMPTY);

const source = cold(' ---1---(2#) ');
// source: ' ---1---(2#) '
const sourceSubs = [
' ^------(! ) ',
// break the line, please
' -------(- )---^------(! ) ',
];
const expected = ' ---1---(2 )------1---(2#) ';
const subscription = ' ^------(- )----------(- ) ';
const firstPause = cold(' (- )---| ');
const reset = cold(' (- )-r ');
// reset: ' (- )-r'

const sharedSource = source.pipe(share({ resetOnError: () => reset, resetOnRefCountZero }));

const result = concat(sharedSource.pipe(onErrorResumeNextWith(firstPause)), sharedSource);

expectObservable(result, subscription).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
expect(resetOnRefCountZero).to.not.have.been.called;
});
});

expectObservable(result, subscription).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
expect(resetOnRefCountZero).to.not.have.been.called;
});
it('should not call "resetOnRefCountZero" on complete', () => {
rxTest.run(({ cold, expectObservable, expectSubscriptions }) => {
const resetOnRefCountZero = spy(() => EMPTY);

const source = cold(' ---1---(2|) ');
// source: ' ---1---(2|) '
const sourceSubs = [
' ^------(! ) ',
// break the line, please
' -------(- )---^------(! ) ',
];
const expected = ' ---1---(2 )------1---(2|) ';
const subscription = ' ^------(- )----------(- ) ';
const firstPause = cold(' (- )---| ');
const reset = cold(' (- )-r ');
// reset: ' (- )-r'

const sharedSource = source.pipe(share({ resetOnComplete: () => reset, resetOnRefCountZero }));

const result = concat(sharedSource, firstPause, sharedSource);

expectObservable(result, subscription).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
expect(resetOnRefCountZero).to.not.have.been.called;
});
});
});
3 changes: 1 addition & 2 deletions packages/rxjs/src/internal/Subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import { Subscription } from './Subscription';
import { config } from './config';
import { reportUnhandledError } from './util/reportUnhandledError';
import { nextNotification, errorNotification, COMPLETE_NOTIFICATION } from './NotificationFactories';
import { timeoutProvider } from './scheduler/timeoutProvider';

export interface SubscriberOverrides<T> {
/**
Expand Down Expand Up @@ -267,7 +266,7 @@ function createSafeObserver<T>(observerOrNext?: Partial<Observer<T>> | ((value:
*/
function handleStoppedNotification(notification: ObservableNotification<any>, subscriber: Subscriber<any>) {
const { onStoppedNotification } = config;
onStoppedNotification && timeoutProvider.setTimeout(() => onStoppedNotification(notification, subscriber));
onStoppedNotification && setTimeout(() => onStoppedNotification(notification, subscriber));
}

function hasAddAndUnsubscribe(value: any): value is Subscription {
Expand Down
3 changes: 1 addition & 2 deletions packages/rxjs/src/internal/util/reportUnhandledError.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { config } from '../config';
import { timeoutProvider } from '../scheduler/timeoutProvider';

/**
* Handles an error on another job either with the user-configured {@link onUnhandledError},
Expand All @@ -11,7 +10,7 @@ import { timeoutProvider } from '../scheduler/timeoutProvider';
* @param err the error to report
*/
export function reportUnhandledError(err: any) {
timeoutProvider.setTimeout(() => {
setTimeout(() => {
const { onUnhandledError } = config;
if (onUnhandledError) {
// Execute the user-configured error handler.
Expand Down

0 comments on commit 8b9d0ef

Please sign in to comment.