diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt index 95cbf23a5d..7a87523c02 100644 --- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt +++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt @@ -821,6 +821,8 @@ public final class kotlinx/coroutines/flow/FlowKt { public static final fun filter (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; public static final fun filterNot (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; public static final fun filterNotNull (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow; + public static final fun first (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun first (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun flatMapConcat (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; public static final fun flatMapMerge (Lkotlinx/coroutines/flow/Flow;IILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; public static synthetic fun flatMapMerge$default (Lkotlinx/coroutines/flow/Flow;IILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow; diff --git a/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt b/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt index 4afd0959b7..aba4410a20 100644 --- a/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt +++ b/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt @@ -4,6 +4,7 @@ @file:JvmMultifileClass @file:JvmName("FlowKt") +@file:Suppress("UNCHECKED_CAST") package kotlinx.coroutines.flow @@ -50,7 +51,7 @@ public suspend inline fun Flow.fold( } /** - * Terminal operator, that awaits for one and only one value to be published. + * The terminal operator, that awaits for one and only one value to be published. * Throws [NoSuchElementException] for empty flow and [IllegalStateException] for flow * that contains more than one element. */ @@ -68,7 +69,7 @@ public suspend fun Flow.single(): T { } /** - * Terminal operator, that awaits for one and only one value to be published. + * The terminal operator, that awaits for one and only one value to be published. * Throws [IllegalStateException] for flow that contains more than one element. */ @FlowPreview @@ -81,3 +82,45 @@ public suspend fun Flow.singleOrNull(): T? { return result } + +/** + * The terminal operator that returns the first element emitted by the flow and then cancels flow's collection. + * Throws [NoSuchElementException] if the flow was empty. + */ +@FlowPreview +public suspend fun Flow.first(): T { + var result: Any? = NullSurrogate + try { + collect { value -> + result = value + throw AbortFlowException() + } + } catch (e: AbortFlowException) { + // Do nothing + } + + if (result === NullSurrogate) throw NoSuchElementException("Expected at least one element") + return result as T +} + +/** + * The terminal operator that returns the first element emitted by the flow matching the given [predicate] and then cancels flow's collection. + * Throws [NoSuchElementException] if the flow has not contained elements matching the [predicate]. + */ +@FlowPreview +public suspend fun Flow.first(predicate: suspend (T) -> Boolean): T { + var result: Any? = NullSurrogate + try { + collect { value -> + if (predicate(value)) { + result = value + throw AbortFlowException() + } + } + } catch (e: AbortFlowException) { + // Do nothing + } + + if (result === NullSurrogate) throw NoSuchElementException("Expected at least one element matching the predicate $predicate") + return result as T +} diff --git a/kotlinx-coroutines-core/common/test/flow/terminal/FirstTest.kt b/kotlinx-coroutines-core/common/test/flow/terminal/FirstTest.kt new file mode 100644 index 0000000000..e84d4c7b77 --- /dev/null +++ b/kotlinx-coroutines-core/common/test/flow/terminal/FirstTest.kt @@ -0,0 +1,86 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.flow + +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* +import kotlin.test.* + +class FirstTest : TestBase() { + @Test + fun testFirst() = runTest { + val flow = flowOf(1, 2, 3) + assertEquals(1, flow.first()) + } + + @Test + fun testNulls() = runTest { + val flow = flowOf(null, 1) + assertNull(flow.first()) + assertNull(flow.first { it == null }) + assertEquals(1, flow.first { it != null }) + } + + @Test + fun testFirstWithPredicate() = runTest { + val flow = flowOf(1, 2, 3) + assertEquals(1, flow.first { it > 0 }) + assertEquals(2, flow.first { it > 1 }) + assertFailsWith { flow.first { it > 3 } } + } + + @Test + fun testFirstCancellation() = runTest { + val latch = Channel() + val flow = flow { + coroutineScope { + launch { + latch.send(Unit) + hang { expect(1) } + } + emit(1) + emit(2) + } + } + + + val result = flow.first { + latch.receive() + true + } + assertEquals(1, result) + finish(2) + } + + @Test + fun testEmptyFlow() = runTest { + assertFailsWith { emptyFlow().first() } + assertFailsWith { emptyFlow().first { true } } + } + + @Test + fun testErrorCancelsUpstream() = runTest { + val latch = Channel() + val flow = flow { + coroutineScope { + launch { + latch.send(Unit) + hang { expect(1) } + } + emit(1) + } + } + + assertFailsWith { + flow.first { + latch.receive() + throw TestException() + } + } + + assertEquals(1, flow.first()) + finish(2) + } +}