Skip to content

Commit

Permalink
refactor(WebSocketSubject): directly inherit Subject
Browse files Browse the repository at this point in the history
  • Loading branch information
benlesh committed Nov 22, 2023
1 parent f6e54f0 commit ae005b4
Showing 1 changed file with 18 additions and 2 deletions.
20 changes: 18 additions & 2 deletions packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Subject, AnonymousSubject } from '../../Subject.js';
import { Subject } from '../../Subject.js';
import { Subscriber, Observable, Subscription, operate } from '../../Observable.js';
import { ReplaySubject } from '../../ReplaySubject.js';
import { Observer, NextObserver } from '../../types.js';
Expand Down Expand Up @@ -151,14 +151,18 @@ const WEBSOCKETSUBJECT_INVALID_ERROR_OBJECT =

export type WebSocketMessage = string | ArrayBuffer | Blob | ArrayBufferView;

export class WebSocketSubject<T> extends AnonymousSubject<T> {
export class WebSocketSubject<T> extends Subject<T> {
private _config!: WebSocketSubjectConfig<T>;

/** @internal */
_output!: Subject<T>;

private _socket: WebSocket | null = null;

private destination: Observer<T> | undefined = undefined;

private _source: Observable<T> | undefined = undefined;

constructor(urlConfigOrSource: string | WebSocketSubjectConfig<T> | Observable<T>, destination?: Observer<T>) {
super();
if (urlConfigOrSource instanceof Observable) {
Expand Down Expand Up @@ -334,6 +338,18 @@ export class WebSocketSubject<T> extends AnonymousSubject<T> {
};
}

next(value: T) {
this.destination?.next?.(value);
}

error(err: any) {
this.destination?.error?.(err);
}

complete() {
this.destination?.complete?.();
}

/** @internal */
protected _subscribe(subscriber: Subscriber<T>): Subscription {
const { _source } = this;
Expand Down

0 comments on commit ae005b4

Please sign in to comment.