diff --git a/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/nearby/modules/calculationtracker/DefaultCalculationTracker.kt b/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/nearby/modules/calculationtracker/DefaultCalculationTracker.kt index 97e1eb95d6a6ec2da54069f6983b776513953016..c94eea81e2e5696c44fc8e76d80e837d02151c8d 100644 --- a/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/nearby/modules/calculationtracker/DefaultCalculationTracker.kt +++ b/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/nearby/modules/calculationtracker/DefaultCalculationTracker.kt @@ -44,16 +44,13 @@ class DefaultCalculationTracker @Inject constructor( flow<Unit> { while (true) { hd.updateSafely { - Timber.v("Running timeout check on: %s", values) - val timeNow = timeStamper.nowUTC - Timber.v("Time now: %s", timeNow) + Timber.v("Running timeout check (now=%s): %s", timeNow, values) mutate { values.filter { it.isCalculating }.toList().forEach { if (timeNow.isAfter(it.startedAt.plus(TIMEOUT_LIMIT))) { Timber.w("Calculation timeout on %s", it) - remove(it.identifier) this[it.identifier] = it.copy( finishedAt = timeStamper.nowUTC, result = Result.TIMEOUT diff --git a/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/util/flow/HotDataFlow.kt b/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/util/flow/HotDataFlow.kt index 217a31bf47263979e04959a712343ecd40e1b7f7..359c2e948ed6215083176ba02a1635aa7bbdf50c 100644 --- a/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/util/flow/HotDataFlow.kt +++ b/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/util/flow/HotDataFlow.kt @@ -9,6 +9,7 @@ import kotlinx.coroutines.flow.SharingStarted import kotlinx.coroutines.flow.catch import kotlinx.coroutines.flow.channelFlow import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.distinctUntilChanged import kotlinx.coroutines.flow.mapNotNull import kotlinx.coroutines.flow.onCompletion import kotlinx.coroutines.flow.onStart @@ -51,6 +52,7 @@ class HotDataFlow<T : Any>( } val data: Flow<T> = internalFlow + .distinctUntilChanged() .onStart { Timber.tag(tag).v("internal onStart") } .catch { Timber.tag(tag).e(it, "internal Error") diff --git a/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/util/flow/HotDataFlowTest.kt b/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/util/flow/HotDataFlowTest.kt index f50fa2763509c8ee289df064541a909f9b1539d7..0a5702aa451a11d0064f51a1d9f97fa5e54ab376 100644 --- a/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/util/flow/HotDataFlowTest.kt +++ b/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/util/flow/HotDataFlowTest.kt @@ -122,6 +122,32 @@ class HotDataFlowTest : BaseTest() { coVerify(exactly = 1) { valueProvider.invoke(any()) } } + @Test + fun `only emit new values if they actually changed updates`() { + val testScope = TestCoroutineScope() + + val hotData = HotDataFlow( + loggingTag = "tag", + scope = testScope, + startValueProvider = { "1" }, + sharingBehavior = SharingStarted.Lazily + ) + + val testCollector = hotData.data.test(startOnScope = testScope) + testCollector.silent = true + + hotData.updateSafely { "1" } + hotData.updateSafely { "2" } + hotData.updateSafely { "2" } + hotData.updateSafely { "1" } + + + runBlocking { + testCollector.await { list, l -> list.size == 3 } + testCollector.latestValues shouldBe listOf("1", "2", "1") + } + } + @Test fun `multiple subscribers share the flow`() { val testScope = TestCoroutineScope()