Flow.collectWhile #1087

Open
fvasco opened this issue Apr 12, 2019 · 7 comments

@fvasco
Contributor

fvasco commented Apr 12, 2019

The takeXxx operators share common logic; have you considered a collectWhile?

Here is a POC:

@FlowPreview
public fun <T> Flow<T>.take(count: Int): Flow<T> {
    require(count >= 0) { "Take count should be non-negative, but had $count" }
    if (count == 0) return emptyFlow()
    return flow {
        var consumed = 0
        collectWhile { value ->
            emit(value)
            // Stop right after the last requested element, without waiting for one more upstream value
            ++consumed < count
        }
    }
}

public suspend fun <T> Flow<T>.collectWhile(action: suspend (value: T) -> Boolean) {
    var cancellationException: CancellationException? = null
    try {
        collect { value ->
            val loop = action(value)
            if (!loop) {
                val exception = CancellationException()
                cancellationException = exception
                throw exception
            }
        }
    } catch (e: CancellationException) {
        if (e !== cancellationException) throw e
    }
}
@elizarov elizarov added the flow label Apr 12, 2019
@elizarov
Contributor

Do you see any collectWhile use-cases beyond takeXxx implementation?

@JakeWharton
Contributor

It would also be used for #1078 (first() / firstOrNull()) if that is accepted as a valid operator.

@fvasco
Contributor Author

fvasco commented Apr 14, 2019

> Do you see any collectWhile use-cases beyond takeXxx implementation?

@elizarov your question sounds like: "Should we allow break in a for loop?" Frankly, I don't feel I'm the right person to discuss structured programming, especially since the current codebase has already shown that an exception is a sufficient replacement for break.

So I don't see any strong argument for breaking out of a collect; however, an explicit operator can only be clearer to the reader than a quick-and-dirty goto replacement.

firstXxx can be built using take, as @JakeWharton already noted, so this could be seen as an argument against this proposal.
At the same time, take can be built using takeWhile, yet the current implementation does not do that and breaks out of the collect in the same way. Probably firstXxx and any further custom operators will do the same.

This proposal defines a clean way to break out of a collect.
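
To make the firstXxx case concrete, here is a minimal sketch of firstOrNull sitting directly on top of collectWhile. The collectWhile in this snippet is a local stand-in modeled on the POC above, not a library API, and the names StopCollecting and firstOrNullSketch are made up for this example.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical marker exception used only to unwind out of collect.
private class StopCollecting : CancellationException("collectWhile stopped")

// Local stand-in for the proposed operator (assumption: same contract as the POC).
suspend fun <T> Flow<T>.collectWhile(action: suspend (T) -> Boolean) {
    val stop = StopCollecting()
    try {
        collect { value -> if (!action(value)) throw stop }
    } catch (e: StopCollecting) {
        // Only swallow the instance created by this call; rethrow anything else.
        if (e !== stop) throw e
    }
}

// Sketch of firstOrNull: remember the first value, then stop collecting.
suspend fun <T> Flow<T>.firstOrNullSketch(): T? {
    var result: T? = null
    collectWhile { value ->
        result = value
        false // returning false breaks out of the collect
    }
    return result
}

fun main() = runBlocking {
    println(flowOf(1, 2, 3).firstOrNullSketch())   // prints 1
    println(emptyFlow<Int>().firstOrNullSketch())  // prints null
}
```

The operator body contains no exception handling of its own, which is the readability gain argued for above.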

@fvasco
Contributor Author

fvasco commented Feb 11, 2020

firstOrNull use case #1796

@Zhuinden

Apparently collectWhile exists but it's internal

// Internal building block for non-tailcalling flow-truncating operators
internal suspend inline fun <T> Flow<T>.collectWhile(crossinline predicate: suspend (value: T) -> Boolean) {
    val collector = object : FlowCollector<T> {
        override suspend fun emit(value: T) {
            // Note: we are checking predicate first, then throw. If the predicate does suspend (calls emit, for example)
            // then the resulting code is never tail-suspending and produces a state-machine
            if (!predicate(value)) {
                throw AbortFlowException(this)
            }
        }
    }
    try {
        collect(collector)
    } catch (e: AbortFlowException) {
        e.checkOwnership(collector)
    }
}

Most likely on purpose, of course.

@lowasser
Contributor

We've come across a few cases where there is state we want to update that affects downstream operations. The first is roughly takeUnits, where each element may represent a different number of units, especially bytes: a Flow<ByteString>.takeBytes(bytes: Long) needs to measure each byte string. (Also note that the last ByteString may need to be truncated rather than passed through whole, which is an additional mismatch for takeWhile.) transformWhile is a slightly better fit, but you still need to wrap it in flow { var bytesEncountered = 0; emitAll(transformWhile { ... }) }, which would be more intuitive as collectWhile.
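
As a rough sketch of the takeBytes shape described above, written against a collectWhile: this uses ByteArray in place of ByteString to stay dependency-free, and the collectWhile and StopCollecting definitions are local stand-ins for illustration, not library APIs.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical marker exception used only to unwind out of collect.
private class StopCollecting : CancellationException("collectWhile stopped")

// Local stand-in for the proposed operator.
suspend fun <T> Flow<T>.collectWhile(action: suspend (T) -> Boolean) {
    val stop = StopCollecting()
    try {
        collect { value -> if (!action(value)) throw stop }
    } catch (e: StopCollecting) {
        if (e !== stop) throw e
    }
}

// Emits chunks until `limit` bytes have passed through, truncating the last chunk if needed.
fun Flow<ByteArray>.takeBytes(limit: Long): Flow<ByteArray> {
    require(limit >= 0) { "limit must be non-negative, but was $limit" }
    if (limit == 0L) return emptyFlow()
    return flow {
        var remaining = limit // state shared between the emit logic and the stop condition
        collectWhile { chunk ->
            if (chunk.size <= remaining) {
                emit(chunk)
                remaining -= chunk.size
            } else {
                // remaining < chunk.size here, so it fits in an Int
                emit(chunk.copyOf(remaining.toInt()))
                remaining = 0
            }
            remaining > 0
        }
    }
}

fun main() = runBlocking {
    val chunks = flowOf(ByteArray(4), ByteArray(4), ByteArray(4))
    println(chunks.takeBytes(6).map { it.size }.toList()) // prints [4, 2]
}
```

The running count, the truncation, and the stop condition all live in one lambda, which is the part that is awkward to express with takeWhile.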

and(Flow<Boolean>) is doable with dropWhile { it }.firstOrNull() == null, but is a little more intuitive with collectWhile.

We might also suggest calling this doCollectWhile, as the condition "comes at the end" and is evaluated at the end. For example,

suspend fun Flow<Boolean>.and(): Boolean {
  var soFar = true
  doCollectWhile {
    soFar = it
    it
  }
  return soFar
}

@lowasser
Contributor

Update: we're no longer quite as convinced about doCollectWhile as a name, but we have also found this useful on SharedFlow. SharedFlows always need a "while" condition, and when there's any sort of state involved in the while condition, collect is usually just a better match than transformWhile or takeWhile.
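
A small sketch of the SharedFlow case, under the assumption that collectWhile is defined locally as below (it is not a public library API; StopCollecting and sumUntilAtLeast are names invented for this example). A plain collect on a SharedFlow never returns, so the stateful condition is what ends the collection.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical marker exception used only to unwind out of collect.
private class StopCollecting : CancellationException("collectWhile stopped")

// Local stand-in for the proposed operator.
suspend fun <T> Flow<T>.collectWhile(action: suspend (T) -> Boolean) {
    val stop = StopCollecting()
    try {
        collect { value -> if (!action(value)) throw stop }
    } catch (e: StopCollecting) {
        if (e !== stop) throw e
    }
}

// Accumulates values until the running sum reaches `target`, then stops collecting.
suspend fun sumUntilAtLeast(source: Flow<Int>, target: Int): Int {
    var sum = 0
    source.collectWhile { value ->
        sum += value
        sum < target
    }
    return sum
}

fun main() = runBlocking {
    // The replay cache makes the example deterministic: a new collector sees 1, 2, 3 immediately.
    val shared = MutableSharedFlow<Int>(replay = 3)
    shared.tryEmit(1)
    shared.tryEmit(2)
    shared.tryEmit(3)
    println(sumUntilAtLeast(shared, target = 3)) // prints 3 (1 + 2), then collection stops
}
```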
