Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

withLatestFrom completes with no emissions when input observable has delay #7068

Open
csvenke opened this issue Sep 24, 2022 · 5 comments
Open

Comments

@csvenke
Copy link

csvenke commented Sep 24, 2022

Describe the bug

In the code below example$ will complete with no emissions

const source$ = range(1, 3).pipe(
  delay(0)
)

const example$ =
  range(1, 3).pipe(
    withLatestFrom(source$)
  )

example$.subscribe(console.log)

Adding observeOn(asyncScheduler) to example$ makes it work as expected

const source$ = range(1, 3).pipe(
  delay(0)
)

const example$ =
  range(1, 3).pipe(
    observeOn(asyncScheduler),
    withLatestFrom(source$)
  )

example$.subscribe(console.log)
// [ 1, 3 ]
// [ 2, 3 ]
// [ 3, 3 ]

However, bumping the delay to a number above 2 it will make it complete with no emissions again

const source$ = range(1, 3).pipe(
  delay(3)
)

const example$ =
  range(1, 3).pipe(
    observeOn(asyncScheduler),
    withLatestFrom(source$)
  )

example$.subscribe(console.log)

Expected behavior

When running the code below I expect these emissions after a delay of 300 ms

[ 1, 3 ]
[ 2, 3 ]
[ 3, 3 ]
const source$ = range(1, 3).pipe(
  delay(300)
)

const example$ =
  range(1, 3).pipe(
    withLatestFrom(source$)
  )

example$.subscribe(console.log)

Reproduction code

No response

Reproduction URL

https://stackblitz.com/edit/rxjs-se9x2m

Version

7.5.6

Environment

No response

Additional context

No response

@LBBO
Copy link

LBBO commented Dec 28, 2022

I would argue that withLatestFrom is working as designed here. Consider this marble diagram from the docs that visualizes how withLatestFrom is designed to work:

image

The resulting observable can only emit a value when the piped observable omits a value. The other observables provided to withLatestFrom are just queried in that moment but their emissions cannot trigger any new emission from our result.

Coming back to your example, that means that range(1, 3) emits three events immediately. For all three, source$ is queried for the latest emission. However, due to the delay, it hasn't emitted anything yet and so withLatestFrom is not able to emit anything for all three events from range. When you're using a smaller delay and the asyncSchedular, I assume that some details in RxJS's implementation and in the JavaScript event loop are causing code execution to be timed such that the range's events are emitted after the 0-2 ms delay has passed and source has actually emitted something. Probably the delay could even be larger on a slower computer, but all of that's just speculation.

Perhaps combineLatest would be better suited for your use case? In your specific example, you could also flip the two observables to get your desired behavior:

const source$ = range(1, 3).pipe(
  delay(300)
)

const example$ =
  source$.pipe(
    withLatestFrom(range(1, 3))
  )

example$.subscribe(console.log)

@wSedlacek
Copy link

The issue I am noticing is with b1 from the diagram.
Yes once the argument of withLatestFrom() has emitted a value future values should be ignored until the next emission of the input.

But when the input has emitted a value and withLatestFrom() encounters its first value it would be expected (and was the behavior in previous versions) that it would emit.

@LBBO
Copy link

LBBO commented Jan 16, 2023

So you would like to see an a1 emission before b1, correct? In my opinion, that behavior would contradict the documentation:

Combines the source Observable with other Observables to create an Observable whose values are calculated from the latest values of each, only when the source emits.

Emitting a1 when 1 is emitted would be an emission caused by something other than the source emission.

@wSedlacek
Copy link

You are correct. That would be a1 and not b1. Thank you for clarifying.

@conblem
Copy link

conblem commented Jun 12, 2024

For those stumbling upon this issue I solved it like this in my case

import { range, combineLatestWith, delay, distinctUntilChanged } from "rxjs";

const source$ = range(1, 3).pipe(delay(0));

const workingExample$ = range(1, 3).pipe(
  map((val, i) => <const>[val, i]),
  combineLatestWith(source$),
  distinctUntilChanged(undefined!, ([[_, i]]) => i),
  map(([[val], source]) => <const>[val, source])
);

workingExample$.subscribe(console.log);

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants