Skip to content

Commit

Permalink
Scan and emitAll operators
Browse files Browse the repository at this point in the history
Fixes #1094
  • Loading branch information
qwwdfsad committed Jun 5, 2019
1 parent 14f2f38 commit 33befbf
Show file tree
Hide file tree
Showing 7 changed files with 135 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -822,6 +822,7 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static final fun distinctUntilChangedBy (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
public static final fun drop (Lkotlinx/coroutines/flow/Flow;I)Lkotlinx/coroutines/flow/Flow;
public static final fun dropWhile (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun emitAll (Lkotlinx/coroutines/flow/FlowCollector;Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun emptyFlow ()Lkotlinx/coroutines/flow/Flow;
public static final fun filter (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun filterNot (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
Expand Down Expand Up @@ -853,6 +854,8 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static final fun retry (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun retry$default (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun sample (Lkotlinx/coroutines/flow/Flow;J)Lkotlinx/coroutines/flow/Flow;
public static final fun scan (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
public static final fun scan (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
public static final fun single (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun singleOrNull (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun switchMap (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
Expand All @@ -872,13 +875,15 @@ public final class kotlinx/coroutines/flow/MigrationKt {
public static final fun BehaviourSubject ()Ljava/lang/Object;
public static final fun PublishSubject ()Ljava/lang/Object;
public static final fun ReplaySubject ()Ljava/lang/Object;
public static final fun compose (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
public static final fun concatMap (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
public static final fun flatMap (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun flatten (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
public static final fun merge (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
public static final fun observeOn (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/flow/Flow;
public static final fun onErrorResume (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
public static final fun publishOn (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/flow/Flow;
public static final fun skip (Lkotlinx/coroutines/flow/Flow;I)Lkotlinx/coroutines/flow/Flow;
public static final fun subscribe (Lkotlinx/coroutines/flow/Flow;)V
public static final fun subscribe (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)V
public static final fun subscribe (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;)V
Expand Down
10 changes: 10 additions & 0 deletions kotlinx-coroutines-core/common/src/flow/Migration.kt
Original file line number Diff line number Diff line change
Expand Up @@ -215,3 +215,13 @@ public fun <T> Flow<Flow<T>>.flatten(): Flow<T> = error("Should not be called")
replaceWith = ReplaceWith("let(transformer)")
)
public fun <T, R> Flow<T>.compose(transformer: Flow<T>.() -> Flow<R>): Flow<R> = error("Should not be called")

/**
* @suppress
*/
@Deprecated(
level = DeprecationLevel.ERROR,
message = "Kotlin analogue of 'skip' is 'drop'",
replaceWith = ReplaceWith("drop(count)")
)
public fun <T> Flow<T>.skip(count: Int): Flow<T> = error("Should not be called")
4 changes: 1 addition & 3 deletions kotlinx-coroutines-core/common/src/flow/operators/Errors.kt
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@ public fun <T> Flow<T>.onErrorCollect(
predicate: ExceptionPredicate = ALWAYS_TRUE
): Flow<T> = collectSafely { e ->
if (!predicate(e)) throw e
fallback.collect { value ->
emit(value)
}
emitAll(fallback)
}

/**
Expand Down
10 changes: 2 additions & 8 deletions kotlinx-coroutines-core/common/src/flow/operators/Merge.kt
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,7 @@ public fun <T, R> Flow<T>.flatMapMerge(
*/
@FlowPreview
public fun <T> Flow<Flow<T>>.flattenConcat(): Flow<T> = flow {
collect { value ->
value.collect { innerValue ->
emit(innerValue)
}
}
collect { value -> emitAll(value) }
}

/**
Expand Down Expand Up @@ -137,9 +133,7 @@ public fun <T, R> Flow<T>.switchMap(transform: suspend (value: T) -> Flow<R>): F
previousFlow?.cancelAndJoin()
// Undispatched to have better user experience in case of synchronous flows
previousFlow = launch(start = CoroutineStart.UNDISPATCHED) {
transform(value).collect { innerValue ->
emit(innerValue)
}
emitAll(transform(value))
}
}
}
Expand Down
43 changes: 43 additions & 0 deletions kotlinx-coroutines-core/common/src/flow/operators/Transform.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@

@file:JvmMultifileClass
@file:JvmName("FlowKt")
@file:Suppress("UNCHECKED_CAST")

package kotlinx.coroutines.flow

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.internal.NullSurrogate
import kotlin.jvm.*
import kotlinx.coroutines.flow.unsafeFlow as flow

Expand Down Expand Up @@ -97,3 +99,44 @@ public fun <T> Flow<T>.onEach(action: suspend (T) -> Unit): Flow<T> = flow {
emit(value)
}
}

/**
* Reduces the given flow with [operation], emitting every intermediate result, including initial value.
* The first element is takes as initial value for operation accumulator.
* For example:
* ```
* flowOf(1, 2, 3, 4).scan { (v1, v2) -> v1 + v2 }.toList()
* ```
* will produce `[1, 3, 6, 10]`
*/
@FlowPreview
public fun <T> Flow<T>.scan(operation: suspend (accumulator: T, value: T) -> T): Flow<T> = flow {
var accumulator: Any? = NullSurrogate
collect { value ->
accumulator = if (accumulator === NullSurrogate) {
value
} else {
operation(accumulator as T, value)
}
emit(accumulator as T)
}
}

/**
* Reduces the given flow with [operation], emitting every intermediate result, including initial value.
* An initial value is provided lazily by [initialSupplier] and is always immediately emitted.
* For example:
* ```
* flowOf(1, 2, 3).scan(::emptyList) { acc: List<Int>, value -> acc + value }.toList()
* ```
* will produce `[], [1], [1, 2], [1, 2, 3]]`.
*/
@FlowPreview
public fun <T, R> Flow<T>.scan(initialSupplier: () -> R, @BuilderInference operation: suspend (accumulator: R, value: T) -> R): Flow<R> = flow {
var accumulator: R = initialSupplier()
emit(accumulator)
collect { value ->
accumulator = operation(accumulator, value)
emit(accumulator)
}
}
6 changes: 6 additions & 0 deletions kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,9 @@ public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value
collect(object : FlowCollector<T> {
override suspend fun emit(value: T) = action(value)
})

/**
* Collects all the values from the given [flow] and emits them to the collector.
* Shortcut for `flow.collect { value -> emit(value) }`.
*/
public suspend inline fun <T> FlowCollector<T>.emitAll(flow: Flow<T>) = flow.collect { value -> emit(value) }
68 changes: 68 additions & 0 deletions kotlinx-coroutines-core/common/test/flow/operators/ScanTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.flow

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlin.test.*

class ScanTest : TestBase() {
@Test
fun testScan() = runTest {
val flow = flowOf(1, 2, 3, 4, 5)
val result = flow.scan { acc, v -> acc + v }.toList()
assertEquals(listOf(1, 3, 6, 10, 15), result)
}

@Test
fun testScanWithInitial() = runTest {
val flow = flowOf(1, 2, 3)
val result = flow.scan(::emptyList) { acc: List<Int>, value -> acc + value }.toList()
assertEquals(listOf(emptyList(), listOf(1), listOf(1, 2), listOf(1, 2, 3)), result)
}

@Test
fun testNulls() = runTest {
val flow = flowOf(null, 2, null, null, null, 5)
val result = flow.scan { acc, v -> if (v == null) acc else (if (acc == null) v else acc + v) }.toList()
assertEquals(listOf(null, 2, 2, 2, 2, 7), result)
}

@Test
fun testEmptyFlow() = runTest {
val result = emptyFlow<Int>().scan { _, _ -> 1 }.toList()
assertTrue(result.isEmpty())
}

@Test
fun testErrorCancelsUpstream() = runTest {
expect(1)
val latch = Channel<Unit>()
val flow = flow {
coroutineScope {
launch {
latch.send(Unit)
hang { expect(3) }
}
emit(1)
emit(2)
}
}.scan { _, value ->
expect(value) // 2
latch.receive()
throw TestException()
}.onErrorCollect(emptyFlow())

assertEquals(1, flow.single())
finish(4)
}

public operator fun <T> Collection<T>.plus(element: T): List<T> {
val result = ArrayList<T>(size + 1)
result.addAll(this)
result.add(element)
return result
}
}

0 comments on commit 33befbf

Please sign in to comment.