Skip to content

Commit

Permalink
refactor(WebSocketSubject): clean up config a bit
Browse files Browse the repository at this point in the history
  • Loading branch information
benlesh committed Nov 22, 2023
1 parent dc95d32 commit 27075b6
Showing 1 changed file with 8 additions and 10 deletions.
18 changes: 8 additions & 10 deletions packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,7 @@ export interface WebSocketSubjectConfig<T> {
binaryType?: 'blob' | 'arraybuffer';
}

const DEFAULT_WEBSOCKET_CONFIG: WebSocketSubjectConfig<any> = {
url: '',
const DEFAULT_WEBSOCKET_CONFIG = {
deserializer: (e: MessageEvent) => JSON.parse(e.data),
serializer: (value: any) => JSON.stringify(value),
};
Expand All @@ -150,7 +149,7 @@ const WEBSOCKETSUBJECT_INVALID_ERROR_OBJECT =
export type WebSocketMessage = string | ArrayBuffer | Blob | ArrayBufferView;

export class WebSocketSubject<T> extends Observable<T> {
private _config!: WebSocketSubjectConfig<T>;
private _config: WebSocketSubjectConfig<T> & Required<Pick<WebSocketSubjectConfig<T>, 'WebSocketCtor' | 'serializer' | 'deserializer'>>;

private _socket: WebSocket | null = null;

Expand Down Expand Up @@ -255,11 +254,11 @@ export class WebSocketSubject<T> extends Observable<T> {
}

private _connectSocket() {
const { WebSocketCtor, protocol, url, binaryType } = this._config;
const { WebSocketCtor, protocol, url, binaryType, deserializer, openObserver, closeObserver } = this._config;

let socket: WebSocket | null = null;
try {
socket = protocol ? new WebSocketCtor!(url, protocol) : new WebSocketCtor!(url);
socket = protocol ? new WebSocketCtor(url, protocol) : new WebSocketCtor(url);
this._socket = socket;
if (binaryType) {
this._socket.binaryType = binaryType;
Expand All @@ -275,7 +274,7 @@ export class WebSocketSubject<T> extends Observable<T> {
return;
}

this._config.openObserver?.next(evt);
openObserver?.next(evt);

while (this._inputBuffer.length > 0) {
this.next(this._inputBuffer.shift()!);
Expand All @@ -301,7 +300,7 @@ export class WebSocketSubject<T> extends Observable<T> {
return;
}

this._config.closeObserver?.next(e);
closeObserver?.next(e);

if (e.wasClean) {
this.#outputComplete();
Expand All @@ -312,8 +311,7 @@ export class WebSocketSubject<T> extends Observable<T> {

socket.onmessage = (e: MessageEvent) => {
try {
const { deserializer } = this._config;
this.#outputNext(deserializer!(e));
this.#outputNext(deserializer(e));
} catch (err) {
this.#outputError(err);
}
Expand All @@ -325,7 +323,7 @@ export class WebSocketSubject<T> extends Observable<T> {
this._inputBuffer.push(value);
} else {
try {
this._socket.send(this._config.serializer!(value));
this._socket.send(this._config.serializer(value));
} catch (err: any) {
this.error(err);
}
Expand Down

0 comments on commit 27075b6

Please sign in to comment.