Skip to content

Commit

Permalink
Flow.first operators family (without firstOrNull and firstOrDefault s…
Browse files Browse the repository at this point in the history
…upport)

Fixes #1078
  • Loading branch information
qwwdfsad committed May 31, 2019
1 parent a7a0dde commit 620c0b6
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -821,6 +821,8 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static final fun filter (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun filterNot (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun filterNotNull (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
public static final fun first (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun first (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun flatMapConcat (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun flatMapMerge (Lkotlinx/coroutines/flow/Flow;IILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun flatMapMerge$default (Lkotlinx/coroutines/flow/Flow;IILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
Expand Down
47 changes: 45 additions & 2 deletions kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

@file:JvmMultifileClass
@file:JvmName("FlowKt")
@file:Suppress("UNCHECKED_CAST")

package kotlinx.coroutines.flow

Expand Down Expand Up @@ -50,7 +51,7 @@ public suspend inline fun <T, R> Flow<T>.fold(
}

/**
* Terminal operator, that awaits for one and only one value to be published.
* The terminal operator, that awaits for one and only one value to be published.
* Throws [NoSuchElementException] for empty flow and [IllegalStateException] for flow
* that contains more than one element.
*/
Expand All @@ -68,7 +69,7 @@ public suspend fun <T> Flow<T>.single(): T {
}

/**
* Terminal operator, that awaits for one and only one value to be published.
* The terminal operator, that awaits for one and only one value to be published.
* Throws [IllegalStateException] for flow that contains more than one element.
*/
@FlowPreview
Expand All @@ -81,3 +82,45 @@ public suspend fun <T: Any> Flow<T>.singleOrNull(): T? {

return result
}

/**
* The terminal operator that returns the first element emitted by the flow and then cancels flow's collection.
* Throws [NoSuchElementException] if the flow was empty.
*/
@FlowPreview
public suspend fun <T> Flow<T>.first(): T {
var result: Any? = NullSurrogate
try {
collect { value ->
result = value
throw AbortFlowException()
}
} catch (e: AbortFlowException) {
// Do nothing
}

if (result === NullSurrogate) throw NoSuchElementException("Expected at least one element")
return result as T
}

/**
* The terminal operator that returns the first element emitted by the flow matching the given [predicate] and then cancels flow's collection.
* Throws [NoSuchElementException] if the flow has not contained elements matching the [predicate].
*/
@FlowPreview
public suspend fun <T> Flow<T>.first(predicate: suspend (T) -> Boolean): T {
var result: Any? = NullSurrogate
try {
collect { value ->
if (predicate(value)) {
result = value
throw AbortFlowException()
}
}
} catch (e: AbortFlowException) {
// Do nothing
}

if (result === NullSurrogate) throw NoSuchElementException("Expected at least one element matching the predicate $predicate")
return result as T
}
86 changes: 86 additions & 0 deletions kotlinx-coroutines-core/common/test/flow/terminal/FirstTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.flow

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlin.test.*

class FirstTest : TestBase() {
@Test
fun testFirst() = runTest {
val flow = flowOf(1, 2, 3)
assertEquals(1, flow.first())
}

@Test
fun testNulls() = runTest {
val flow = flowOf(null, 1)
assertNull(flow.first())
assertNull(flow.first { it == null })
assertEquals(1, flow.first { it != null })
}

@Test
fun testFirstWithPredicate() = runTest {
val flow = flowOf(1, 2, 3)
assertEquals(1, flow.first { it > 0 })
assertEquals(2, flow.first { it > 1 })
assertFailsWith<NoSuchElementException> { flow.first { it > 3 } }
}

@Test
fun testFirstCancellation() = runTest {
val latch = Channel<Unit>()
val flow = flow {
coroutineScope {
launch {
latch.send(Unit)
hang { expect(1) }
}
emit(1)
emit(2)
}
}


val result = flow.first {
latch.receive()
true
}
assertEquals(1, result)
finish(2)
}

@Test
fun testEmptyFlow() = runTest {
assertFailsWith<NoSuchElementException> { emptyFlow<Int>().first() }
assertFailsWith<NoSuchElementException> { emptyFlow<Int>().first { true } }
}

@Test
fun testErrorCancelsUpstream() = runTest {
val latch = Channel<Unit>()
val flow = flow {
coroutineScope {
launch {
latch.send(Unit)
hang { expect(1) }
}
emit(1)
}
}

assertFailsWith<TestException> {
flow.first {
latch.receive()
throw TestException()
}
}

assertEquals(1, flow.first())
finish(2)
}
}

0 comments on commit 620c0b6

Please sign in to comment.