Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Why WebSocketSubjects multiplex does not return a Subject? #2442

Closed
mpodlasin opened this issue Mar 4, 2017 · 4 comments
Closed

Why WebSocketSubjects multiplex does not return a Subject? #2442

mpodlasin opened this issue Mar 4, 2017 · 4 comments

Comments

@mpodlasin
Copy link
Contributor

mpodlasin commented Mar 4, 2017

Hi.

Let me just start by saying that there is a possibility I misunderstood multiplex operator completely. If that is so, sorry for spamming (and I would be super thankful for some resources on the subject).

As far as I understand, multiplex simulates having websocket connections to multpile endpoints, while it really has only single connection to a "gateway" endpoint that can forward messages incoming from endpoints we pretend we have connection to. For example we pretend we have a connection per conversation in a chat app, while in fact there is only one connection to a server that marks incoming messages with flag that forwards a message to proper Observable.

If I get it right, then the question is, why we have a possibility to simulate such a connection only in one way (server -> client messages only)? Since multiplex returns an Observable, we do not have a possibility to next message directly to fake socket that represents single conversation in my chat app. I have to use origin WebSocketSubject and while calling next on it, every time manually mark my message with a flag that forwards my message to proper conversation.

In fact, we could take it much further and make multiplex return WebSocketSubject, so that it could be itself multiplexed recursively again and again. In complex chat app with tree-like threads, conversations and user structure, I could represent such structure with my WebSocketSubjects and always forward messages directly where I want, without any boilerplate.

Also quite interestingly if done right, it would not be a breaking change at all.

Thoughts?

@huan
Copy link
Contributor

huan commented May 16, 2017

+1

@HipsterZipster
Copy link

I agree with you 100% @mpodlasin and I find myself in a similar scenario. How did you resolve this? Can you post code?

@benlesh
Copy link
Member

benlesh commented Aug 18, 2020

Closing this, as we've updated the documentation. Perhaps at some point we can add a feature as outlined, but there hasn't been much demand for it, TMK.

@benlesh benlesh closed this as completed Aug 18, 2020
@IceBjerg
Copy link

Hey everyone!

I currently have some problems regarding websockets + rxjs.
My initial problem is that I want to control the network errors, outages, etc separately from business logic, preferably in the service itself. Optionally, I would have somewhere a function wich would return an observable that emits 'OK' or 'Err' just in case I want to have some icon somewhere that the connection is lost.

It was not easy and intuitive, and I don't even think it's the proper way to do this, but given the tools with RxJS + websocket, I could not do it any better.

My problem was that it's not easy to find a way to implement the reconnect functionality.

This is basically my code:

import { webSocket } from 'rxjs/webSocket';
import './style.css';

import { BehaviorSubject, retry, Observable, shareReplay } from 'rxjs';

const RECONNECT_DELAY = 5000;

const errors$ = new BehaviorSubject<Error | undefined>(undefined);
const subject$ = webSocket({
  url: 'wss://example.com/ws',
  closeObserver: {
    next: (event) => {
      console.log(event);
      errors$.next(new Error('Connection closed'));
    },
  },
  openObserver: {
    next: (event) => {
      errors$.next(undefined);
    },
  },
});

const channel1: Observable<unknown> = subject$
  .multiplex(
    () => ({
      join: 'JOIN',
      channel_name: 'channel_1',
    }),
    () => {
      return {
        action: 'LEAVE',
        channel_name: 'channel_1',
      };
    },
    (msg: any) => msg.channel_name === 'channel_1'
  )
  .pipe(
    retry({ delay: RECONNECT_DELAY }),
    shareReplay({ bufferSize: 1, refCount: true })
  );

channel1.subscribe((x) => console.log('channel_1', x));

const channel2: Observable<unknown> = subject$
  .multiplex(
    () => ({
      join: 'JOIN',
      channel_name: 'channel_2',
    }),
    () => {
      return {
        action: 'LEAVE',
        channel_name: 'channel_2',
      };
    },
    (msg: any) => msg.channel_name === 'channel_2'
  )
  .pipe(
    retry({ delay: RECONNECT_DELAY }),
    shareReplay({ bufferSize: 1, refCount: true })
  );

channel2.subscribe((x) => console.log('channel_2', x));

This somehow works, and if there is a need for reconnect, this code does the job and reconnects. After a successful reconnect, the channel subscriptions are also sent, and we are happy.

I am only not happy about one thing: because writing to a channel is done like this:

subject$.next({channel_name: 'channel_1', message: 'hello'});

It can happen that in case of an accidental disconnect, messages could be queued. When a reconnect then happens in the future, these messages are sent before the subscription messages are sent.

What do you think? Am I missing something, or is there anything I can do better?
If multiplex would return a Subject, these cases I think could be fixable at the subject side, because it could already know the channel, and could possibly link with the messages.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

5 participants