Skip to content
Snippets Groups Projects
Unverified Commit 0f866e50 authored by Matthias Urhahn's avatar Matthias Urhahn Committed by GitHub
Browse files

Improve HotDataFlow behavior (EXPOSUREAPP-3777) (#1612)

* Fix HotDataFlow behavior.
* Prevent re-execution of past submissions when the observable goes cold.
* Guard internal value updates with a mutex in case the value update is not as thread-safe as we think it is.

* Fix problematic testcases.
parent a6335e7f
No related branches found
No related tags found
No related merge requests found
...@@ -6,7 +6,6 @@ import kotlinx.coroutines.channels.BufferOverflow ...@@ -6,7 +6,6 @@ import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharingStarted import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.channelFlow import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.distinctUntilChanged import kotlinx.coroutines.flow.distinctUntilChanged
...@@ -15,6 +14,8 @@ import kotlinx.coroutines.flow.onCompletion ...@@ -15,6 +14,8 @@ import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onStart import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.shareIn import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.plus import kotlinx.coroutines.plus
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import timber.log.Timber import timber.log.Timber
import kotlin.coroutines.CoroutineContext import kotlin.coroutines.CoroutineContext
...@@ -36,29 +37,35 @@ class HotDataFlow<T : Any>( ...@@ -36,29 +37,35 @@ class HotDataFlow<T : Any>(
extraBufferCapacity = Int.MAX_VALUE, extraBufferCapacity = Int.MAX_VALUE,
onBufferOverflow = BufferOverflow.SUSPEND onBufferOverflow = BufferOverflow.SUSPEND
) )
private val valueGuard = Mutex()
private val internalFlow = channelFlow { private val internalFlow = channelFlow {
var currentValue = startValueProvider().also { var currentValue = valueGuard.withLock {
Timber.tag(tag).v("startValue=%s", it) startValueProvider().also { send(it) }
send(it)
} }
Timber.tag(tag).v("startValue=%s", currentValue)
updateActions.collect { updateAction -> updateActions
currentValue = updateAction(currentValue).also { .onCompletion {
currentValue = it Timber.tag(tag).v("updateActions onCompletion -> resetReplayCache()")
send(it) updateActions.resetReplayCache()
} }
} .collect { updateAction ->
currentValue = valueGuard.withLock {
updateAction(currentValue).also { send(it) }
}
}
Timber.tag(tag).v("internal channelFlow finished.")
} }
val data: Flow<T> = internalFlow val data: Flow<T> = internalFlow
.distinctUntilChanged() .distinctUntilChanged()
.onStart { Timber.tag(tag).v("internal onStart") } .onStart { Timber.tag(tag).v("internal onStart") }
.catch { .onCompletion { err ->
Timber.tag(tag).e(it, "internal Error") if (err != null) Timber.tag(tag).w(err, "internal onCompletion due to error")
throw it else Timber.tag(tag).v("internal onCompletion")
} }
.onCompletion { Timber.tag(tag).v("internal onCompletion") }
.shareIn( .shareIn(
scope = scope + coroutineContext, scope = scope + coroutineContext,
replay = 1, replay = 1,
......
package de.rki.coronawarnapp.util.flow package de.rki.coronawarnapp.util.flow
import de.rki.coronawarnapp.util.mutate
import io.kotest.matchers.shouldBe import io.kotest.matchers.shouldBe
import io.kotest.matchers.types.instanceOf import io.kotest.matchers.types.instanceOf
import io.mockk.coEvery import io.mockk.coEvery
...@@ -122,6 +123,48 @@ class HotDataFlowTest : BaseTest() { ...@@ -122,6 +123,48 @@ class HotDataFlowTest : BaseTest() {
coVerify(exactly = 1) { valueProvider.invoke(any()) } coVerify(exactly = 1) { valueProvider.invoke(any()) }
} }
data class TestData(
val number: Long = 1
)
@Test
fun `check multi threading value updates with more complex data`() {
val testScope = TestCoroutineScope()
val valueProvider = mockk<suspend CoroutineScope.() -> Map<String, TestData>>()
coEvery { valueProvider.invoke(any()) } returns mapOf("data" to TestData())
val hotData = HotDataFlow(
loggingTag = "tag",
scope = testScope,
startValueProvider = valueProvider,
sharingBehavior = SharingStarted.Lazily
)
val testCollector = hotData.data.test(startOnScope = testScope)
testCollector.silent = true
(1..10).forEach { _ ->
thread {
(1..400).forEach { _ ->
hotData.updateSafely {
mutate {
this["data"] = getValue("data").copy(
number = getValue("data").number + 1
)
}
}
}
}
}
runBlocking {
testCollector.await { list, l -> list.size == 4001 }
testCollector.latestValues.map { it.values.single().number } shouldBe (1L..4001L).toList()
}
coVerify(exactly = 1) { valueProvider.invoke(any()) }
}
@Test @Test
fun `only emit new values if they actually changed updates`() { fun `only emit new values if they actually changed updates`() {
val testScope = TestCoroutineScope() val testScope = TestCoroutineScope()
...@@ -163,9 +206,9 @@ class HotDataFlowTest : BaseTest() { ...@@ -163,9 +206,9 @@ class HotDataFlowTest : BaseTest() {
testScope.runBlockingTest2(permanentJobs = true) { testScope.runBlockingTest2(permanentJobs = true) {
val sub1 = hotData.data.test().start(scope = this) val sub1 = hotData.data.test(tag = "sub1", startOnScope = this)
val sub2 = hotData.data.test().start(scope = this) val sub2 = hotData.data.test(tag = "sub2", startOnScope = this)
val sub3 = hotData.data.test().start(scope = this) val sub3 = hotData.data.test(tag = "sub3", startOnScope = this)
hotData.updateSafely { "A" } hotData.updateSafely { "A" }
hotData.updateSafely { "B" } hotData.updateSafely { "B" }
...@@ -181,4 +224,47 @@ class HotDataFlowTest : BaseTest() { ...@@ -181,4 +224,47 @@ class HotDataFlowTest : BaseTest() {
} }
coVerify(exactly = 1) { valueProvider.invoke(any()) } coVerify(exactly = 1) { valueProvider.invoke(any()) }
} }
@Test
fun `update queue is wiped on completion`() = runBlockingTest2(permanentJobs = true) {
val valueProvider = mockk<suspend CoroutineScope.() -> Long>()
coEvery { valueProvider.invoke(any()) } returns 1
val hotData = HotDataFlow(
loggingTag = "tag",
scope = this,
coroutineContext = this.coroutineContext,
startValueProvider = valueProvider,
sharingBehavior = SharingStarted.WhileSubscribed(replayExpirationMillis = 0)
)
val testCollector1 = hotData.data.test(tag = "collector1", startOnScope = this)
testCollector1.silent = false
(1..10).forEach { _ ->
hotData.updateSafely {
this + 1L
}
}
advanceUntilIdle()
testCollector1.await { list, _ -> list.size == 11 }
testCollector1.latestValues shouldBe (1L..11L).toList()
testCollector1.cancel()
testCollector1.awaitFinal()
val testCollector2 = hotData.data.test(tag = "collector2", startOnScope = this)
testCollector2.silent = false
advanceUntilIdle()
testCollector2.cancel()
testCollector2.awaitFinal()
testCollector2.latestValues shouldBe listOf(1L)
coVerify(exactly = 2) { valueProvider.invoke(any()) }
}
} }
...@@ -16,14 +16,17 @@ import kotlinx.coroutines.flow.onStart ...@@ -16,14 +16,17 @@ import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.runBlocking import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.test.TestCoroutineScope
import kotlinx.coroutines.withTimeout
import org.joda.time.Duration
import timber.log.Timber import timber.log.Timber
fun <T> Flow<T>.test( fun <T> Flow<T>.test(
tag: String? = null, tag: String? = null,
startOnScope: CoroutineScope startOnScope: CoroutineScope = TestCoroutineScope()
): TestCollector<T> = test(tag ?: "FlowTest").start(scope = startOnScope) ): TestCollector<T> = createTest(tag ?: "FlowTest").start(scope = startOnScope)
fun <T> Flow<T>.test( fun <T> Flow<T>.createTest(
tag: String? = null tag: String? = null
): TestCollector<T> = TestCollector(this, tag ?: "FlowTest") ): TestCollector<T> = TestCollector(this, tag ?: "FlowTest")
...@@ -74,9 +77,14 @@ class TestCollector<T>( ...@@ -74,9 +77,14 @@ class TestCollector<T>(
val latestValues: List<T> val latestValues: List<T>
get() = collectedValues get() = collectedValues
fun await(condition: (List<T>, T) -> Boolean): T = runBlocking { fun await(
emissions().first { timeout: Duration = Duration.standardSeconds(10),
condition(collectedValues, it) condition: (List<T>, T) -> Boolean
): T = runBlocking {
withTimeout(timeMillis = timeout.millis) {
emissions().first {
condition(collectedValues, it)
}
} }
} }
...@@ -94,6 +102,8 @@ class TestCollector<T>( ...@@ -94,6 +102,8 @@ class TestCollector<T>(
} }
fun cancel() { fun cancel() {
if (job.isCompleted) throw IllegalStateException("Flow is already canceled.")
runBlocking { runBlocking {
job.cancelAndJoin() job.cancelAndJoin()
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment