diff --git a/Corona-Warn-App/src/deviceForTesters/java/de/rki/coronawarnapp/test/tasks/testtask/TestTask.kt b/Corona-Warn-App/src/deviceForTesters/java/de/rki/coronawarnapp/test/tasks/testtask/TestTask.kt index 20e00f194868cb16bd4746f86d9bb87a13ac9900..762a10de5f296adb9f3a06fa6846532c65ffeb59 100644 --- a/Corona-Warn-App/src/deviceForTesters/java/de/rki/coronawarnapp/test/tasks/testtask/TestTask.kt +++ b/Corona-Warn-App/src/deviceForTesters/java/de/rki/coronawarnapp/test/tasks/testtask/TestTask.kt @@ -8,6 +8,7 @@ import kotlinx.coroutines.channels.ConflatedBroadcastChannel import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.asFlow +import org.joda.time.Duration import org.joda.time.Instant import timber.log.Timber import javax.inject.Inject @@ -53,11 +54,12 @@ class TestTask @Inject constructor() : Task<DefaultProgress, TestTask.Result> { class Result : Task.Result - data class Config( + class Config : TaskFactory.Config { + override val executionTimeout: Duration = Duration.standardSeconds(10) + override val collisionBehavior: TaskFactory.Config.CollisionBehavior = TaskFactory.Config.CollisionBehavior.ENQUEUE - - ) : TaskFactory.Config + } class Factory @Inject constructor( private val taskByDagger: Provider<TestTask> diff --git a/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/task/TaskController.kt b/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/task/TaskController.kt index 45311edec336858f688dae05814cccb31e0f900a..77e22394784193a235c6a0793ebedf78d2ff5231 100644 --- a/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/task/TaskController.kt +++ b/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/task/TaskController.kt @@ -19,6 +19,7 @@ import kotlinx.coroutines.flow.receiveAsFlow import kotlinx.coroutines.launch import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock +import kotlinx.coroutines.withTimeout import timber.log.Timber import java.util.UUID import javax.inject.Inject @@ -88,9 +89,11 @@ class TaskController @Inject constructor( val taskConfig = taskFactory.config val task = taskFactory.taskProvider() - val deferred = taskScope.async( - start = CoroutineStart.LAZY - ) { task.run(newRequest.arguments) } + val deferred = taskScope.async(start = CoroutineStart.LAZY) { + withTimeout(timeMillis = taskConfig.executionTimeout.millis) { + task.run(newRequest.arguments) + } + } val activeTask = InternalTaskState( request = newRequest, diff --git a/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/task/TaskFactory.kt b/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/task/TaskFactory.kt index c247ea1c6eac77c728659a4ff0e3b02dd5a65246..8c694e86f93e5b7450c8ea5ca2c1480f7ce7ca44 100644 --- a/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/task/TaskFactory.kt +++ b/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/task/TaskFactory.kt @@ -1,11 +1,18 @@ package de.rki.coronawarnapp.task +import org.joda.time.Duration + interface TaskFactory< ProgressType : Task.Progress, ResultType : Task.Result > { interface Config { + /** + * The maximal runtime of the task, before it is canceled by the controller + */ + val executionTimeout: Duration + val collisionBehavior: CollisionBehavior enum class CollisionBehavior { diff --git a/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/task/common/DefaultTaskRequest.kt b/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/task/common/DefaultTaskRequest.kt index 444af7e32e857a5151f4ead58111dd90f31a36de..03c85b3e6e9e5a6eb5e9dd42da4515ee9665615c 100644 --- a/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/task/common/DefaultTaskRequest.kt +++ b/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/task/common/DefaultTaskRequest.kt @@ -9,4 +9,7 @@ data class DefaultTaskRequest( override val id: UUID = UUID.randomUUID(), override val type: KClass<out Task<Task.Progress, Task.Result>>, override val arguments: Task.Arguments -) : TaskRequest +) : TaskRequest { + + fun toNewTask(): DefaultTaskRequest = copy(id = UUID.randomUUID()) +} diff --git a/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/task/example/QueueingTask.kt b/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/task/example/QueueingTask.kt index b3a0d282163c243491a28278c770350987fa05bf..88618d3ed747e42b5980e61542634a22bca4b91d 100644 --- a/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/task/example/QueueingTask.kt +++ b/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/task/example/QueueingTask.kt @@ -8,12 +8,14 @@ import kotlinx.coroutines.channels.ConflatedBroadcastChannel import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.asFlow +import org.joda.time.Duration import timber.log.Timber import java.io.File import java.util.UUID import javax.inject.Inject import javax.inject.Provider +@Suppress("MagicNumber") open class QueueingTask @Inject constructor() : Task<DefaultProgress, QueueingTask.Result> { private val internalProgress = ConflatedBroadcastChannel<DefaultProgress>() @@ -51,30 +53,26 @@ open class QueueingTask @Inject constructor() : Task<DefaultProgress, QueueingTa isCanceled = true } - @Suppress("MagicNumber") data class Arguments( val path: File, val values: List<String> = (1..10).map { UUID.randomUUID().toString() }, val delay: Long = 100L ) : Task.Arguments - data class Result( - val writtenBytes: Long - ) : Task.Result + data class Result(val writtenBytes: Long) : Task.Result + + class Config : TaskFactory.Config { + override val executionTimeout: Duration = Duration.standardSeconds(10) - data class Config( override val collisionBehavior: TaskFactory.Config.CollisionBehavior = TaskFactory.Config.CollisionBehavior.ENQUEUE - - ) : TaskFactory.Config + } class Factory @Inject constructor( private val taskByDagger: Provider<QueueingTask> ) : TaskFactory<DefaultProgress, Result> { override val config: TaskFactory.Config = Config() - override val taskProvider: () -> Task<DefaultProgress, Result> = { - taskByDagger.get() - } + override val taskProvider: () -> Task<DefaultProgress, Result> = { taskByDagger.get() } } } diff --git a/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/task/TaskControllerTest.kt b/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/task/TaskControllerTest.kt index c6bc35a0d09c1d1af2d02ccaa1cbafba4cfa09c7..e823164d0335affe30ca1a7bbeec2933b5e3ddf3 100644 --- a/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/task/TaskControllerTest.kt +++ b/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/task/TaskControllerTest.kt @@ -2,7 +2,10 @@ package de.rki.coronawarnapp.task import de.rki.coronawarnapp.task.common.DefaultTaskRequest import de.rki.coronawarnapp.task.example.QueueingTask -import de.rki.coronawarnapp.task.example.SkippingTask +import de.rki.coronawarnapp.task.testtasks.SkippingTask +import de.rki.coronawarnapp.task.testtasks.timeout.TimeoutTask +import de.rki.coronawarnapp.task.testtasks.timeout.TimeoutTask2 +import de.rki.coronawarnapp.task.testtasks.timeout.TimeoutTaskArguments import de.rki.coronawarnapp.util.TimeStamper import io.kotest.assertions.throwables.shouldNotThrowAny import io.kotest.assertions.throwables.shouldThrow @@ -19,6 +22,7 @@ import io.mockk.mockk import io.mockk.spyk import io.mockk.verifySequence import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.TimeoutCancellationException import kotlinx.coroutines.delay import kotlinx.coroutines.flow.first import kotlinx.coroutines.flow.take @@ -34,6 +38,7 @@ import testhelpers.extensions.isAfterOrEqual import java.io.File import java.io.FileNotFoundException import java.util.UUID +import javax.inject.Provider class TaskControllerTest : BaseIOTest() { @@ -45,8 +50,10 @@ class TaskControllerTest : BaseIOTest() { private val testDir = File(IO_TEST_BASEDIR, this::class.java.simpleName) - private val queueingFactory = spyk(QueueingTask.Factory { QueueingTask() }) - private val skippingFactory = spyk(SkippingTask.Factory { SkippingTask() }) + private val timeoutFactory = spyk(TimeoutTask.Factory(Provider { TimeoutTask() })) + private val timeoutFactory2 = spyk(TimeoutTask2.Factory(Provider { TimeoutTask2() })) + private val queueingFactory = spyk(QueueingTask.Factory(Provider { QueueingTask() })) + private val skippingFactory = spyk(SkippingTask.Factory(Provider { SkippingTask() })) @BeforeEach fun setup() { @@ -54,6 +61,8 @@ class TaskControllerTest : BaseIOTest() { taskFactoryMap[QueueingTask::class.java] = queueingFactory taskFactoryMap[SkippingTask::class.java] = skippingFactory + taskFactoryMap[TimeoutTask::class.java] = timeoutFactory + taskFactoryMap[TimeoutTask2::class.java] = timeoutFactory2 every { timeStamper.nowUTC } answers { Instant.now() @@ -250,7 +259,7 @@ class TaskControllerTest : BaseIOTest() { ) instance.submit(request1) - val request2 = request1.copy(id = UUID.randomUUID()) + val request2 = request1.toNewTask() instance.submit(request2) val infoPending = instance.tasks.first { emission -> @@ -411,4 +420,123 @@ class TaskControllerTest : BaseIOTest() { instance.close() } + + @Test + fun `tasks are timed out according to their config`() = runBlockingTest { + val instance = createInstance(scope = this) + + val request = DefaultTaskRequest( + arguments = TimeoutTaskArguments(), + type = TimeoutTask::class + ) + + instance.submit(request) + + val infoFinished = instance.tasks + .first { it.single().taskState.executionState == TaskState.ExecutionState.FINISHED } + .single() + + infoFinished.apply { + taskState.isFailed shouldBe true + taskState.error shouldBe instanceOf(TimeoutCancellationException::class) + } + + instance.tasks.first().size shouldBe 1 + + instance.close() + } + + @Test + fun `timeout starts on execution, not while pending`() = runBlockingTest { + val instance = createInstance(scope = this) + + val taskWithTimeout = DefaultTaskRequest( + arguments = TimeoutTaskArguments(), + type = TimeoutTask::class + ) + val taskWithoutTimeout = DefaultTaskRequest( + arguments = TimeoutTaskArguments(delay = 5000), + type = TimeoutTask::class + ) + val taskWithoutTimeout2 = taskWithoutTimeout.toNewTask() + + instance.submit(taskWithTimeout) + instance.submit(taskWithoutTimeout) + instance.submit(taskWithoutTimeout2) + + val finishedTasks = instance.tasks.first { tasks -> + tasks.all { it.taskState.executionState == TaskState.ExecutionState.FINISHED } + } + instance.tasks.first().size shouldBe 3 + + finishedTasks.single { it.taskState.request == taskWithTimeout }.apply { + taskState.isFailed shouldBe true + taskState.error shouldBe instanceOf(TimeoutCancellationException::class) + } + finishedTasks.single { it.taskState.request == taskWithoutTimeout }.apply { + taskState.isSuccessful shouldBe true + taskState.error shouldBe null + taskState.result shouldNotBe null + } + finishedTasks.single { it.taskState.request == taskWithoutTimeout2 }.apply { + taskState.isSuccessful shouldBe true + taskState.error shouldBe null + taskState.result shouldNotBe null + } + + instance.close() + } + + @Test + fun `parallel tasks can timeout`() = runBlockingTest { + val instance = createInstance(scope = this) + + val task1WithTimeout = DefaultTaskRequest( + arguments = TimeoutTaskArguments(), + type = TimeoutTask::class + ) + val task2WithTimeout = DefaultTaskRequest( + arguments = TimeoutTaskArguments(), + type = TimeoutTask2::class + ) + val task1WithoutTimeout = DefaultTaskRequest( + arguments = TimeoutTaskArguments(delay = 5000), + type = TimeoutTask::class + ) + val task2WithoutTimeout = DefaultTaskRequest( + arguments = TimeoutTaskArguments(delay = 5000), + type = TimeoutTask2::class + ) + + instance.submit(task1WithTimeout) + instance.submit(task2WithTimeout) + instance.submit(task1WithoutTimeout) + instance.submit(task2WithoutTimeout) + + val finishedTasks = instance.tasks.first { tasks -> + tasks.all { it.taskState.executionState == TaskState.ExecutionState.FINISHED } + } + instance.tasks.first().size shouldBe 4 + + finishedTasks.single { it.taskState.request == task1WithTimeout }.apply { + taskState.isFailed shouldBe true + taskState.error shouldBe instanceOf(TimeoutCancellationException::class) + } + finishedTasks.single { it.taskState.request == task2WithTimeout }.apply { + taskState.isFailed shouldBe true + taskState.error shouldBe instanceOf(TimeoutCancellationException::class) + } + finishedTasks.single { it.taskState.request == task1WithoutTimeout }.apply { + taskState.isSuccessful shouldBe true + taskState.error shouldBe null + taskState.result shouldNotBe null + } + finishedTasks.single { it.taskState.request == task2WithoutTimeout }.apply { + taskState.isSuccessful shouldBe true + taskState.error shouldBe null + taskState.result shouldNotBe null + } + + instance.close() + } } diff --git a/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/task/example/SkippingTask.kt b/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/task/testtasks/SkippingTask.kt similarity index 57% rename from Corona-Warn-App/src/test/java/de/rki/coronawarnapp/task/example/SkippingTask.kt rename to Corona-Warn-App/src/test/java/de/rki/coronawarnapp/task/testtasks/SkippingTask.kt index 653ea122eba93f5f04f2173b84c9e21cd73a8ea3..3c832ad293951d1072960f4ffbefaee9fe1b0333 100644 --- a/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/task/example/SkippingTask.kt +++ b/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/task/testtasks/SkippingTask.kt @@ -1,22 +1,28 @@ -package de.rki.coronawarnapp.task.example +package de.rki.coronawarnapp.task.testtasks import de.rki.coronawarnapp.task.Task import de.rki.coronawarnapp.task.TaskFactory import de.rki.coronawarnapp.task.common.DefaultProgress +import de.rki.coronawarnapp.task.example.QueueingTask +import org.joda.time.Duration import javax.inject.Inject import javax.inject.Provider class SkippingTask : QueueingTask() { - data class Config( - override val collisionBehavior: TaskFactory.Config.CollisionBehavior = TaskFactory.Config.CollisionBehavior.SKIP_IF_SIBLING_RUNNING - ) : TaskFactory.Config + class Config : TaskFactory.Config { + override val executionTimeout: Duration = Duration.standardSeconds(10) + + override val collisionBehavior: TaskFactory.Config.CollisionBehavior = + TaskFactory.Config.CollisionBehavior.SKIP_IF_SIBLING_RUNNING + } class Factory @Inject constructor( private val taskByDagger: Provider<QueueingTask>, ) : TaskFactory<DefaultProgress, Result> { - override val config: TaskFactory.Config = Config() + override val config: TaskFactory.Config = + Config() override val taskProvider: () -> Task<DefaultProgress, Result> = { taskByDagger.get() } diff --git a/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/task/testtasks/timeout/BaseTimeoutTask.kt b/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/task/testtasks/timeout/BaseTimeoutTask.kt new file mode 100644 index 0000000000000000000000000000000000000000..7879b015a9d57e50a59e433c055959f2642b6a30 --- /dev/null +++ b/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/task/testtasks/timeout/BaseTimeoutTask.kt @@ -0,0 +1,50 @@ +package de.rki.coronawarnapp.task.testtasks.timeout + +import de.rki.coronawarnapp.task.Task +import de.rki.coronawarnapp.task.TaskCancellationException +import de.rki.coronawarnapp.task.TaskFactory +import de.rki.coronawarnapp.task.common.DefaultProgress +import kotlinx.coroutines.channels.ConflatedBroadcastChannel +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.asFlow +import timber.log.Timber +import javax.inject.Inject +import javax.inject.Provider + +abstract class BaseTimeoutTask : Task<DefaultProgress, TimeoutTaskResult> { + + private val internalProgress = ConflatedBroadcastChannel<DefaultProgress>() + override val progress: Flow<DefaultProgress> = internalProgress.asFlow() + + private var isCanceled = false + + override suspend fun run(arguments: Task.Arguments): TimeoutTaskResult = try { + Timber.d("Running with arguments=%s", arguments) + runSafely(arguments as TimeoutTaskArguments).also { + if (isCanceled) throw TaskCancellationException() + } + } finally { + Timber.i("Finished (isCanceled=$isCanceled).") + internalProgress.close() + } + + private suspend fun runSafely(arguments: TimeoutTaskArguments): TimeoutTaskResult { + delay(arguments.delay) + return TimeoutTaskResult() + } + + override suspend fun cancel() { + Timber.w("cancel() called.") + isCanceled = true + } + + abstract class Factory @Inject constructor( + private val taskByDagger: Provider<BaseTimeoutTask> + ) : TaskFactory<DefaultProgress, TimeoutTaskResult> { + + override val config: TaskFactory.Config = TimeoutTaskConfig() + override val taskProvider: () -> Task<DefaultProgress, TimeoutTaskResult> = + { taskByDagger.get() } + } +} diff --git a/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/task/testtasks/timeout/TimeoutTask.kt b/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/task/testtasks/timeout/TimeoutTask.kt new file mode 100644 index 0000000000000000000000000000000000000000..08d0dffb965ed895b17dd710a0a0b1af9f346c90 --- /dev/null +++ b/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/task/testtasks/timeout/TimeoutTask.kt @@ -0,0 +1,9 @@ +package de.rki.coronawarnapp.task.testtasks.timeout + +import javax.inject.Provider + +class TimeoutTask : BaseTimeoutTask() { + + class Factory constructor(taskByDagger: Provider<BaseTimeoutTask>) : + BaseTimeoutTask.Factory(taskByDagger) +} diff --git a/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/task/testtasks/timeout/TimeoutTask2.kt b/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/task/testtasks/timeout/TimeoutTask2.kt new file mode 100644 index 0000000000000000000000000000000000000000..0f2b843f15c6088324d865ddaac10fd3485da7b0 --- /dev/null +++ b/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/task/testtasks/timeout/TimeoutTask2.kt @@ -0,0 +1,9 @@ +package de.rki.coronawarnapp.task.testtasks.timeout + +import javax.inject.Provider + +class TimeoutTask2 : BaseTimeoutTask() { + + class Factory constructor(taskByDagger: Provider<BaseTimeoutTask>) : + BaseTimeoutTask.Factory(taskByDagger) +} diff --git a/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/task/testtasks/timeout/TimeoutTaskArguments.kt b/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/task/testtasks/timeout/TimeoutTaskArguments.kt new file mode 100644 index 0000000000000000000000000000000000000000..2195009183fe5794f72afdce27098104b13c8512 --- /dev/null +++ b/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/task/testtasks/timeout/TimeoutTaskArguments.kt @@ -0,0 +1,7 @@ +package de.rki.coronawarnapp.task.testtasks.timeout + +import de.rki.coronawarnapp.task.Task + +@Suppress("MagicNumber") +data class TimeoutTaskArguments(val delay: Long = 15 * 1000L) : + Task.Arguments diff --git a/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/task/testtasks/timeout/TimeoutTaskConfig.kt b/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/task/testtasks/timeout/TimeoutTaskConfig.kt new file mode 100644 index 0000000000000000000000000000000000000000..18165d9dd13c5ce5c13864d0634315b631e17f9a --- /dev/null +++ b/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/task/testtasks/timeout/TimeoutTaskConfig.kt @@ -0,0 +1,11 @@ +package de.rki.coronawarnapp.task.testtasks.timeout + +import de.rki.coronawarnapp.task.TaskFactory +import org.joda.time.Duration + +class TimeoutTaskConfig : TaskFactory.Config { + override val executionTimeout: Duration = Duration.standardSeconds(10) + + override val collisionBehavior: TaskFactory.Config.CollisionBehavior = + TaskFactory.Config.CollisionBehavior.ENQUEUE +} diff --git a/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/task/testtasks/timeout/TimeoutTaskResult.kt b/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/task/testtasks/timeout/TimeoutTaskResult.kt new file mode 100644 index 0000000000000000000000000000000000000000..eb735b0482b87b0be5a6c27bcc595e36e0519d51 --- /dev/null +++ b/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/task/testtasks/timeout/TimeoutTaskResult.kt @@ -0,0 +1,5 @@ +package de.rki.coronawarnapp.task.testtasks.timeout + +import de.rki.coronawarnapp.task.Task + +class TimeoutTaskResult : Task.Result