Skip to content
Snippets Groups Projects
Unverified Commit 336f4ef0 authored by Matthias Urhahn's avatar Matthias Urhahn Committed by GitHub
Browse files

Improve logging and data emission (DEV) (#1489)


* Cleaner log and remove superfluos removal, if we replace the entry anyways, we don't have to remove it.

* Only propagate changes if the data actually changed.

Co-authored-by: default avatarharambasicluka <64483219+harambasicluka@users.noreply.github.com>
parent 438868c5
No related branches found
No related tags found
No related merge requests found
...@@ -44,16 +44,13 @@ class DefaultCalculationTracker @Inject constructor( ...@@ -44,16 +44,13 @@ class DefaultCalculationTracker @Inject constructor(
flow<Unit> { flow<Unit> {
while (true) { while (true) {
hd.updateSafely { hd.updateSafely {
Timber.v("Running timeout check on: %s", values)
val timeNow = timeStamper.nowUTC val timeNow = timeStamper.nowUTC
Timber.v("Time now: %s", timeNow) Timber.v("Running timeout check (now=%s): %s", timeNow, values)
mutate { mutate {
values.filter { it.isCalculating }.toList().forEach { values.filter { it.isCalculating }.toList().forEach {
if (timeNow.isAfter(it.startedAt.plus(TIMEOUT_LIMIT))) { if (timeNow.isAfter(it.startedAt.plus(TIMEOUT_LIMIT))) {
Timber.w("Calculation timeout on %s", it) Timber.w("Calculation timeout on %s", it)
remove(it.identifier)
this[it.identifier] = it.copy( this[it.identifier] = it.copy(
finishedAt = timeStamper.nowUTC, finishedAt = timeStamper.nowUTC,
result = Result.TIMEOUT result = Result.TIMEOUT
......
...@@ -9,6 +9,7 @@ import kotlinx.coroutines.flow.SharingStarted ...@@ -9,6 +9,7 @@ import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.catch import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.channelFlow import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.mapNotNull import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.flow.onCompletion import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onStart import kotlinx.coroutines.flow.onStart
...@@ -51,6 +52,7 @@ class HotDataFlow<T : Any>( ...@@ -51,6 +52,7 @@ class HotDataFlow<T : Any>(
} }
val data: Flow<T> = internalFlow val data: Flow<T> = internalFlow
.distinctUntilChanged()
.onStart { Timber.tag(tag).v("internal onStart") } .onStart { Timber.tag(tag).v("internal onStart") }
.catch { .catch {
Timber.tag(tag).e(it, "internal Error") Timber.tag(tag).e(it, "internal Error")
......
...@@ -122,6 +122,32 @@ class HotDataFlowTest : BaseTest() { ...@@ -122,6 +122,32 @@ class HotDataFlowTest : BaseTest() {
coVerify(exactly = 1) { valueProvider.invoke(any()) } coVerify(exactly = 1) { valueProvider.invoke(any()) }
} }
@Test
fun `only emit new values if they actually changed updates`() {
val testScope = TestCoroutineScope()
val hotData = HotDataFlow(
loggingTag = "tag",
scope = testScope,
startValueProvider = { "1" },
sharingBehavior = SharingStarted.Lazily
)
val testCollector = hotData.data.test(startOnScope = testScope)
testCollector.silent = true
hotData.updateSafely { "1" }
hotData.updateSafely { "2" }
hotData.updateSafely { "2" }
hotData.updateSafely { "1" }
runBlocking {
testCollector.await { list, l -> list.size == 3 }
testCollector.latestValues shouldBe listOf("1", "2", "1")
}
}
@Test @Test
fun `multiple subscribers share the flow`() { fun `multiple subscribers share the flow`() {
val testScope = TestCoroutineScope() val testScope = TestCoroutineScope()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment