diff --git a/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/nearby/modules/detectiontracker/ExposureDetectionTrackerStorage.kt b/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/nearby/modules/detectiontracker/ExposureDetectionTrackerStorage.kt index 01e93c4b9ab724a44dc953b0466c2b6a7b366c31..c4299cafe56f85562c86d276a10d910bb5e39287 100644 --- a/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/nearby/modules/detectiontracker/ExposureDetectionTrackerStorage.kt +++ b/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/nearby/modules/detectiontracker/ExposureDetectionTrackerStorage.kt @@ -2,6 +2,8 @@ package de.rki.coronawarnapp.nearby.modules.detectiontracker import android.content.Context import com.google.gson.Gson +import de.rki.coronawarnapp.exception.ExceptionCategory +import de.rki.coronawarnapp.exception.reporting.report import de.rki.coronawarnapp.util.di.AppContext import de.rki.coronawarnapp.util.serialization.BaseGson import de.rki.coronawarnapp.util.serialization.fromJson @@ -44,11 +46,13 @@ class ExposureDetectionTrackerStorage @Inject constructor( if (!storageFile.exists()) return@withLock emptyMap() gson.fromJson<Map<String, TrackedExposureDetection>>(storageFile).also { + require(it.size >= 0) Timber.v("Loaded detection data: %s", it) lastCalcuationData = it } } catch (e: Exception) { Timber.e(e, "Failed to load tracked detections.") + if (storageFile.delete()) Timber.w("Storage file was deleted.") emptyMap() } } @@ -63,6 +67,7 @@ class ExposureDetectionTrackerStorage @Inject constructor( gson.toJson(data, storageFile) } catch (e: Exception) { Timber.e(e, "Failed to save tracked detections.") + e.report(ExceptionCategory.INTERNAL) } } } diff --git a/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/util/flow/HotDataFlow.kt b/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/util/flow/HotDataFlow.kt index 91086980e2bd33336558e889a15272962e37bd83..1fe9db2d964a52c0bc216428d4a5e6fe1ec415c4 100644 --- a/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/util/flow/HotDataFlow.kt +++ b/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/util/flow/HotDataFlow.kt @@ -15,6 +15,8 @@ import kotlinx.coroutines.flow.onCompletion import kotlinx.coroutines.flow.onStart import kotlinx.coroutines.flow.shareIn import kotlinx.coroutines.plus +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock import timber.log.Timber import kotlin.coroutines.CoroutineContext @@ -37,20 +39,32 @@ class HotDataFlow<T : Any>( extraBufferCapacity = Int.MAX_VALUE, onBufferOverflow = BufferOverflow.SUSPEND ) + private val valueGuard = Mutex() private val internalProducer: Flow<Holder<T>> = channelFlow { - var currentValue = startValueProvider().also { - Timber.tag(tag).v("startValue=%s", it) - val updatedBy: suspend T.() -> T = { it } - send(Holder.Data(value = it, updatedBy = updatedBy)) + var currentValue = valueGuard.withLock { + startValueProvider().also { + Timber.tag(tag).v("startValue=%s", it) + val updatedBy: suspend T.() -> T = { it } + send(Holder.Data(value = it, updatedBy = updatedBy)) + } } + Timber.tag(tag).v("startValue=%s", currentValue) - updateActions.collect { updateAction -> - currentValue = updateAction(currentValue).also { - currentValue = it - send(Holder.Data(value = it, updatedBy = updateAction)) + updateActions + .onCompletion { + Timber.tag(tag).v("updateActions onCompletion -> resetReplayCache()") + updateActions.resetReplayCache() } - } + .collect { updateAction -> + currentValue = valueGuard.withLock { + updateAction(currentValue).also { + send(Holder.Data(value = it, updatedBy = updateAction)) + } + } + } + + Timber.tag(tag).v("internal channelFlow finished.") } private val internalFlow = internalProducer @@ -65,7 +79,10 @@ class HotDataFlow<T : Any>( throw it } } - .onCompletion { Timber.tag(tag).v("Internal onCompletion") } + .onCompletion { err -> + if (err != null) Timber.tag(tag).w(err, "internal onCompletion due to error") + else Timber.tag(tag).v("internal onCompletion") + } .shareIn( scope = scope + coroutineContext, replay = 1, diff --git a/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/util/serialization/GsonExtensions.kt b/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/util/serialization/GsonExtensions.kt index 601f833c3ded2777122e62d6814e07edff3380a3..f014dc54c28ca112b2a49db88917add6c83fedca 100644 --- a/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/util/serialization/GsonExtensions.kt +++ b/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/util/serialization/GsonExtensions.kt @@ -11,12 +11,13 @@ inline fun <reified T> Gson.fromJson(json: String): T = fromJson( object : TypeToken<T>() {}.type ) -inline fun <reified T> Gson.fromJson(file: File): T = file.reader().use { +inline fun <reified T> Gson.fromJson(file: File): T = file.bufferedReader().use { fromJson(it, object : TypeToken<T>() {}.type) } -inline fun <reified T> Gson.toJson(data: T, file: File) = file.writer().use { writer -> +inline fun <reified T> Gson.toJson(data: T, file: File) = file.bufferedWriter().use { writer -> toJson(data, writer) + writer.flush() } fun <T : Any> KClass<T>.getDefaultGsonTypeAdapter(): TypeAdapter<T> = Gson().getAdapter(this.java) diff --git a/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/nearby/modules/detectiontracker/ExposureDetectionTrackerStorageTest.kt b/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/nearby/modules/detectiontracker/ExposureDetectionTrackerStorageTest.kt index fee9f63fa4177ddbb0fc6819d6788fe67535c514..702291ceaeaacdea384fe8f930a56caec0590ede 100644 --- a/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/nearby/modules/detectiontracker/ExposureDetectionTrackerStorageTest.kt +++ b/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/nearby/modules/detectiontracker/ExposureDetectionTrackerStorageTest.kt @@ -130,4 +130,20 @@ class ExposureDetectionTrackerStorageTest : BaseIOTest() { storedData.getValue("b2b98400-058d-43e6-b952-529a5255248b").isCalculating shouldBe true storedData.getValue("aeb15509-fb34-42ce-8795-7a9ae0c2f389").isCalculating shouldBe false } + + @Test + fun `we catch empty json data and prevent unsafely initialized maps`() = runBlockingTest { + storageDir.mkdirs() + storageFile.writeText("") + + storageFile.exists() shouldBe true + + createStorage().apply { + val value = load() + value.size shouldBe 0 + value shouldBe emptyMap() + + storageFile.exists() shouldBe false + } + } } diff --git a/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/util/flow/HotDataFlowTest.kt b/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/util/flow/HotDataFlowTest.kt index 3de7010f7bf7105aa25a8738dbcd6772f0c140b4..e0887ab672ef278b57116e133611cb128f10233d 100644 --- a/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/util/flow/HotDataFlowTest.kt +++ b/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/util/flow/HotDataFlowTest.kt @@ -1,6 +1,7 @@ package de.rki.coronawarnapp.util.flow import io.kotest.assertions.throwables.shouldThrow +import de.rki.coronawarnapp.util.mutate import io.kotest.matchers.shouldBe import io.kotest.matchers.types.instanceOf import io.mockk.coEvery @@ -150,6 +151,48 @@ class HotDataFlowTest : BaseTest() { coVerify(exactly = 1) { valueProvider.invoke(any()) } } + data class TestData( + val number: Long = 1 + ) + + @Test + fun `check multi threading value updates with more complex data`() { + val testScope = TestCoroutineScope() + val valueProvider = mockk<suspend CoroutineScope.() -> Map<String, TestData>>() + coEvery { valueProvider.invoke(any()) } returns mapOf("data" to TestData()) + + val hotData = HotDataFlow( + loggingTag = "tag", + scope = testScope, + startValueProvider = valueProvider, + sharingBehavior = SharingStarted.Lazily + ) + + val testCollector = hotData.data.test(startOnScope = testScope) + testCollector.silent = true + + (1..10).forEach { _ -> + thread { + (1..400).forEach { _ -> + hotData.updateSafely { + mutate { + this["data"] = getValue("data").copy( + number = getValue("data").number + 1 + ) + } + } + } + } + } + + runBlocking { + testCollector.await { list, l -> list.size == 4001 } + testCollector.latestValues.map { it.values.single().number } shouldBe (1L..4001L).toList() + } + + coVerify(exactly = 1) { valueProvider.invoke(any()) } + } + @Test fun `only emit new values if they actually changed updates`() { val testScope = TestCoroutineScope() @@ -189,9 +232,9 @@ class HotDataFlowTest : BaseTest() { ) testScope.runBlockingTest2(permanentJobs = true) { - val sub1 = hotData.data.test().start(scope = this) - val sub2 = hotData.data.test().start(scope = this) - val sub3 = hotData.data.test().start(scope = this) + val sub1 = hotData.data.test(tag = "sub1", startOnScope = this) + val sub2 = hotData.data.test(tag = "sub2", startOnScope = this) + val sub3 = hotData.data.test(tag = "sub3", startOnScope = this) hotData.updateSafely { "A" } hotData.updateSafely { "B" } @@ -208,6 +251,49 @@ class HotDataFlowTest : BaseTest() { coVerify(exactly = 1) { valueProvider.invoke(any()) } } + @Test + fun `update queue is wiped on completion`() = runBlockingTest2(permanentJobs = true) { + val valueProvider = mockk<suspend CoroutineScope.() -> Long>() + coEvery { valueProvider.invoke(any()) } returns 1 + + val hotData = HotDataFlow( + loggingTag = "tag", + scope = this, + coroutineContext = this.coroutineContext, + startValueProvider = valueProvider, + sharingBehavior = SharingStarted.WhileSubscribed(replayExpirationMillis = 0) + ) + + val testCollector1 = hotData.data.test(tag = "collector1", startOnScope = this) + testCollector1.silent = false + + (1..10).forEach { _ -> + hotData.updateSafely { + this + 1L + } + } + + advanceUntilIdle() + + testCollector1.await { list, _ -> list.size == 11 } + testCollector1.latestValues shouldBe (1L..11L).toList() + + testCollector1.cancel() + testCollector1.awaitFinal() + + val testCollector2 = hotData.data.test(tag = "collector2", startOnScope = this) + testCollector2.silent = false + + advanceUntilIdle() + + testCollector2.cancel() + testCollector2.awaitFinal() + + testCollector2.latestValues shouldBe listOf(1L) + + coVerify(exactly = 2) { valueProvider.invoke(any()) } + } + @Test fun `blocking update is actually blocking`() = runBlocking { val testScope = TestCoroutineScope() diff --git a/Corona-Warn-App/src/test/java/testhelpers/coroutines/FlowTest.kt b/Corona-Warn-App/src/test/java/testhelpers/coroutines/FlowTest.kt index 3fc56ce9a503dcd65c2afb6f91e657557776c2d8..5a19fa308a2f5ec037373528c26e0ea099810aa1 100644 --- a/Corona-Warn-App/src/test/java/testhelpers/coroutines/FlowTest.kt +++ b/Corona-Warn-App/src/test/java/testhelpers/coroutines/FlowTest.kt @@ -16,14 +16,17 @@ import kotlinx.coroutines.flow.onStart import kotlinx.coroutines.runBlocking import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock +import kotlinx.coroutines.test.TestCoroutineScope +import kotlinx.coroutines.withTimeout +import org.joda.time.Duration import timber.log.Timber fun <T> Flow<T>.test( tag: String? = null, - startOnScope: CoroutineScope -): TestCollector<T> = test(tag ?: "FlowTest").start(scope = startOnScope) + startOnScope: CoroutineScope = TestCoroutineScope() +): TestCollector<T> = createTest(tag ?: "FlowTest").start(scope = startOnScope) -fun <T> Flow<T>.test( +fun <T> Flow<T>.createTest( tag: String? = null ): TestCollector<T> = TestCollector(this, tag ?: "FlowTest") @@ -74,9 +77,14 @@ class TestCollector<T>( val latestValues: List<T> get() = collectedValues - fun await(condition: (List<T>, T) -> Boolean): T = runBlocking { - emissions().first { - condition(collectedValues, it) + fun await( + timeout: Duration = Duration.standardSeconds(10), + condition: (List<T>, T) -> Boolean + ): T = runBlocking { + withTimeout(timeMillis = timeout.millis) { + emissions().first { + condition(collectedValues, it) + } } } @@ -95,6 +103,8 @@ class TestCollector<T>( } fun cancel() { + if (job.isCompleted) throw IllegalStateException("Flow is already canceled.") + runBlocking { job.cancelAndJoin() }