From c408acda16d654682596822c118c31a814dae5c9 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Tue, 21 Nov 2023 14:02:51 -0600 Subject: [PATCH] feat(webSocket): now allows input and output typing to differ --- .../spec/observables/dom/webSocket-spec.ts | 269 +++++++++--------- .../observable/dom/WebSocketSubject.ts | 29 +- .../src/internal/observable/dom/webSocket.ts | 32 ++- 3 files changed, 170 insertions(+), 160 deletions(-) diff --git a/packages/rxjs/spec/observables/dom/webSocket-spec.ts b/packages/rxjs/spec/observables/dom/webSocket-spec.ts index 3aed1eb2f7..2023b6df7d 100644 --- a/packages/rxjs/spec/observables/dom/webSocket-spec.ts +++ b/packages/rxjs/spec/observables/dom/webSocket-spec.ts @@ -3,15 +3,13 @@ import * as sinon from 'sinon'; import { webSocket } from 'rxjs/webSocket'; import { map, retry, take, repeat, takeWhile } from 'rxjs/operators'; -const root: any = (typeof globalThis !== 'undefined' && globalThis) - || (typeof self !== 'undefined' && self) - || global; +const root: any = (typeof globalThis !== 'undefined' && globalThis) || (typeof self !== 'undefined' && self) || global; enum WebSocketState { CONNECTING = 0, OPEN = 1, CLOSING = 2, - CLOSED = 3 + CLOSED = 3, } /** @test {webSocket} */ @@ -43,7 +41,7 @@ describe('webSocket', () => { subject.next('ping'); - subject.subscribe(x => { + subject.subscribe((x) => { expect(x).to.equal('pong'); messageReceived = true; }); @@ -64,10 +62,7 @@ describe('webSocket', () => { const subject = webSocket('ws://mysocket'); const results: any[] = []; - subject.pipe( - map(x => x + '!'), - ) - .subscribe(x => results.push(x)); + subject.pipe(map((x) => x + '!')).subscribe((x) => results.push(x)); MockWebSocket.lastSocket.triggerMessage(JSON.stringify('ngconf 2018 bug')); @@ -79,7 +74,7 @@ describe('webSocket', () => { const results: string[] = []; const subject = webSocket('ws://mysocket'); - subject.subscribe(x => { + subject.subscribe((x) => { results.push(x); }); @@ -87,7 +82,7 @@ describe('webSocket', () => { socket.open(); - expected.forEach(x => { + expected.forEach((x) => { socket.triggerMessage(JSON.stringify(x)); }); @@ -100,7 +95,7 @@ describe('webSocket', () => { const expected = ['make', 'him', 'walk', 'the', 'plank']; const subject = webSocket('ws://mysocket'); - expected.forEach(x => { + expected.forEach((x) => { subject.next(x); }); @@ -226,7 +221,7 @@ describe('webSocket', () => { // Close socket after socket2 has opened socket2.open(); expect(socket2.readyState).to.equal(WebSocketState.OPEN); - socket.triggerClose({wasClean: true}); + socket.triggerClose({ wasClean: true }); expect(socket.readyState).to.equal(WebSocketState.CLOSED); expect(socket2.close).have.not.been.called; @@ -247,7 +242,7 @@ describe('webSocket', () => { sinon.spy(socket, 'close'); expect(socket.close).not.have.been.called; - subject.error({ code: 1337, reason: 'Too bad, so sad :('}); + subject.error({ code: 1337, reason: 'Too bad, so sad :(' }); expect(socket.close).have.been.calledWith(1337, 'Too bad, so sad :('); subject.unsubscribe(); @@ -310,7 +305,6 @@ describe('webSocket', () => { }); describe('with a config object', () => { - beforeEach(() => { setupMockWebSocket(); }); @@ -325,7 +319,7 @@ describe('webSocket', () => { subject.next('ping'); - subject.subscribe(x => { + subject.subscribe((x) => { expect(x).to.equal('pong'); messageReceived = true; }); @@ -345,7 +339,7 @@ describe('webSocket', () => { it('should take a protocol and set it properly on the web socket', () => { const subject = webSocket({ url: 'ws://mysocket', - protocol: 'someprotocol' + protocol: 'someprotocol', }); subject.subscribe(); @@ -359,7 +353,7 @@ describe('webSocket', () => { it('should take a binaryType and set it properly on the web socket', () => { const subject = webSocket({ url: 'ws://mysocket', - binaryType: 'blob' + binaryType: 'blob', }); subject.subscribe(); @@ -377,7 +371,7 @@ describe('webSocket', () => { url: 'ws://mysocket', deserializer: (e: any) => { return e.data + '!'; - } + }, }); subject.subscribe((x: any) => { @@ -400,14 +394,17 @@ describe('webSocket', () => { url: 'ws://mysocket', deserializer: (e: any) => { throw new Error('I am a bad error'); - } + }, }); - subject.subscribe({ next: (x: any) => { - expect(x).to.equal('this should not happen'); - }, error: (err: any) => { - expect(err).to.be.an('error', 'I am a bad error'); - } }); + subject.subscribe({ + next: (x: any) => { + expect(x).to.equal('this should not happen'); + }, + error: (err: any) => { + expect(err).to.be.an('error', 'I am a bad error'); + }, + }); const socket = MockWebSocket.lastSocket; socket.open(); @@ -424,8 +421,8 @@ describe('webSocket', () => { next(x: any) { calls++; expect(x).to.be.an('undefined'); - } - } + }, + }, }); subject.subscribe(); @@ -455,8 +452,8 @@ describe('webSocket', () => { closeObserver: { next(e: any) { closes.push(e); - } - } + }, + }, }); subject.subscribe(); @@ -468,9 +465,11 @@ describe('webSocket', () => { socket.triggerClose(expected[0]); expect(closes.length).to.equal(1); - subject.subscribe({ error: function (err) { - expect(err).to.equal(expected[1]); - } }); + subject.subscribe({ + error: function (err) { + expect(err).to.equal(expected[1]); + }, + }); socket = MockWebSocket.lastSocket; socket.open(); @@ -489,21 +488,23 @@ describe('webSocket', () => { url: 'bad_url', WebSocketCtor: (url: string, protocol?: string | string[]): WebSocket => { throw new Error(`connection refused`); - } + }, }); - subject.subscribe({ next: (x: any) => { - expect(x).to.equal('this should not happen'); - }, error: (err: any) => { - expect(err).to.be.an('error', 'connection refused'); - } }); + subject.subscribe({ + next: (x: any) => { + expect(x).to.equal('this should not happen'); + }, + error: (err: any) => { + expect(err).to.be.an('error', 'connection refused'); + }, + }); subject.unsubscribe(); }); }); describe('multiplex', () => { - beforeEach(() => { setupMockWebSocket(); }); @@ -514,20 +515,22 @@ describe('webSocket', () => { it('should be retryable', () => { const results = [] as string[]; - const subject = webSocket<{ name: string, value: string }>('ws://websocket'); + const subject = webSocket<{ sub: string } | { unsub: string }, { name: string; value: string }>('ws://websocket'); const source = subject.multiplex( () => ({ sub: 'foo' }), () => ({ unsub: 'foo' }), - value => value.name === 'foo' + (value) => value.name === 'foo' ); - source.pipe( - retry(1), - map(x => x.value), - take(2), - ).subscribe(x => { - results.push(x); - }); + source + .pipe( + retry(1), + map((x) => x.value), + take(2) + ) + .subscribe((x) => { + results.push(x); + }); const socket = MockWebSocket.lastSocket; socket.open(); @@ -549,19 +552,20 @@ describe('webSocket', () => { it('should be repeatable', () => { const results = [] as string[]; - const subject = webSocket<{ name: string, value: string }>('ws://websocket'); + const subject = webSocket<{ sub: string } | { unsub: string }, { name: string; value: string }>('ws://websocket'); + const source = subject.multiplex( () => ({ sub: 'foo' }), () => ({ unsub: 'foo' }), - value => value.name === 'foo' + (value) => value.name === 'foo' ); source .pipe( repeat(2), - map(x => x.value) + map((x) => x.value) ) - .subscribe(x => { + .subscribe((x) => { results.push(x); }); @@ -586,12 +590,12 @@ describe('webSocket', () => { }); it('should multiplex over the webSocket', () => { - const results = [] as Array<{ value: number, name: string }>; - const subject = webSocket<{ value: number, name: string }>('ws://websocket'); + const results = [] as Array<{ value: number; name: string }>; + const subject = webSocket<{ sub: string } | { unsub: string }, { value: number; name: string }>('ws://websocket'); const source = subject.multiplex( - () => ({ sub: 'foo'}), + () => ({ sub: 'foo' }), () => ({ unsub: 'foo' }), - value => value.name === 'foo' + (value) => value.name === 'foo' ); const sub = source.subscribe(function (x: any) { @@ -602,14 +606,16 @@ describe('webSocket', () => { expect(socket.lastMessageSent).to.deep.equal(JSON.stringify({ sub: 'foo' })); - [1, 2, 3, 4, 5].map((x: number) => { - return { - name: x % 3 === 0 ? 'bar' : 'foo', - value: x - }; - }).forEach((x: any) => { - socket.triggerMessage(JSON.stringify(x)); - }); + [1, 2, 3, 4, 5] + .map((x: number) => { + return { + name: x % 3 === 0 ? 'bar' : 'foo', + value: x, + }; + }) + .forEach((x: any) => { + socket.triggerMessage(JSON.stringify(x)); + }); expect(results).to.deep.equal([1, 2, 4, 5]); @@ -622,34 +628,34 @@ describe('webSocket', () => { }); it('should keep the same socket for multiple multiplex subscriptions', () => { - const socketSubject = webSocket({url: 'ws://mysocket'}); + const socketSubject = webSocket({ url: 'ws://mysocket' }); const results = [] as string[]; - const socketMessages = [ - {id: 'A'}, - {id: 'B'}, - {id: 'A'}, - {id: 'B'}, - {id: 'B'}, - ]; - - const sub1 = socketSubject.multiplex( - () => 'no-op', - () => results.push('A unsub'), - (req: any) => req.id === 'A' - ).pipe( - takeWhile((req: any) => !req.complete) - ) - .subscribe( - { next: () => results.push('A next'), error: (e) => results.push('A error ' + e), complete: () => results.push('A complete') } - ); + const socketMessages = [{ id: 'A' }, { id: 'B' }, { id: 'A' }, { id: 'B' }, { id: 'B' }]; + + const sub1 = socketSubject + .multiplex( + () => 'no-op', + () => (results.push('A unsub'), 'no-op'), + (req: any) => req.id === 'A' + ) + .pipe(takeWhile((req: any) => !req.complete)) + .subscribe({ + next: () => results.push('A next'), + error: (e) => results.push('A error ' + e), + complete: () => results.push('A complete'), + }); - socketSubject.multiplex( - () => 'no-op', - () => results.push('B unsub'), - (req: any) => req.id === 'B') - .subscribe( - { next: () => results.push('B next'), error: (e) => results.push('B error ' + e), complete: () => results.push('B complete') } - ); + socketSubject + .multiplex( + () => 'no-op', + () => (results.push('B unsub'), 'no-op'), + (req: any) => req.id === 'B' + ) + .subscribe({ + next: () => results.push('B next'), + error: (e) => results.push('B error ' + e), + complete: () => results.push('B complete'), + }); // Setup socket and send messages const socket = MockWebSocket.lastSocket; @@ -663,47 +669,39 @@ describe('webSocket', () => { }); socket.triggerClose({ wasClean: true }); - expect(results).to.deep.equal([ - 'A next', - 'A unsub', - 'B next', - 'B next', - 'B next', - 'B complete', - 'B unsub', - ]); + expect(results).to.deep.equal(['A next', 'A unsub', 'B next', 'B next', 'B next', 'B complete', 'B unsub']); }); it('should not close the socket until all subscriptions complete', () => { - const socketSubject = webSocket<{ id: string, complete: boolean }>({url: 'ws://mysocket'}); + const socketSubject = webSocket({ url: 'ws://mysocket' }); const results = [] as string[]; - const socketMessages = [ - {id: 'A'}, - {id: 'B'}, - {id: 'A', complete: true}, - {id: 'B'}, - {id: 'B', complete: true}, - ]; - - socketSubject.multiplex( - () => 'no-op', - () => results.push('A unsub'), - req => req.id === 'A' - ).pipe( - takeWhile(req => !req.complete) - ).subscribe( - { next: () => results.push('A next'), error: (e) => results.push('A error ' + e), complete: () => results.push('A complete') } - ); + const socketMessages = [{ id: 'A' }, { id: 'B' }, { id: 'A', complete: true }, { id: 'B' }, { id: 'B', complete: true }]; - socketSubject.multiplex( - () => 'no-op', - () => results.push('B unsub'), - req => req.id === 'B' - ).pipe( - takeWhile(req => !req.complete) - ).subscribe( - { next: () => results.push('B next'), error: (e) => results.push('B error ' + e), complete: () => results.push('B complete') } - ); + socketSubject + .multiplex( + () => 'no-op', + () => (results.push('A unsub'), 'no-op'), + (req) => req.id === 'A' + ) + .pipe(takeWhile((req) => !req.complete)) + .subscribe({ + next: () => results.push('A next'), + error: (e) => results.push('A error ' + e), + complete: () => results.push('A complete'), + }); + + socketSubject + .multiplex( + () => 'no-op', + () => (results.push('B unsub'), 'no-op'), + (req) => req.id === 'B' + ) + .pipe(takeWhile((req) => !req.complete)) + .subscribe({ + next: () => results.push('B next'), + error: (e) => results.push('B error ' + e), + complete: () => results.push('B complete'), + }); // Setup socket and send messages const socket = MockWebSocket.lastSocket; @@ -712,15 +710,7 @@ describe('webSocket', () => { socket.triggerMessage(JSON.stringify(msg)); }); - expect(results).to.deep.equal([ - 'A next', - 'B next', - 'A unsub', - 'A complete', - 'B next', - 'B unsub', - 'B complete', - ]); + expect(results).to.deep.equal(['A next', 'B next', 'A unsub', 'A complete', 'B next', 'B unsub', 'B complete']); }); }); @@ -729,12 +719,12 @@ describe('webSocket', () => { let messageReceived = false; const subject = webSocket({ url: 'ws://mysocket', - WebSocketCtor: MockWebSocket + WebSocketCtor: MockWebSocket, }); subject.next('ping'); - subject.subscribe(x => { + subject.subscribe((x) => { expect(x).to.equal('pong'); messageReceived = true; }); @@ -752,16 +742,13 @@ describe('webSocket', () => { }); it('should handle constructor errors if no WebSocketCtor', () => { - expect(() => { const subject = webSocket({ - url: 'ws://mysocket' + url: 'ws://mysocket', }); }).to.throw('no WebSocket constructor can be found'); - }); }); - }); class MockWebSocket { diff --git a/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts b/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts index c9e827ed2e..187c04c63b 100644 --- a/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts +++ b/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts @@ -98,23 +98,23 @@ import { NextObserver } from '../../types.js'; * // Connection ok * ``` */ -export interface WebSocketSubjectConfig { +export interface WebSocketSubjectConfig { /** The url of the socket server to connect to */ url: string; /** The protocol to use to connect */ protocol?: string | Array; /** @deprecated Will be removed in v8. Use {@link deserializer} instead. */ - resultSelector?: (e: MessageEvent) => T; + resultSelector?: (e: MessageEvent) => Out; /** * A serializer used to create messages from passed values before the * messages are sent to the server. Defaults to JSON.stringify. */ - serializer?: (value: T) => WebSocketMessage; + serializer?: (value: In) => WebSocketMessage; /** * A deserializer used for messages arriving on the socket from the * server. Defaults to JSON.parse. */ - deserializer?: (e: MessageEvent) => T; + deserializer?: (e: MessageEvent) => Out; /** * An Observer that watches when open events occur on the underlying web socket. */ @@ -148,12 +148,13 @@ const WEBSOCKETSUBJECT_INVALID_ERROR_OBJECT = export type WebSocketMessage = string | ArrayBuffer | Blob | ArrayBufferView; -export class WebSocketSubject extends Observable { - private _config: WebSocketSubjectConfig & Required, 'WebSocketCtor' | 'serializer' | 'deserializer'>>; +export class WebSocketSubject extends Observable { + private _config: WebSocketSubjectConfig & + Required, 'WebSocketCtor' | 'serializer' | 'deserializer'>>; private _socket: WebSocket | null = null; - private _inputBuffer: T[] = []; + private _inputBuffer: In[] = []; private _hasError = false; @@ -163,13 +164,13 @@ export class WebSocketSubject extends Observable { private _subscriberCounter = 0; - private _subscribers = new Map>(); + private _subscribers = new Map>(); get observed() { return this._subscribers.size > 0; } - constructor(urlConfigOrSource: string | WebSocketSubjectConfig) { + constructor(urlConfigOrSource: string | WebSocketSubjectConfig) { super(); const userConfig = typeof urlConfigOrSource === 'string' ? { url: urlConfigOrSource } : urlConfigOrSource; this._config = { @@ -214,8 +215,8 @@ export class WebSocketSubject extends Observable { * @param messageFilter A predicate for selecting the appropriate messages * from the server for the output stream. */ - multiplex(subMsg: () => any, unsubMsg: () => any, messageFilter: (value: T) => boolean) { - return new Observable((destination) => { + multiplex(subMsg: () => In, unsubMsg: () => In, messageFilter: (value: Out) => boolean) { + return new Observable((destination) => { this.next(subMsg()); destination.add(() => { this.next(unsubMsg()); @@ -233,7 +234,7 @@ export class WebSocketSubject extends Observable { }); } - #outputNext(value: T) { + #outputNext(value: Out) { for (const subscriber of Array.from(this._subscribers.values())) { subscriber.next(value); } @@ -318,7 +319,7 @@ export class WebSocketSubject extends Observable { }; } - next(value: T) { + next(value: In) { if (this._socket?.readyState !== 1) { this._inputBuffer.push(value); } else { @@ -363,7 +364,7 @@ export class WebSocketSubject extends Observable { } /** @internal */ - protected _subscribe(subscriber: Subscriber): Subscription { + protected _subscribe(subscriber: Subscriber): Subscription { if (!this._socket) { this._connectSocket(); } diff --git a/packages/rxjs/src/internal/observable/dom/webSocket.ts b/packages/rxjs/src/internal/observable/dom/webSocket.ts index b9631628f2..02bd79fedb 100644 --- a/packages/rxjs/src/internal/observable/dom/webSocket.ts +++ b/packages/rxjs/src/internal/observable/dom/webSocket.ts @@ -89,7 +89,7 @@ import { WebSocketSubject, WebSocketSubjectConfig } from './WebSocketSubject.js' * ```ts * import { webSocket } from 'rxjs/webSocket'; * - * const subject = webSocket('ws://localhost:8081'); + * const subject = webSocket('ws://localhost:8081'); * * subject.subscribe({ * next: msg => console.log('message received: ' + msg), // Called whenever there is a message from the server. @@ -103,7 +103,15 @@ import { WebSocketSubject, WebSocketSubjectConfig } from './WebSocketSubject.js' * ```ts * import { webSocket } from 'rxjs/webSocket'; * - * const subject = webSocket('ws://localhost:8081'); + * interface SendMsg { + * message: string; + * } + * + * interface RespMsg { + * data: any; + * } + * + * const subject = webSocket('ws://localhost:8081'); * * subject.subscribe(); * // Note that at least one consumer has to subscribe to the created subject - otherwise "nexted" values will be just buffered and not sent, @@ -123,7 +131,21 @@ import { WebSocketSubject, WebSocketSubjectConfig } from './WebSocketSubject.js' * ```ts * import { webSocket } from 'rxjs/webSocket'; * - * const subject = webSocket('ws://localhost:8081'); + * interface SubMsg { + * subscribe: string; + * } + * + * interface UnsubMsg { + * unsubscribe: string; + * } + * + * interface RespMsg { + * type: string; + * data: any; + * } + * + * + * const subject = webSocket('ws://localhost:8081'); * * const observableA = subject.multiplex( * () => ({ subscribe: 'A' }), // When server gets this message, it will start sending messages for 'A'... @@ -156,6 +178,6 @@ import { WebSocketSubject, WebSocketSubjectConfig } from './WebSocketSubject.js' * @param urlConfigOrSource The WebSocket endpoint as an url or an object with configuration and additional Observers. * @return Subject which allows to both send and receive messages via WebSocket connection. */ -export function webSocket(urlConfigOrSource: string | WebSocketSubjectConfig): WebSocketSubject { - return new WebSocketSubject(urlConfigOrSource); +export function webSocket(urlConfigOrSource: string | WebSocketSubjectConfig): WebSocketSubject { + return new WebSocketSubject(urlConfigOrSource); }