Skip to content
Snippets Groups Projects
Unverified Commit 3bd7bf90 authored by harambasicluka's avatar harambasicluka Committed by GitHub
Browse files

Merge 1.6.x into 1.7.x (DEV) (#1618)

Merge 1.6.x into 1.7.x
parents 37544eed a7ef9114
No related branches found
No related tags found
No related merge requests found
......@@ -2,6 +2,8 @@ package de.rki.coronawarnapp.nearby.modules.detectiontracker
import android.content.Context
import com.google.gson.Gson
import de.rki.coronawarnapp.exception.ExceptionCategory
import de.rki.coronawarnapp.exception.reporting.report
import de.rki.coronawarnapp.util.di.AppContext
import de.rki.coronawarnapp.util.serialization.BaseGson
import de.rki.coronawarnapp.util.serialization.fromJson
......@@ -44,11 +46,13 @@ class ExposureDetectionTrackerStorage @Inject constructor(
if (!storageFile.exists()) return@withLock emptyMap()
gson.fromJson<Map<String, TrackedExposureDetection>>(storageFile).also {
require(it.size >= 0)
Timber.v("Loaded detection data: %s", it)
lastCalcuationData = it
}
} catch (e: Exception) {
Timber.e(e, "Failed to load tracked detections.")
if (storageFile.delete()) Timber.w("Storage file was deleted.")
emptyMap()
}
}
......@@ -63,6 +67,7 @@ class ExposureDetectionTrackerStorage @Inject constructor(
gson.toJson(data, storageFile)
} catch (e: Exception) {
Timber.e(e, "Failed to save tracked detections.")
e.report(ExceptionCategory.INTERNAL)
}
}
}
......@@ -15,6 +15,8 @@ import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.plus
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import timber.log.Timber
import kotlin.coroutines.CoroutineContext
......@@ -37,20 +39,32 @@ class HotDataFlow<T : Any>(
extraBufferCapacity = Int.MAX_VALUE,
onBufferOverflow = BufferOverflow.SUSPEND
)
private val valueGuard = Mutex()
private val internalProducer: Flow<Holder<T>> = channelFlow {
var currentValue = startValueProvider().also {
Timber.tag(tag).v("startValue=%s", it)
val updatedBy: suspend T.() -> T = { it }
send(Holder.Data(value = it, updatedBy = updatedBy))
var currentValue = valueGuard.withLock {
startValueProvider().also {
Timber.tag(tag).v("startValue=%s", it)
val updatedBy: suspend T.() -> T = { it }
send(Holder.Data(value = it, updatedBy = updatedBy))
}
}
Timber.tag(tag).v("startValue=%s", currentValue)
updateActions.collect { updateAction ->
currentValue = updateAction(currentValue).also {
currentValue = it
send(Holder.Data(value = it, updatedBy = updateAction))
updateActions
.onCompletion {
Timber.tag(tag).v("updateActions onCompletion -> resetReplayCache()")
updateActions.resetReplayCache()
}
}
.collect { updateAction ->
currentValue = valueGuard.withLock {
updateAction(currentValue).also {
send(Holder.Data(value = it, updatedBy = updateAction))
}
}
}
Timber.tag(tag).v("internal channelFlow finished.")
}
private val internalFlow = internalProducer
......@@ -65,7 +79,10 @@ class HotDataFlow<T : Any>(
throw it
}
}
.onCompletion { Timber.tag(tag).v("Internal onCompletion") }
.onCompletion { err ->
if (err != null) Timber.tag(tag).w(err, "internal onCompletion due to error")
else Timber.tag(tag).v("internal onCompletion")
}
.shareIn(
scope = scope + coroutineContext,
replay = 1,
......
......@@ -11,12 +11,13 @@ inline fun <reified T> Gson.fromJson(json: String): T = fromJson(
object : TypeToken<T>() {}.type
)
inline fun <reified T> Gson.fromJson(file: File): T = file.reader().use {
inline fun <reified T> Gson.fromJson(file: File): T = file.bufferedReader().use {
fromJson(it, object : TypeToken<T>() {}.type)
}
inline fun <reified T> Gson.toJson(data: T, file: File) = file.writer().use { writer ->
inline fun <reified T> Gson.toJson(data: T, file: File) = file.bufferedWriter().use { writer ->
toJson(data, writer)
writer.flush()
}
fun <T : Any> KClass<T>.getDefaultGsonTypeAdapter(): TypeAdapter<T> = Gson().getAdapter(this.java)
......@@ -130,4 +130,20 @@ class ExposureDetectionTrackerStorageTest : BaseIOTest() {
storedData.getValue("b2b98400-058d-43e6-b952-529a5255248b").isCalculating shouldBe true
storedData.getValue("aeb15509-fb34-42ce-8795-7a9ae0c2f389").isCalculating shouldBe false
}
@Test
fun `we catch empty json data and prevent unsafely initialized maps`() = runBlockingTest {
storageDir.mkdirs()
storageFile.writeText("")
storageFile.exists() shouldBe true
createStorage().apply {
val value = load()
value.size shouldBe 0
value shouldBe emptyMap()
storageFile.exists() shouldBe false
}
}
}
package de.rki.coronawarnapp.util.flow
import de.rki.coronawarnapp.util.mutate
import io.kotest.assertions.throwables.shouldThrow
import io.kotest.matchers.shouldBe
import io.kotest.matchers.types.instanceOf
......@@ -84,7 +85,7 @@ class HotDataFlowTest : BaseTest() {
)
testScope.apply {
runBlockingTest2(permanentJobs = true) {
runBlockingTest2(ignoreActive = true) {
hotData.data.first() shouldBe "Test"
hotData.data.first() shouldBe "Test"
}
......@@ -107,7 +108,7 @@ class HotDataFlowTest : BaseTest() {
)
testScope.apply {
runBlockingTest2(permanentJobs = true) {
runBlockingTest2(ignoreActive = true) {
hotData.data.first() shouldBe "Test"
hotData.data.first() shouldBe "Test"
}
......@@ -150,6 +151,48 @@ class HotDataFlowTest : BaseTest() {
coVerify(exactly = 1) { valueProvider.invoke(any()) }
}
data class TestData(
val number: Long = 1
)
@Test
fun `check multi threading value updates with more complex data`() {
val testScope = TestCoroutineScope()
val valueProvider = mockk<suspend CoroutineScope.() -> Map<String, TestData>>()
coEvery { valueProvider.invoke(any()) } returns mapOf("data" to TestData())
val hotData = HotDataFlow(
loggingTag = "tag",
scope = testScope,
startValueProvider = valueProvider,
sharingBehavior = SharingStarted.Lazily
)
val testCollector = hotData.data.test(startOnScope = testScope)
testCollector.silent = true
(1..10).forEach { _ ->
thread {
(1..400).forEach { _ ->
hotData.updateSafely {
mutate {
this["data"] = getValue("data").copy(
number = getValue("data").number + 1
)
}
}
}
}
}
runBlocking {
testCollector.await { list, l -> list.size == 4001 }
testCollector.latestValues.map { it.values.single().number } shouldBe (1L..4001L).toList()
}
coVerify(exactly = 1) { valueProvider.invoke(any()) }
}
@Test
fun `only emit new values if they actually changed updates`() {
val testScope = TestCoroutineScope()
......@@ -188,10 +231,10 @@ class HotDataFlowTest : BaseTest() {
sharingBehavior = SharingStarted.Lazily
)
testScope.runBlockingTest2(permanentJobs = true) {
val sub1 = hotData.data.test().start(scope = this)
val sub2 = hotData.data.test().start(scope = this)
val sub3 = hotData.data.test().start(scope = this)
testScope.runBlockingTest2(ignoreActive = true) {
val sub1 = hotData.data.test(tag = "sub1", startOnScope = this)
val sub2 = hotData.data.test(tag = "sub2", startOnScope = this)
val sub3 = hotData.data.test(tag = "sub3", startOnScope = this)
hotData.updateSafely { "A" }
hotData.updateSafely { "B" }
......@@ -208,6 +251,49 @@ class HotDataFlowTest : BaseTest() {
coVerify(exactly = 1) { valueProvider.invoke(any()) }
}
@Test
fun `update queue is wiped on completion`() = runBlockingTest2(ignoreActive = true) {
val valueProvider = mockk<suspend CoroutineScope.() -> Long>()
coEvery { valueProvider.invoke(any()) } returns 1
val hotData = HotDataFlow(
loggingTag = "tag",
scope = this,
coroutineContext = this.coroutineContext,
startValueProvider = valueProvider,
sharingBehavior = SharingStarted.WhileSubscribed(replayExpirationMillis = 0)
)
val testCollector1 = hotData.data.test(tag = "collector1", startOnScope = this)
testCollector1.silent = false
(1..10).forEach { _ ->
hotData.updateSafely {
this + 1L
}
}
advanceUntilIdle()
testCollector1.await { list, _ -> list.size == 11 }
testCollector1.latestValues shouldBe (1L..11L).toList()
testCollector1.cancel()
testCollector1.awaitFinal()
val testCollector2 = hotData.data.test(tag = "collector2", startOnScope = this)
testCollector2.silent = false
advanceUntilIdle()
testCollector2.cancel()
testCollector2.awaitFinal()
testCollector2.latestValues shouldBe listOf(1L)
coVerify(exactly = 2) { valueProvider.invoke(any()) }
}
@Test
fun `blocking update is actually blocking`() = runBlocking {
val testScope = TestCoroutineScope()
......
......@@ -16,14 +16,17 @@ import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.test.TestCoroutineScope
import kotlinx.coroutines.withTimeout
import org.joda.time.Duration
import timber.log.Timber
fun <T> Flow<T>.test(
tag: String? = null,
startOnScope: CoroutineScope
): TestCollector<T> = test(tag ?: "FlowTest").start(scope = startOnScope)
startOnScope: CoroutineScope = TestCoroutineScope()
): TestCollector<T> = createTest(tag ?: "FlowTest").start(scope = startOnScope)
fun <T> Flow<T>.test(
fun <T> Flow<T>.createTest(
tag: String? = null
): TestCollector<T> = TestCollector(this, tag ?: "FlowTest")
......@@ -74,9 +77,14 @@ class TestCollector<T>(
val latestValues: List<T>
get() = collectedValues
fun await(condition: (List<T>, T) -> Boolean): T = runBlocking {
emissions().first {
condition(collectedValues, it)
fun await(
timeout: Duration = Duration.standardSeconds(10),
condition: (List<T>, T) -> Boolean
): T = runBlocking {
withTimeout(timeMillis = timeout.millis) {
emissions().first {
condition(collectedValues, it)
}
}
}
......@@ -95,6 +103,8 @@ class TestCollector<T>(
}
fun cancel() {
if (job.isCompleted) throw IllegalStateException("Flow is already canceled.")
runBlocking {
job.cancelAndJoin()
}
......
......@@ -11,10 +11,10 @@ import kotlin.coroutines.EmptyCoroutineContext
@ExperimentalCoroutinesApi // Since 1.2.1, tentatively till 1.3.0
fun TestCoroutineScope.runBlockingTest2(
permanentJobs: Boolean = false,
ignoreActive: Boolean = false,
block: suspend TestCoroutineScope.() -> Unit
): Unit = runBlockingTest2(
ignoreActive = permanentJobs,
ignoreActive = ignoreActive,
context = coroutineContext,
testBody = block
)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment