diff --git a/.github/workflows/gradle-wrapper-validation.yml b/.github/workflows/gradle-wrapper-validation.yml index 13dca3db33d0f66c8b8af3bd6b1eb06db666aa83..8cbe448f6ffb21cd44d01a5f534581dff4675ce2 100644 --- a/.github/workflows/gradle-wrapper-validation.yml +++ b/.github/workflows/gradle-wrapper-validation.yml @@ -3,10 +3,10 @@ name: "Validate Gradle Wrapper" on: push: branches: - - master + - main pull_request: branches: - - master + - main jobs: validation: diff --git a/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/nearby/modules/calculationtracker/CalculationTrackerStorage.kt b/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/nearby/modules/calculationtracker/CalculationTrackerStorage.kt index 1aaf4d63b9fe1806db9ec52b6c7760786fe27017..1d907fcc6c70499a2bcece6e3da4f80309ba1fa0 100644 --- a/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/nearby/modules/calculationtracker/CalculationTrackerStorage.kt +++ b/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/nearby/modules/calculationtracker/CalculationTrackerStorage.kt @@ -2,6 +2,8 @@ package de.rki.coronawarnapp.nearby.modules.calculationtracker import android.content.Context import com.google.gson.Gson +import de.rki.coronawarnapp.exception.ExceptionCategory +import de.rki.coronawarnapp.exception.reporting.report import de.rki.coronawarnapp.util.di.AppContext import de.rki.coronawarnapp.util.gson.fromJson import de.rki.coronawarnapp.util.gson.toJson @@ -36,11 +38,13 @@ class CalculationTrackerStorage @Inject constructor( if (!storageFile.exists()) return@withLock emptyMap() gson.fromJson<Map<String, Calculation>>(storageFile).also { + require(it.size >= 0) Timber.v("Loaded calculation data: %s", it) lastCalcuationData = it } } catch (e: Exception) { Timber.e(e, "Failed to load tracked calculations.") + if (storageFile.delete()) Timber.w("Storage file was deleted.") emptyMap() } } @@ -55,6 +59,7 @@ class CalculationTrackerStorage @Inject constructor( gson.toJson(data, storageFile) } catch (e: Exception) { Timber.e(e, "Failed to save tracked calculations.") + e.report(ExceptionCategory.INTERNAL) } } } diff --git a/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/util/flow/HotDataFlow.kt b/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/util/flow/HotDataFlow.kt index 359c2e948ed6215083176ba02a1635aa7bbdf50c..a52a476a6c9a10d6cd420d5cf0705e56295c8fe9 100644 --- a/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/util/flow/HotDataFlow.kt +++ b/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/util/flow/HotDataFlow.kt @@ -6,7 +6,6 @@ import kotlinx.coroutines.channels.BufferOverflow import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.SharingStarted -import kotlinx.coroutines.flow.catch import kotlinx.coroutines.flow.channelFlow import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.distinctUntilChanged @@ -15,6 +14,8 @@ import kotlinx.coroutines.flow.onCompletion import kotlinx.coroutines.flow.onStart import kotlinx.coroutines.flow.shareIn import kotlinx.coroutines.plus +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock import timber.log.Timber import kotlin.coroutines.CoroutineContext @@ -36,29 +37,35 @@ class HotDataFlow<T : Any>( extraBufferCapacity = Int.MAX_VALUE, onBufferOverflow = BufferOverflow.SUSPEND ) + private val valueGuard = Mutex() private val internalFlow = channelFlow { - var currentValue = startValueProvider().also { - Timber.tag(tag).v("startValue=%s", it) - send(it) + var currentValue = valueGuard.withLock { + startValueProvider().also { send(it) } } + Timber.tag(tag).v("startValue=%s", currentValue) - updateActions.collect { updateAction -> - currentValue = updateAction(currentValue).also { - currentValue = it - send(it) + updateActions + .onCompletion { + Timber.tag(tag).v("updateActions onCompletion -> resetReplayCache()") + updateActions.resetReplayCache() } - } + .collect { updateAction -> + currentValue = valueGuard.withLock { + updateAction(currentValue).also { send(it) } + } + } + + Timber.tag(tag).v("internal channelFlow finished.") } val data: Flow<T> = internalFlow .distinctUntilChanged() .onStart { Timber.tag(tag).v("internal onStart") } - .catch { - Timber.tag(tag).e(it, "internal Error") - throw it + .onCompletion { err -> + if (err != null) Timber.tag(tag).w(err, "internal onCompletion due to error") + else Timber.tag(tag).v("internal onCompletion") } - .onCompletion { Timber.tag(tag).v("internal onCompletion") } .shareIn( scope = scope + coroutineContext, replay = 1, diff --git a/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/util/gson/GsonExtensions.kt b/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/util/gson/GsonExtensions.kt index b79e725c5d935b80971ac86bf3ff39505eb5267a..79f055e93fc606e770ffb19e4291231b9b48d2bc 100644 --- a/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/util/gson/GsonExtensions.kt +++ b/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/util/gson/GsonExtensions.kt @@ -9,10 +9,11 @@ inline fun <reified T> Gson.fromJson(json: String): T = fromJson( object : TypeToken<T>() {}.type ) -inline fun <reified T> Gson.fromJson(file: File): T = file.reader().use { +inline fun <reified T> Gson.fromJson(file: File): T = file.bufferedReader().use { fromJson(it, object : TypeToken<T>() {}.type) } -inline fun <reified T> Gson.toJson(data: T, file: File) = file.writer().use { writer -> +inline fun <reified T> Gson.toJson(data: T, file: File) = file.bufferedWriter().use { writer -> toJson(data, writer) + writer.flush() } diff --git a/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/nearby/modules/calculationtracker/CalculationTrackerStorageTest.kt b/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/nearby/modules/calculationtracker/CalculationTrackerStorageTest.kt index 5b1be36e8bec29072f2aa32140cbc1a4a091214b..4ab6acc1f3e367cc8c4f6eb774ab80335ef93792 100644 --- a/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/nearby/modules/calculationtracker/CalculationTrackerStorageTest.kt +++ b/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/nearby/modules/calculationtracker/CalculationTrackerStorageTest.kt @@ -130,4 +130,20 @@ class CalculationTrackerStorageTest : BaseIOTest() { storedData.getValue("b2b98400-058d-43e6-b952-529a5255248b").isCalculating shouldBe true storedData.getValue("aeb15509-fb34-42ce-8795-7a9ae0c2f389").isCalculating shouldBe false } + + @Test + fun `we catch empty json data and prevent unsafely initialized maps`() = runBlockingTest { + storageDir.mkdirs() + storageFile.writeText("") + + storageFile.exists() shouldBe true + + createStorage().apply { + val value = load() + value.size shouldBe 0 + value shouldBe emptyMap() + + storageFile.exists() shouldBe false + } + } } diff --git a/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/util/flow/HotDataFlowTest.kt b/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/util/flow/HotDataFlowTest.kt index 0a5702aa451a11d0064f51a1d9f97fa5e54ab376..0d6642641c839b32e302ee81304897a82ec3cdb0 100644 --- a/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/util/flow/HotDataFlowTest.kt +++ b/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/util/flow/HotDataFlowTest.kt @@ -1,5 +1,6 @@ package de.rki.coronawarnapp.util.flow +import de.rki.coronawarnapp.util.mutate import io.kotest.matchers.shouldBe import io.kotest.matchers.types.instanceOf import io.mockk.coEvery @@ -122,6 +123,48 @@ class HotDataFlowTest : BaseTest() { coVerify(exactly = 1) { valueProvider.invoke(any()) } } + data class TestData( + val number: Long = 1 + ) + + @Test + fun `check multi threading value updates with more complex data`() { + val testScope = TestCoroutineScope() + val valueProvider = mockk<suspend CoroutineScope.() -> Map<String, TestData>>() + coEvery { valueProvider.invoke(any()) } returns mapOf("data" to TestData()) + + val hotData = HotDataFlow( + loggingTag = "tag", + scope = testScope, + startValueProvider = valueProvider, + sharingBehavior = SharingStarted.Lazily + ) + + val testCollector = hotData.data.test(startOnScope = testScope) + testCollector.silent = true + + (1..10).forEach { _ -> + thread { + (1..400).forEach { _ -> + hotData.updateSafely { + mutate { + this["data"] = getValue("data").copy( + number = getValue("data").number + 1 + ) + } + } + } + } + } + + runBlocking { + testCollector.await { list, l -> list.size == 4001 } + testCollector.latestValues.map { it.values.single().number } shouldBe (1L..4001L).toList() + } + + coVerify(exactly = 1) { valueProvider.invoke(any()) } + } + @Test fun `only emit new values if they actually changed updates`() { val testScope = TestCoroutineScope() @@ -163,9 +206,9 @@ class HotDataFlowTest : BaseTest() { testScope.runBlockingTest2(permanentJobs = true) { - val sub1 = hotData.data.test().start(scope = this) - val sub2 = hotData.data.test().start(scope = this) - val sub3 = hotData.data.test().start(scope = this) + val sub1 = hotData.data.test(tag = "sub1", startOnScope = this) + val sub2 = hotData.data.test(tag = "sub2", startOnScope = this) + val sub3 = hotData.data.test(tag = "sub3", startOnScope = this) hotData.updateSafely { "A" } hotData.updateSafely { "B" } @@ -181,4 +224,47 @@ class HotDataFlowTest : BaseTest() { } coVerify(exactly = 1) { valueProvider.invoke(any()) } } + + @Test + fun `update queue is wiped on completion`() = runBlockingTest2(permanentJobs = true) { + val valueProvider = mockk<suspend CoroutineScope.() -> Long>() + coEvery { valueProvider.invoke(any()) } returns 1 + + val hotData = HotDataFlow( + loggingTag = "tag", + scope = this, + coroutineContext = this.coroutineContext, + startValueProvider = valueProvider, + sharingBehavior = SharingStarted.WhileSubscribed(replayExpirationMillis = 0) + ) + + val testCollector1 = hotData.data.test(tag = "collector1", startOnScope = this) + testCollector1.silent = false + + (1..10).forEach { _ -> + hotData.updateSafely { + this + 1L + } + } + + advanceUntilIdle() + + testCollector1.await { list, _ -> list.size == 11 } + testCollector1.latestValues shouldBe (1L..11L).toList() + + testCollector1.cancel() + testCollector1.awaitFinal() + + val testCollector2 = hotData.data.test(tag = "collector2", startOnScope = this) + testCollector2.silent = false + + advanceUntilIdle() + + testCollector2.cancel() + testCollector2.awaitFinal() + + testCollector2.latestValues shouldBe listOf(1L) + + coVerify(exactly = 2) { valueProvider.invoke(any()) } + } } diff --git a/Corona-Warn-App/src/test/java/testhelpers/coroutines/FlowTest.kt b/Corona-Warn-App/src/test/java/testhelpers/coroutines/FlowTest.kt index 975a8c48dd522f417915c601cca231fea22e0bf2..d874d9ea36af77611d62841ad35a3bfc5fd308e7 100644 --- a/Corona-Warn-App/src/test/java/testhelpers/coroutines/FlowTest.kt +++ b/Corona-Warn-App/src/test/java/testhelpers/coroutines/FlowTest.kt @@ -16,14 +16,17 @@ import kotlinx.coroutines.flow.onStart import kotlinx.coroutines.runBlocking import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock +import kotlinx.coroutines.test.TestCoroutineScope +import kotlinx.coroutines.withTimeout +import org.joda.time.Duration import timber.log.Timber fun <T> Flow<T>.test( tag: String? = null, - startOnScope: CoroutineScope -): TestCollector<T> = test(tag ?: "FlowTest").start(scope = startOnScope) + startOnScope: CoroutineScope = TestCoroutineScope() +): TestCollector<T> = createTest(tag ?: "FlowTest").start(scope = startOnScope) -fun <T> Flow<T>.test( +fun <T> Flow<T>.createTest( tag: String? = null ): TestCollector<T> = TestCollector(this, tag ?: "FlowTest") @@ -74,9 +77,14 @@ class TestCollector<T>( val latestValues: List<T> get() = collectedValues - fun await(condition: (List<T>, T) -> Boolean): T = runBlocking { - emissions().first { - condition(collectedValues, it) + fun await( + timeout: Duration = Duration.standardSeconds(10), + condition: (List<T>, T) -> Boolean + ): T = runBlocking { + withTimeout(timeMillis = timeout.millis) { + emissions().first { + condition(collectedValues, it) + } } } @@ -94,6 +102,8 @@ class TestCollector<T>( } fun cancel() { + if (job.isCompleted) throw IllegalStateException("Flow is already canceled.") + runBlocking { job.cancelAndJoin() } diff --git a/gradle.properties b/gradle.properties index 35e2e5d5c2860c715facf42cfb8fea684f3f9702..9d6eecb9a7bc916b3f59ac234fe1c9bb20e80737 100644 --- a/gradle.properties +++ b/gradle.properties @@ -19,5 +19,5 @@ org.gradle.dependency.verification.console=verbose # Versioning, this is used by the app & pipelines to calculate the current versionCode & versionName VERSION_MAJOR=1 VERSION_MINOR=6 -VERSION_PATCH=0 -VERSION_BUILD=8 +VERSION_PATCH=1 +VERSION_BUILD=1