Skip to content

Commit

Permalink
fix(TestScheduler): explicit unsubscribe works properly with toEqual (#…
Browse files Browse the repository at this point in the history
…7403)

* fix(TestScheduler): explicit unsubscribe works properly with toEqual

Fixes #7399

* chore: clean up tests, add expected observable test
  • Loading branch information
benlesh committed Dec 12, 2023
1 parent 480c6a2 commit ad54bd8
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 28 deletions.
27 changes: 27 additions & 0 deletions packages/rxjs/spec/schedulers/TestScheduler-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -707,5 +707,32 @@ describe('TestScheduler', () => {
});
});
});

it('should unsubscribe source with toBe', () => {
const testScheduler = new TestScheduler(assertDeepEquals);
let disposed = false;
testScheduler.run(({ expectObservable }) => {
expectObservable(new Observable((_) => () => (disposed = true)), '^!').toBe('');
});
expect(disposed).to.be.true;
});

it('should unsubscribe source with toEqual', () => {
const testScheduler = new TestScheduler(assertDeepEquals);
let disposed = false;
testScheduler.run(({ cold, expectObservable }) => {
expectObservable(new Observable((_) => () => (disposed = true)), '^!').toEqual(cold(''));
});
expect(disposed).to.be.true;
});

it('should unsubscribe the expected observable with toEqual', () => {
const testScheduler = new TestScheduler(assertDeepEquals);
let disposed = false;
testScheduler.run(({ cold, expectObservable }) => {
expectObservable(cold(''), '^!').toEqual(new Observable((_) => () => (disposed = true)));
});
expect(disposed).to.be.true;
});
});
});
60 changes: 32 additions & 28 deletions packages/rxjs/src/internal/testing/TestScheduler.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { Subscription} from '../Observable.js';
import { Subscription } from '../Observable.js';
import { Observable, COMPLETE_NOTIFICATION, errorNotification, nextNotification } from '../Observable.js';
import { ColdObservable } from './ColdObservable.js';
import { HotObservable } from './HotObservable.js';
Expand Down Expand Up @@ -135,22 +135,24 @@ export class TestScheduler extends VirtualTimeScheduler {
const subscriptionParsed = TestScheduler.parseMarblesAsSubscriptions(subscriptionMarbles, this.runMode);
const subscriptionFrame = subscriptionParsed.subscribedFrame === Infinity ? 0 : subscriptionParsed.subscribedFrame;
const unsubscriptionFrame = subscriptionParsed.unsubscribedFrame;
let subscription: Subscription;
const subscription = new Subscription();

this.schedule(() => {
subscription = observable.subscribe({
next: (x) => {
// Support Observable-of-Observables
const value = x instanceof Observable ? this.materializeInnerObservable(x, this.frame) : x;
actual.push({ frame: this.frame, notification: nextNotification(value) });
},
error: (error) => {
actual.push({ frame: this.frame, notification: errorNotification(error) });
},
complete: () => {
actual.push({ frame: this.frame, notification: COMPLETE_NOTIFICATION });
},
});
subscription.add(
observable.subscribe({
next: (x) => {
// Support Observable-of-Observables
const value = x instanceof Observable ? this.materializeInnerObservable(x, this.frame) : x;
actual.push({ frame: this.frame, notification: nextNotification(value) });
},
error: (error) => {
actual.push({ frame: this.frame, notification: errorNotification(error) });
},
complete: () => {
actual.push({ frame: this.frame, notification: COMPLETE_NOTIFICATION });
},
})
);
}, subscriptionFrame);

if (unsubscriptionFrame !== Infinity) {
Expand All @@ -169,19 +171,21 @@ export class TestScheduler extends VirtualTimeScheduler {
flushTest.ready = true;
flushTest.expected = [];
this.schedule(() => {
subscription = other.subscribe({
next: (x) => {
// Support Observable-of-Observables
const value = x instanceof Observable ? this.materializeInnerObservable(x, this.frame) : x;
flushTest.expected!.push({ frame: this.frame, notification: nextNotification(value) });
},
error: (error) => {
flushTest.expected!.push({ frame: this.frame, notification: errorNotification(error) });
},
complete: () => {
flushTest.expected!.push({ frame: this.frame, notification: COMPLETE_NOTIFICATION });
},
});
subscription.add(
other.subscribe({
next: (x) => {
// Support Observable-of-Observables
const value = x instanceof Observable ? this.materializeInnerObservable(x, this.frame) : x;
flushTest.expected!.push({ frame: this.frame, notification: nextNotification(value) });
},
error: (error) => {
flushTest.expected!.push({ frame: this.frame, notification: errorNotification(error) });
},
complete: () => {
flushTest.expected!.push({ frame: this.frame, notification: COMPLETE_NOTIFICATION });
},
})
);
}, subscriptionFrame);
},
};
Expand Down

0 comments on commit ad54bd8

Please sign in to comment.