Skip to content

Commit

Permalink
refactor(WebSocketSubject): no longer relies on ReplaySubject.
Browse files Browse the repository at this point in the history
  • Loading branch information
benlesh committed Nov 22, 2023
1 parent 989c187 commit 245c412
Showing 1 changed file with 53 additions and 45 deletions.
98 changes: 53 additions & 45 deletions packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { Subject } from '../../Subject.js';
import { Subscriber, Observable, Subscription, operate } from '../../Observable.js';
import { ReplaySubject } from '../../ReplaySubject.js';
import { Observer, NextObserver } from '../../types.js';
import { NextObserver } from '../../types.js';

/**
* WebSocketSubjectConfig is a plain Object that allows us to make our
Expand Down Expand Up @@ -158,7 +157,13 @@ export class WebSocketSubject<T> extends Subject<T> {

private _socket: WebSocket | null = null;

private _input: Observer<T>;
private _inputBuffer: T[] = [];

private _hasError = false;

private _error: any;

private _isComplete = false;

constructor(urlConfigOrSource: string | WebSocketSubjectConfig<T>) {
super();
Expand All @@ -177,13 +182,15 @@ export class WebSocketSubject<T> extends Subject<T> {
}

this._output = new Subject<T>();
this._input = new ReplaySubject();
}

private _resetState() {
this._socket = null;
this._input = new ReplaySubject();
this._output = new Subject<T>();
this._inputBuffer = [];
this._hasError = false;
this._isComplete = false;
this._error = null;
}

/**
Expand Down Expand Up @@ -250,44 +257,14 @@ export class WebSocketSubject<T> extends Subject<T> {

this._config.openObserver?.next(evt);

const previousInput = this._input;

// We switch over now to passthrough all messages directly to the
// to the socket, where previously we were queuing them up with
// a ReplaySubject.
this._input = new Subscriber<T>({
next: (x) => {
if (socket!.readyState === 1) {
try {
socket!.send(this._config.serializer!(x));
} catch (e) {
this._input.error(e);
}
}
},
error: (err: any) => {
this._config.closingObserver?.next(undefined);
if (err?.code) {
socket!.close(err.code, err.reason);
} else {
_output.error(new TypeError(WEBSOCKETSUBJECT_INVALID_ERROR_OBJECT));
}
this._resetState();
},
complete: () => {
this._config.closingObserver?.next(undefined);
socket!.close();
this._resetState();
},
});
while (this._inputBuffer.length > 0) {
this.next(this._inputBuffer.shift()!);
}

// If the _input was a ReplaySubject before, when we
// subscribe right now, it will synchronously emit all
// of the buffered values.
if (previousInput instanceof ReplaySubject) {
// Note that since `_input` is a `Subscriber`, this will
// automatically wire up the subscription.
previousInput.subscribe(this._input);
if (this._hasError) {
this.error(this._error);
} else if (this._isComplete) {
this.complete();
}
};

Expand Down Expand Up @@ -322,15 +299,46 @@ export class WebSocketSubject<T> extends Subject<T> {
}

next(value: T) {
this._input.next(value);
if (this._socket?.readyState !== 1) {
this._inputBuffer.push(value);
} else {
try {
this._socket.send(this._config.serializer!(value));
} catch (err: any) {
this._config.closingObserver?.next(undefined);
if (err?.code) {
this._socket.close(err.code, err.reason);
} else {
this._output.error(new TypeError(WEBSOCKETSUBJECT_INVALID_ERROR_OBJECT));
}
this._resetState();
}
}
}

error(err: any) {
this._input.error(err);
if (this._socket?.readyState === 1) {
this._config.closingObserver?.next(undefined);
if (err?.code) {
this._socket?.close(err.code, err.reason);
} else {
this._output.error(new TypeError(WEBSOCKETSUBJECT_INVALID_ERROR_OBJECT));
}
this._resetState();
} else {
this._hasError = true;
this._error = err;
}
}

complete() {
this._input.complete();
if (this._socket?.readyState === 1) {
this._config.closingObserver?.next(undefined);
this._socket.close();
this._resetState();
} else {
this._isComplete = true;
}
}

/** @internal */
Expand Down

0 comments on commit 245c412

Please sign in to comment.