From d5478b68bd7b22806b8246d3cbd6b863bb93d8a7 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Thu, 6 Jun 2019 11:43:31 +0300 Subject: [PATCH] More operators (#1236) * Scan and emitAll operators * Flow.first operators family (without firstOrNull and firstOrDefault support) * More migrations Fixes #1094 Fixes #1078 Fixes #1244 --- .../kotlinx-coroutines-core.txt | 9 ++ .../common/src/flow/Migration.kt | 53 ++++++++++++ .../common/src/flow/operators/Errors.kt | 4 +- .../common/src/flow/operators/Merge.kt | 10 +-- .../common/src/flow/operators/Transform.kt | 45 ++++++++++ .../common/src/flow/terminal/Collect.kt | 6 ++ .../common/src/flow/terminal/Reduce.kt | 47 +++++++++- .../common/test/flow/operators/ScanTest.kt | 68 +++++++++++++++ .../common/test/flow/terminal/FirstTest.kt | 86 +++++++++++++++++++ 9 files changed, 315 insertions(+), 13 deletions(-) create mode 100644 kotlinx-coroutines-core/common/test/flow/operators/ScanTest.kt create mode 100644 kotlinx-coroutines-core/common/test/flow/terminal/FirstTest.kt diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt index ab51a2dd72..1dcad707b1 100644 --- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt +++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt @@ -822,10 +822,13 @@ public final class kotlinx/coroutines/flow/FlowKt { public static final fun distinctUntilChangedBy (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow; public static final fun drop (Lkotlinx/coroutines/flow/Flow;I)Lkotlinx/coroutines/flow/Flow; public static final fun dropWhile (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; + public static final fun emitAll (Lkotlinx/coroutines/flow/FlowCollector;Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun emptyFlow ()Lkotlinx/coroutines/flow/Flow; public static final fun filter (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; public static final fun filterNot (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; public static final fun filterNotNull (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow; + public static final fun first (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun first (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun flatMapConcat (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; public static final fun flatMapMerge (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; public static synthetic fun flatMapMerge$default (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow; @@ -853,6 +856,8 @@ public final class kotlinx/coroutines/flow/FlowKt { public static final fun retry (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow; public static synthetic fun retry$default (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow; public static final fun sample (Lkotlinx/coroutines/flow/Flow;J)Lkotlinx/coroutines/flow/Flow; + public static final fun scan (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow; + public static final fun scanReduce (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow; public static final fun single (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun singleOrNull (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun switchMap (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; @@ -872,13 +877,17 @@ public final class kotlinx/coroutines/flow/MigrationKt { public static final fun BehaviourSubject ()Ljava/lang/Object; public static final fun PublishSubject ()Ljava/lang/Object; public static final fun ReplaySubject ()Ljava/lang/Object; + public static final fun compose (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow; public static final fun concatMap (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow; public static final fun flatMap (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; public static final fun flatten (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow; + public static final fun forEach (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)V public static final fun merge (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow; public static final fun observeOn (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/flow/Flow; public static final fun onErrorResume (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow; public static final fun publishOn (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/flow/Flow; + public static final fun scanFold (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow; + public static final fun skip (Lkotlinx/coroutines/flow/Flow;I)Lkotlinx/coroutines/flow/Flow; public static final fun subscribe (Lkotlinx/coroutines/flow/Flow;)V public static final fun subscribe (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)V public static final fun subscribe (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;)V diff --git a/kotlinx-coroutines-core/common/src/flow/Migration.kt b/kotlinx-coroutines-core/common/src/flow/Migration.kt index bf20d2f2a2..114a32e10d 100644 --- a/kotlinx-coroutines-core/common/src/flow/Migration.kt +++ b/kotlinx-coroutines-core/common/src/flow/Migration.kt @@ -193,3 +193,56 @@ public fun Flow>.merge(): Flow = error("Should not be called") replaceWith = ReplaceWith("flattenConcat()") ) public fun Flow>.flatten(): Flow = error("Should not be called") + +/** + * Kotlin has a built-in generic mechanism for making chained calls. + * If you wish to write something like + * ``` + * myFlow.compose(MyFlowExtensions.ignoreErrors()).collect { ... } + * ``` + * you can replace it with + * + * ``` + * myFlow.let(MyFlowExtensions.ignoreErrors()).collect { ... } + * ``` + * + * @suppress + */ +@Deprecated( + level = DeprecationLevel.ERROR, + message = "Kotlin analogue of compose is 'let'", + replaceWith = ReplaceWith("let(transformer)") +) +public fun Flow.compose(transformer: Flow.() -> Flow): Flow = error("Should not be called") + +/** + * @suppress + */ +@Deprecated( + level = DeprecationLevel.ERROR, + message = "Kotlin analogue of 'skip' is 'drop'", + replaceWith = ReplaceWith("drop(count)") +) +public fun Flow.skip(count: Int): Flow = error("Should not be called") + +/** + * Flow extension to iterate over elements is [collect]. + * Foreach wasn't introduced deliberately to avoid confusion. + * Flow is not a collection, iteration over it may be not idempotent + * and can *launch* computations with side-effects. + * This behaviour is not reflected in [forEach] name. + * @suppress + */ +@Deprecated( + level = DeprecationLevel.ERROR, + message = "Flow analogue of 'forEach' is 'collect'", + replaceWith = ReplaceWith("collect(block)") +) +public fun Flow.forEach(action: suspend (value: T) -> Unit): Unit = error("Should not be called") + +@Deprecated( + level = DeprecationLevel.ERROR, + message = "Flow has less verbose 'scan' shortcut", + replaceWith = ReplaceWith("scan(initial, operation)") +) +public fun Flow.scanFold(initial: R, @BuilderInference operation: suspend (accumulator: R, value: T) -> R): Flow = error("Should not be called") diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Errors.kt b/kotlinx-coroutines-core/common/src/flow/operators/Errors.kt index de964da6ef..29777b7a83 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Errors.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Errors.kt @@ -27,9 +27,7 @@ public fun Flow.onErrorCollect( predicate: ExceptionPredicate = ALWAYS_TRUE ): Flow = collectSafely { e -> if (!predicate(e)) throw e - fallback.collect { value -> - emit(value) - } + emitAll(fallback) } /** diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt b/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt index 0fa6e8abd4..38b116a83f 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt @@ -80,11 +80,7 @@ public fun Flow.flatMapMerge( */ @FlowPreview public fun Flow>.flattenConcat(): Flow = flow { - collect { value -> - value.collect { innerValue -> - emit(innerValue) - } - } + collect { value -> emitAll(value) } } /** @@ -137,9 +133,7 @@ public fun Flow.switchMap(transform: suspend (value: T) -> Flow): F previousFlow?.join() // Undispatched to have better user experience in case of synchronous flows previousFlow = launch(start = CoroutineStart.UNDISPATCHED) { - transform(value).collect { innerValue -> - downstream.emit(innerValue) - } + downstream.emitAll(transform(value)) } } } diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Transform.kt b/kotlinx-coroutines-core/common/src/flow/operators/Transform.kt index aff523dd99..2ef4b97a9c 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Transform.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Transform.kt @@ -4,10 +4,12 @@ @file:JvmMultifileClass @file:JvmName("FlowKt") +@file:Suppress("UNCHECKED_CAST") package kotlinx.coroutines.flow import kotlinx.coroutines.* +import kotlinx.coroutines.flow.internal.NULL import kotlin.jvm.* import kotlinx.coroutines.flow.unsafeFlow as flow @@ -97,3 +99,46 @@ public fun Flow.onEach(action: suspend (T) -> Unit): Flow = flow { emit(value) } } + +/** + * Folds the given flow with [operation], emitting every intermediate result, including [initial] value. + * Note that initial value should be immutable (or should not be mutated) as it is shared between different collectors. + * For example: + * ``` + * flowOf(1, 2, 3).accumulate(emptyList()) { acc, value -> acc + value }.toList() + * ``` + * will produce `[], [1], [1, 2], [1, 2, 3]]`. + */ +@FlowPreview +public fun Flow.scan(initial: R, @BuilderInference operation: suspend (accumulator: R, value: T) -> R): Flow = flow { + var accumulator: R = initial + emit(accumulator) + collect { value -> + accumulator = operation(accumulator, value) + emit(accumulator) + } +} + +/** + * Reduces the given flow with [operation], emitting every intermediate result, including initial value. + * The first element is taken as initial value for operation accumulator. + * This operator has a sibling with initial value -- [scan]. + * + * For example: + * ``` + * flowOf(1, 2, 3, 4).scan { (v1, v2) -> v1 + v2 }.toList() + * ``` + * will produce `[1, 3, 6, 10]` + */ +@FlowPreview +public fun Flow.scanReduce(operation: suspend (accumulator: T, value: T) -> T): Flow = flow { + var accumulator: Any? = NULL + collect { value -> + accumulator = if (accumulator === NULL) { + value + } else { + operation(accumulator as T, value) + } + emit(accumulator as T) + } +} diff --git a/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt b/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt index 624b51f683..a6a218cf46 100644 --- a/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt +++ b/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt @@ -32,3 +32,9 @@ public suspend inline fun Flow.collect(crossinline action: suspend (value collect(object : FlowCollector { override suspend fun emit(value: T) = action(value) }) + +/** + * Collects all the values from the given [flow] and emits them to the collector. + * Shortcut for `flow.collect { value -> emit(value) }`. + */ +public suspend inline fun FlowCollector.emitAll(flow: Flow) = flow.collect(this) diff --git a/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt b/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt index 3a519e6514..4eca3efaf6 100644 --- a/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt +++ b/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt @@ -4,6 +4,7 @@ @file:JvmMultifileClass @file:JvmName("FlowKt") +@file:Suppress("UNCHECKED_CAST") package kotlinx.coroutines.flow @@ -50,7 +51,7 @@ public suspend inline fun Flow.fold( } /** - * Terminal operator, that awaits for one and only one value to be published. + * The terminal operator, that awaits for one and only one value to be published. * Throws [NoSuchElementException] for empty flow and [IllegalStateException] for flow * that contains more than one element. */ @@ -68,7 +69,7 @@ public suspend fun Flow.single(): T { } /** - * Terminal operator, that awaits for one and only one value to be published. + * The terminal operator, that awaits for one and only one value to be published. * Throws [IllegalStateException] for flow that contains more than one element. */ @FlowPreview @@ -81,3 +82,45 @@ public suspend fun Flow.singleOrNull(): T? { return result } + +/** + * The terminal operator that returns the first element emitted by the flow and then cancels flow's collection. + * Throws [NoSuchElementException] if the flow was empty. + */ +@FlowPreview +public suspend fun Flow.first(): T { + var result: Any? = NULL + try { + collect { value -> + result = value + throw AbortFlowException() + } + } catch (e: AbortFlowException) { + // Do nothing + } + + if (result === NULL) throw NoSuchElementException("Expected at least one element") + return result as T +} + +/** + * The terminal operator that returns the first element emitted by the flow matching the given [predicate] and then cancels flow's collection. + * Throws [NoSuchElementException] if the flow has not contained elements matching the [predicate]. + */ +@FlowPreview +public suspend fun Flow.first(predicate: suspend (T) -> Boolean): T { + var result: Any? = NULL + try { + collect { value -> + if (predicate(value)) { + result = value + throw AbortFlowException() + } + } + } catch (e: AbortFlowException) { + // Do nothing + } + + if (result === NULL) throw NoSuchElementException("Expected at least one element matching the predicate $predicate") + return result as T +} diff --git a/kotlinx-coroutines-core/common/test/flow/operators/ScanTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/ScanTest.kt new file mode 100644 index 0000000000..d739f1a64f --- /dev/null +++ b/kotlinx-coroutines-core/common/test/flow/operators/ScanTest.kt @@ -0,0 +1,68 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.flow + +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* +import kotlin.test.* + +class ScanTest : TestBase() { + @Test + fun testScan() = runTest { + val flow = flowOf(1, 2, 3, 4, 5) + val result = flow.scanReduce { acc, v -> acc + v }.toList() + assertEquals(listOf(1, 3, 6, 10, 15), result) + } + + @Test + fun testScanWithInitial() = runTest { + val flow = flowOf(1, 2, 3) + val result = flow.scan(emptyList()) { acc, value -> acc + value }.toList() + assertEquals(listOf(emptyList(), listOf(1), listOf(1, 2), listOf(1, 2, 3)), result) + } + + @Test + fun testNulls() = runTest { + val flow = flowOf(null, 2, null, null, null, 5) + val result = flow.scanReduce { acc, v -> if (v == null) acc else (if (acc == null) v else acc + v) }.toList() + assertEquals(listOf(null, 2, 2, 2, 2, 7), result) + } + + @Test + fun testEmptyFlow() = runTest { + val result = emptyFlow().scanReduce { _, _ -> 1 }.toList() + assertTrue(result.isEmpty()) + } + + @Test + fun testErrorCancelsUpstream() = runTest { + expect(1) + val latch = Channel() + val flow = flow { + coroutineScope { + launch { + latch.send(Unit) + hang { expect(3) } + } + emit(1) + emit(2) + } + }.scanReduce { _, value -> + expect(value) // 2 + latch.receive() + throw TestException() + }.onErrorCollect(emptyFlow()) + + assertEquals(1, flow.single()) + finish(4) + } + + public operator fun Collection.plus(element: T): List { + val result = ArrayList(size + 1) + result.addAll(this) + result.add(element) + return result + } +} diff --git a/kotlinx-coroutines-core/common/test/flow/terminal/FirstTest.kt b/kotlinx-coroutines-core/common/test/flow/terminal/FirstTest.kt new file mode 100644 index 0000000000..e84d4c7b77 --- /dev/null +++ b/kotlinx-coroutines-core/common/test/flow/terminal/FirstTest.kt @@ -0,0 +1,86 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.flow + +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* +import kotlin.test.* + +class FirstTest : TestBase() { + @Test + fun testFirst() = runTest { + val flow = flowOf(1, 2, 3) + assertEquals(1, flow.first()) + } + + @Test + fun testNulls() = runTest { + val flow = flowOf(null, 1) + assertNull(flow.first()) + assertNull(flow.first { it == null }) + assertEquals(1, flow.first { it != null }) + } + + @Test + fun testFirstWithPredicate() = runTest { + val flow = flowOf(1, 2, 3) + assertEquals(1, flow.first { it > 0 }) + assertEquals(2, flow.first { it > 1 }) + assertFailsWith { flow.first { it > 3 } } + } + + @Test + fun testFirstCancellation() = runTest { + val latch = Channel() + val flow = flow { + coroutineScope { + launch { + latch.send(Unit) + hang { expect(1) } + } + emit(1) + emit(2) + } + } + + + val result = flow.first { + latch.receive() + true + } + assertEquals(1, result) + finish(2) + } + + @Test + fun testEmptyFlow() = runTest { + assertFailsWith { emptyFlow().first() } + assertFailsWith { emptyFlow().first { true } } + } + + @Test + fun testErrorCancelsUpstream() = runTest { + val latch = Channel() + val flow = flow { + coroutineScope { + launch { + latch.send(Unit) + hang { expect(1) } + } + emit(1) + } + } + + assertFailsWith { + flow.first { + latch.receive() + throw TestException() + } + } + + assertEquals(1, flow.first()) + finish(2) + } +}