Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add first { … } terminal operator to Flow #1078

Closed
LouisCAD opened this issue Apr 9, 2019 · 12 comments
Closed

Add first { … } terminal operator to Flow #1078

LouisCAD opened this issue Apr 9, 2019 · 12 comments

Comments

@LouisCAD
Copy link
Contributor

LouisCAD commented Apr 9, 2019

Would be much like first { … } from kotlin.collections, collecting all elements until the predicate returns true, after which it would cancel the flow.

I'm not sure what the behavior should be when the flow is empty though.

@JakeWharton
Copy link
Contributor

You throw NoSuchElementException

@fvasco
Copy link
Contributor

fvasco commented Apr 9, 2019

like a Sequence https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/first.html

@kevinherron
Copy link

and firstOrNull that doesn't throw?

@elizarov
Copy link
Contributor

Add some use-case, please

@LouisCAD
Copy link
Contributor Author

I made a library to return scan results from the Bluetooth Low Energy API of Android, and often, the use case is to scan for all devices, or devices responding to a filter list passed to the system, to find one device, the one you want, which effectively, is the first one, or the first one matching a predicate.

I currently do this using a Channel that becomes hot as soon as it is returned (scan is started before returning), then results are sent to the channel, and scan is stopped when the channel is stopped.

I would like to try to use flow as an alternative, and see what is a better fit between Flow and Channel, keeping both if they each have valid use cases, in line with the semantics of Flow and Channel.

Here's the relevant lines of code for the channel based API: https://github.com/Beepiz/BleScanCoroutines/blob/5904bc44089e722145f987d653f1c77f479b9af2/core/src/main/java/com/beepiz/bluetooth/scancoroutines/experimental/BluetoothLeScanner.kt#L18-L29

@LouisCAD
Copy link
Contributor Author

I'm not used to reactive streams at all though. Maybe @JakeWharton or others can bring some use cases, or tell if that doesn't make sense or is a recipe for undesirable use of reactive streams.

@JakeWharton
Copy link
Contributor

Not sure I've used it much except for in tests.

If you're doing something with a nested stream (like concat(memoryCache, diskCache, network)) you can always just flatMap a take(1) instead of map-ing first() to merge it back in an enclosing stream. But aside from that and maybe bridging with some legacy system that isn't reactive, the need for something like that should be relatively minor.

@PaulWoitaschek
Copy link
Contributor

Add some use-case, please

I have a function where I need to fetch the next new element receied from a ConflatedBroadcastChannel. So I use .drop(1).first().

private object NullSurrogate

private object FirstException : CancellationException("First received")

suspend fun <T> Flow<T>.first(): T {
  var result: Any? = NullSurrogate
  try {
    collect { value ->
      result = value
      throw FirstException
    }
  } catch (ignored: FirstException) {
  }

  if (result === NullSurrogate) {
    throw NoSuchElementException("Expected at least one element")
  }
  @Suppress("UNCHECKED_CAST")
  return result as T
}

Would this be a good implementation of first()?

@LouisCAD
Copy link
Contributor Author

@PaulWoitaschek You're talking about ConflatedBroacastChannel, yet you show an implementation for Flow. I'm not sure I understand. Note that ReceiveChannel (returned by openSubscription() in ConflatedBroacastChannel) already has a drop function and a first function.

@PaulWoitaschek
Copy link
Contributor

Instead of openSubscription I call asFlow().drop(1).first()

@ZakTaccardi
Copy link

My use case is that I needed to observe a Flow<T> filter the value, and grab the first emission before carrying on in a coroutine.

launch {
  val setting = settingRepository.states()
    .map { it.selected }
    .awaitFirst() // no such operator for `Flow`, only for `ReceiveChannel`

@ivan-moto
Copy link

Any reason firstOrNull wasn't implemented?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

9 participants