From 336f4ef09a9b2b00644912523cb891f381c7c3ec Mon Sep 17 00:00:00 2001 From: Matthias Urhahn <matthias.urhahn@sap.com> Date: Wed, 28 Oct 2020 09:56:59 +0100 Subject: [PATCH] Improve logging and data emission (DEV) (#1489) * Cleaner log and remove superfluos removal, if we replace the entry anyways, we don't have to remove it. * Only propagate changes if the data actually changed. Co-authored-by: harambasicluka <64483219+harambasicluka@users.noreply.github.com> --- .../DefaultCalculationTracker.kt | 5 +--- .../coronawarnapp/util/flow/HotDataFlow.kt | 2 ++ .../util/flow/HotDataFlowTest.kt | 26 +++++++++++++++++++ 3 files changed, 29 insertions(+), 4 deletions(-) diff --git a/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/nearby/modules/calculationtracker/DefaultCalculationTracker.kt b/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/nearby/modules/calculationtracker/DefaultCalculationTracker.kt index 97e1eb95d..c94eea81e 100644 --- a/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/nearby/modules/calculationtracker/DefaultCalculationTracker.kt +++ b/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/nearby/modules/calculationtracker/DefaultCalculationTracker.kt @@ -44,16 +44,13 @@ class DefaultCalculationTracker @Inject constructor( flow<Unit> { while (true) { hd.updateSafely { - Timber.v("Running timeout check on: %s", values) - val timeNow = timeStamper.nowUTC - Timber.v("Time now: %s", timeNow) + Timber.v("Running timeout check (now=%s): %s", timeNow, values) mutate { values.filter { it.isCalculating }.toList().forEach { if (timeNow.isAfter(it.startedAt.plus(TIMEOUT_LIMIT))) { Timber.w("Calculation timeout on %s", it) - remove(it.identifier) this[it.identifier] = it.copy( finishedAt = timeStamper.nowUTC, result = Result.TIMEOUT diff --git a/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/util/flow/HotDataFlow.kt b/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/util/flow/HotDataFlow.kt index 217a31bf4..359c2e948 100644 --- a/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/util/flow/HotDataFlow.kt +++ b/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/util/flow/HotDataFlow.kt @@ -9,6 +9,7 @@ import kotlinx.coroutines.flow.SharingStarted import kotlinx.coroutines.flow.catch import kotlinx.coroutines.flow.channelFlow import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.distinctUntilChanged import kotlinx.coroutines.flow.mapNotNull import kotlinx.coroutines.flow.onCompletion import kotlinx.coroutines.flow.onStart @@ -51,6 +52,7 @@ class HotDataFlow<T : Any>( } val data: Flow<T> = internalFlow + .distinctUntilChanged() .onStart { Timber.tag(tag).v("internal onStart") } .catch { Timber.tag(tag).e(it, "internal Error") diff --git a/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/util/flow/HotDataFlowTest.kt b/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/util/flow/HotDataFlowTest.kt index f50fa2763..0a5702aa4 100644 --- a/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/util/flow/HotDataFlowTest.kt +++ b/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/util/flow/HotDataFlowTest.kt @@ -122,6 +122,32 @@ class HotDataFlowTest : BaseTest() { coVerify(exactly = 1) { valueProvider.invoke(any()) } } + @Test + fun `only emit new values if they actually changed updates`() { + val testScope = TestCoroutineScope() + + val hotData = HotDataFlow( + loggingTag = "tag", + scope = testScope, + startValueProvider = { "1" }, + sharingBehavior = SharingStarted.Lazily + ) + + val testCollector = hotData.data.test(startOnScope = testScope) + testCollector.silent = true + + hotData.updateSafely { "1" } + hotData.updateSafely { "2" } + hotData.updateSafely { "2" } + hotData.updateSafely { "1" } + + + runBlocking { + testCollector.await { list, l -> list.size == 3 } + testCollector.latestValues shouldBe listOf("1", "2", "1") + } + } + @Test fun `multiple subscribers share the flow`() { val testScope = TestCoroutineScope() -- GitLab