diff --git a/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/util/flow/HotDataFlow.kt b/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/util/flow/HotDataFlow.kt index 359c2e948ed6215083176ba02a1635aa7bbdf50c..a52a476a6c9a10d6cd420d5cf0705e56295c8fe9 100644 --- a/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/util/flow/HotDataFlow.kt +++ b/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/util/flow/HotDataFlow.kt @@ -6,7 +6,6 @@ import kotlinx.coroutines.channels.BufferOverflow import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.SharingStarted -import kotlinx.coroutines.flow.catch import kotlinx.coroutines.flow.channelFlow import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.distinctUntilChanged @@ -15,6 +14,8 @@ import kotlinx.coroutines.flow.onCompletion import kotlinx.coroutines.flow.onStart import kotlinx.coroutines.flow.shareIn import kotlinx.coroutines.plus +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock import timber.log.Timber import kotlin.coroutines.CoroutineContext @@ -36,29 +37,35 @@ class HotDataFlow<T : Any>( extraBufferCapacity = Int.MAX_VALUE, onBufferOverflow = BufferOverflow.SUSPEND ) + private val valueGuard = Mutex() private val internalFlow = channelFlow { - var currentValue = startValueProvider().also { - Timber.tag(tag).v("startValue=%s", it) - send(it) + var currentValue = valueGuard.withLock { + startValueProvider().also { send(it) } } + Timber.tag(tag).v("startValue=%s", currentValue) - updateActions.collect { updateAction -> - currentValue = updateAction(currentValue).also { - currentValue = it - send(it) + updateActions + .onCompletion { + Timber.tag(tag).v("updateActions onCompletion -> resetReplayCache()") + updateActions.resetReplayCache() } - } + .collect { updateAction -> + currentValue = valueGuard.withLock { + updateAction(currentValue).also { send(it) } + } + } + + Timber.tag(tag).v("internal channelFlow finished.") } val data: Flow<T> = internalFlow .distinctUntilChanged() .onStart { Timber.tag(tag).v("internal onStart") } - .catch { - Timber.tag(tag).e(it, "internal Error") - throw it + .onCompletion { err -> + if (err != null) Timber.tag(tag).w(err, "internal onCompletion due to error") + else Timber.tag(tag).v("internal onCompletion") } - .onCompletion { Timber.tag(tag).v("internal onCompletion") } .shareIn( scope = scope + coroutineContext, replay = 1, diff --git a/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/util/flow/HotDataFlowTest.kt b/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/util/flow/HotDataFlowTest.kt index 0a5702aa451a11d0064f51a1d9f97fa5e54ab376..0d6642641c839b32e302ee81304897a82ec3cdb0 100644 --- a/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/util/flow/HotDataFlowTest.kt +++ b/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/util/flow/HotDataFlowTest.kt @@ -1,5 +1,6 @@ package de.rki.coronawarnapp.util.flow +import de.rki.coronawarnapp.util.mutate import io.kotest.matchers.shouldBe import io.kotest.matchers.types.instanceOf import io.mockk.coEvery @@ -122,6 +123,48 @@ class HotDataFlowTest : BaseTest() { coVerify(exactly = 1) { valueProvider.invoke(any()) } } + data class TestData( + val number: Long = 1 + ) + + @Test + fun `check multi threading value updates with more complex data`() { + val testScope = TestCoroutineScope() + val valueProvider = mockk<suspend CoroutineScope.() -> Map<String, TestData>>() + coEvery { valueProvider.invoke(any()) } returns mapOf("data" to TestData()) + + val hotData = HotDataFlow( + loggingTag = "tag", + scope = testScope, + startValueProvider = valueProvider, + sharingBehavior = SharingStarted.Lazily + ) + + val testCollector = hotData.data.test(startOnScope = testScope) + testCollector.silent = true + + (1..10).forEach { _ -> + thread { + (1..400).forEach { _ -> + hotData.updateSafely { + mutate { + this["data"] = getValue("data").copy( + number = getValue("data").number + 1 + ) + } + } + } + } + } + + runBlocking { + testCollector.await { list, l -> list.size == 4001 } + testCollector.latestValues.map { it.values.single().number } shouldBe (1L..4001L).toList() + } + + coVerify(exactly = 1) { valueProvider.invoke(any()) } + } + @Test fun `only emit new values if they actually changed updates`() { val testScope = TestCoroutineScope() @@ -163,9 +206,9 @@ class HotDataFlowTest : BaseTest() { testScope.runBlockingTest2(permanentJobs = true) { - val sub1 = hotData.data.test().start(scope = this) - val sub2 = hotData.data.test().start(scope = this) - val sub3 = hotData.data.test().start(scope = this) + val sub1 = hotData.data.test(tag = "sub1", startOnScope = this) + val sub2 = hotData.data.test(tag = "sub2", startOnScope = this) + val sub3 = hotData.data.test(tag = "sub3", startOnScope = this) hotData.updateSafely { "A" } hotData.updateSafely { "B" } @@ -181,4 +224,47 @@ class HotDataFlowTest : BaseTest() { } coVerify(exactly = 1) { valueProvider.invoke(any()) } } + + @Test + fun `update queue is wiped on completion`() = runBlockingTest2(permanentJobs = true) { + val valueProvider = mockk<suspend CoroutineScope.() -> Long>() + coEvery { valueProvider.invoke(any()) } returns 1 + + val hotData = HotDataFlow( + loggingTag = "tag", + scope = this, + coroutineContext = this.coroutineContext, + startValueProvider = valueProvider, + sharingBehavior = SharingStarted.WhileSubscribed(replayExpirationMillis = 0) + ) + + val testCollector1 = hotData.data.test(tag = "collector1", startOnScope = this) + testCollector1.silent = false + + (1..10).forEach { _ -> + hotData.updateSafely { + this + 1L + } + } + + advanceUntilIdle() + + testCollector1.await { list, _ -> list.size == 11 } + testCollector1.latestValues shouldBe (1L..11L).toList() + + testCollector1.cancel() + testCollector1.awaitFinal() + + val testCollector2 = hotData.data.test(tag = "collector2", startOnScope = this) + testCollector2.silent = false + + advanceUntilIdle() + + testCollector2.cancel() + testCollector2.awaitFinal() + + testCollector2.latestValues shouldBe listOf(1L) + + coVerify(exactly = 2) { valueProvider.invoke(any()) } + } } diff --git a/Corona-Warn-App/src/test/java/testhelpers/coroutines/FlowTest.kt b/Corona-Warn-App/src/test/java/testhelpers/coroutines/FlowTest.kt index 975a8c48dd522f417915c601cca231fea22e0bf2..d874d9ea36af77611d62841ad35a3bfc5fd308e7 100644 --- a/Corona-Warn-App/src/test/java/testhelpers/coroutines/FlowTest.kt +++ b/Corona-Warn-App/src/test/java/testhelpers/coroutines/FlowTest.kt @@ -16,14 +16,17 @@ import kotlinx.coroutines.flow.onStart import kotlinx.coroutines.runBlocking import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock +import kotlinx.coroutines.test.TestCoroutineScope +import kotlinx.coroutines.withTimeout +import org.joda.time.Duration import timber.log.Timber fun <T> Flow<T>.test( tag: String? = null, - startOnScope: CoroutineScope -): TestCollector<T> = test(tag ?: "FlowTest").start(scope = startOnScope) + startOnScope: CoroutineScope = TestCoroutineScope() +): TestCollector<T> = createTest(tag ?: "FlowTest").start(scope = startOnScope) -fun <T> Flow<T>.test( +fun <T> Flow<T>.createTest( tag: String? = null ): TestCollector<T> = TestCollector(this, tag ?: "FlowTest") @@ -74,9 +77,14 @@ class TestCollector<T>( val latestValues: List<T> get() = collectedValues - fun await(condition: (List<T>, T) -> Boolean): T = runBlocking { - emissions().first { - condition(collectedValues, it) + fun await( + timeout: Duration = Duration.standardSeconds(10), + condition: (List<T>, T) -> Boolean + ): T = runBlocking { + withTimeout(timeMillis = timeout.millis) { + emissions().first { + condition(collectedValues, it) + } } } @@ -94,6 +102,8 @@ class TestCollector<T>( } fun cancel() { + if (job.isCompleted) throw IllegalStateException("Flow is already canceled.") + runBlocking { job.cancelAndJoin() }