From 374a64bb0eecfb11d92babe10222d676a5e3ee2a Mon Sep 17 00:00:00 2001 From: Matthias Urhahn <matthias.urhahn@sap.com> Date: Thu, 15 Oct 2020 10:25:16 +0200 Subject: [PATCH] TaskController: Support timeout behavior (EXPOSUREAPP-2842) (#1407) * + timeout field * updated the example * Add support for Timeout logic to the TaskController (+tests ofc). Co-authored-by: chris-cwa <chris.cwa.sap@gmail.com> --- .../test/tasks/testtask/TestTask.kt | 8 +- .../rki/coronawarnapp/task/TaskController.kt | 9 +- .../de/rki/coronawarnapp/task/TaskFactory.kt | 7 + .../task/common/DefaultTaskRequest.kt | 5 +- .../task/example/QueueingTask.kt | 18 ++- .../coronawarnapp/task/TaskControllerTest.kt | 136 +++++++++++++++++- .../{example => testtasks}/SkippingTask.kt | 16 ++- .../task/testtasks/timeout/BaseTimeoutTask.kt | 50 +++++++ .../task/testtasks/timeout/TimeoutTask.kt | 9 ++ .../task/testtasks/timeout/TimeoutTask2.kt | 9 ++ .../testtasks/timeout/TimeoutTaskArguments.kt | 7 + .../testtasks/timeout/TimeoutTaskConfig.kt | 11 ++ .../testtasks/timeout/TimeoutTaskResult.kt | 5 + 13 files changed, 264 insertions(+), 26 deletions(-) rename Corona-Warn-App/src/test/java/de/rki/coronawarnapp/task/{example => testtasks}/SkippingTask.kt (57%) create mode 100644 Corona-Warn-App/src/test/java/de/rki/coronawarnapp/task/testtasks/timeout/BaseTimeoutTask.kt create mode 100644 Corona-Warn-App/src/test/java/de/rki/coronawarnapp/task/testtasks/timeout/TimeoutTask.kt create mode 100644 Corona-Warn-App/src/test/java/de/rki/coronawarnapp/task/testtasks/timeout/TimeoutTask2.kt create mode 100644 Corona-Warn-App/src/test/java/de/rki/coronawarnapp/task/testtasks/timeout/TimeoutTaskArguments.kt create mode 100644 Corona-Warn-App/src/test/java/de/rki/coronawarnapp/task/testtasks/timeout/TimeoutTaskConfig.kt create mode 100644 Corona-Warn-App/src/test/java/de/rki/coronawarnapp/task/testtasks/timeout/TimeoutTaskResult.kt diff --git a/Corona-Warn-App/src/deviceForTesters/java/de/rki/coronawarnapp/test/tasks/testtask/TestTask.kt b/Corona-Warn-App/src/deviceForTesters/java/de/rki/coronawarnapp/test/tasks/testtask/TestTask.kt index 20e00f194..762a10de5 100644 --- a/Corona-Warn-App/src/deviceForTesters/java/de/rki/coronawarnapp/test/tasks/testtask/TestTask.kt +++ b/Corona-Warn-App/src/deviceForTesters/java/de/rki/coronawarnapp/test/tasks/testtask/TestTask.kt @@ -8,6 +8,7 @@ import kotlinx.coroutines.channels.ConflatedBroadcastChannel import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.asFlow +import org.joda.time.Duration import org.joda.time.Instant import timber.log.Timber import javax.inject.Inject @@ -53,11 +54,12 @@ class TestTask @Inject constructor() : Task<DefaultProgress, TestTask.Result> { class Result : Task.Result - data class Config( + class Config : TaskFactory.Config { + override val executionTimeout: Duration = Duration.standardSeconds(10) + override val collisionBehavior: TaskFactory.Config.CollisionBehavior = TaskFactory.Config.CollisionBehavior.ENQUEUE - - ) : TaskFactory.Config + } class Factory @Inject constructor( private val taskByDagger: Provider<TestTask> diff --git a/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/task/TaskController.kt b/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/task/TaskController.kt index 45311edec..77e223947 100644 --- a/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/task/TaskController.kt +++ b/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/task/TaskController.kt @@ -19,6 +19,7 @@ import kotlinx.coroutines.flow.receiveAsFlow import kotlinx.coroutines.launch import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock +import kotlinx.coroutines.withTimeout import timber.log.Timber import java.util.UUID import javax.inject.Inject @@ -88,9 +89,11 @@ class TaskController @Inject constructor( val taskConfig = taskFactory.config val task = taskFactory.taskProvider() - val deferred = taskScope.async( - start = CoroutineStart.LAZY - ) { task.run(newRequest.arguments) } + val deferred = taskScope.async(start = CoroutineStart.LAZY) { + withTimeout(timeMillis = taskConfig.executionTimeout.millis) { + task.run(newRequest.arguments) + } + } val activeTask = InternalTaskState( request = newRequest, diff --git a/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/task/TaskFactory.kt b/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/task/TaskFactory.kt index c247ea1c6..8c694e86f 100644 --- a/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/task/TaskFactory.kt +++ b/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/task/TaskFactory.kt @@ -1,11 +1,18 @@ package de.rki.coronawarnapp.task +import org.joda.time.Duration + interface TaskFactory< ProgressType : Task.Progress, ResultType : Task.Result > { interface Config { + /** + * The maximal runtime of the task, before it is canceled by the controller + */ + val executionTimeout: Duration + val collisionBehavior: CollisionBehavior enum class CollisionBehavior { diff --git a/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/task/common/DefaultTaskRequest.kt b/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/task/common/DefaultTaskRequest.kt index 444af7e32..03c85b3e6 100644 --- a/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/task/common/DefaultTaskRequest.kt +++ b/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/task/common/DefaultTaskRequest.kt @@ -9,4 +9,7 @@ data class DefaultTaskRequest( override val id: UUID = UUID.randomUUID(), override val type: KClass<out Task<Task.Progress, Task.Result>>, override val arguments: Task.Arguments -) : TaskRequest +) : TaskRequest { + + fun toNewTask(): DefaultTaskRequest = copy(id = UUID.randomUUID()) +} diff --git a/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/task/example/QueueingTask.kt b/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/task/example/QueueingTask.kt index b3a0d2821..88618d3ed 100644 --- a/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/task/example/QueueingTask.kt +++ b/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/task/example/QueueingTask.kt @@ -8,12 +8,14 @@ import kotlinx.coroutines.channels.ConflatedBroadcastChannel import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.asFlow +import org.joda.time.Duration import timber.log.Timber import java.io.File import java.util.UUID import javax.inject.Inject import javax.inject.Provider +@Suppress("MagicNumber") open class QueueingTask @Inject constructor() : Task<DefaultProgress, QueueingTask.Result> { private val internalProgress = ConflatedBroadcastChannel<DefaultProgress>() @@ -51,30 +53,26 @@ open class QueueingTask @Inject constructor() : Task<DefaultProgress, QueueingTa isCanceled = true } - @Suppress("MagicNumber") data class Arguments( val path: File, val values: List<String> = (1..10).map { UUID.randomUUID().toString() }, val delay: Long = 100L ) : Task.Arguments - data class Result( - val writtenBytes: Long - ) : Task.Result + data class Result(val writtenBytes: Long) : Task.Result + + class Config : TaskFactory.Config { + override val executionTimeout: Duration = Duration.standardSeconds(10) - data class Config( override val collisionBehavior: TaskFactory.Config.CollisionBehavior = TaskFactory.Config.CollisionBehavior.ENQUEUE - - ) : TaskFactory.Config + } class Factory @Inject constructor( private val taskByDagger: Provider<QueueingTask> ) : TaskFactory<DefaultProgress, Result> { override val config: TaskFactory.Config = Config() - override val taskProvider: () -> Task<DefaultProgress, Result> = { - taskByDagger.get() - } + override val taskProvider: () -> Task<DefaultProgress, Result> = { taskByDagger.get() } } } diff --git a/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/task/TaskControllerTest.kt b/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/task/TaskControllerTest.kt index c6bc35a0d..e823164d0 100644 --- a/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/task/TaskControllerTest.kt +++ b/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/task/TaskControllerTest.kt @@ -2,7 +2,10 @@ package de.rki.coronawarnapp.task import de.rki.coronawarnapp.task.common.DefaultTaskRequest import de.rki.coronawarnapp.task.example.QueueingTask -import de.rki.coronawarnapp.task.example.SkippingTask +import de.rki.coronawarnapp.task.testtasks.SkippingTask +import de.rki.coronawarnapp.task.testtasks.timeout.TimeoutTask +import de.rki.coronawarnapp.task.testtasks.timeout.TimeoutTask2 +import de.rki.coronawarnapp.task.testtasks.timeout.TimeoutTaskArguments import de.rki.coronawarnapp.util.TimeStamper import io.kotest.assertions.throwables.shouldNotThrowAny import io.kotest.assertions.throwables.shouldThrow @@ -19,6 +22,7 @@ import io.mockk.mockk import io.mockk.spyk import io.mockk.verifySequence import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.TimeoutCancellationException import kotlinx.coroutines.delay import kotlinx.coroutines.flow.first import kotlinx.coroutines.flow.take @@ -34,6 +38,7 @@ import testhelpers.extensions.isAfterOrEqual import java.io.File import java.io.FileNotFoundException import java.util.UUID +import javax.inject.Provider class TaskControllerTest : BaseIOTest() { @@ -45,8 +50,10 @@ class TaskControllerTest : BaseIOTest() { private val testDir = File(IO_TEST_BASEDIR, this::class.java.simpleName) - private val queueingFactory = spyk(QueueingTask.Factory { QueueingTask() }) - private val skippingFactory = spyk(SkippingTask.Factory { SkippingTask() }) + private val timeoutFactory = spyk(TimeoutTask.Factory(Provider { TimeoutTask() })) + private val timeoutFactory2 = spyk(TimeoutTask2.Factory(Provider { TimeoutTask2() })) + private val queueingFactory = spyk(QueueingTask.Factory(Provider { QueueingTask() })) + private val skippingFactory = spyk(SkippingTask.Factory(Provider { SkippingTask() })) @BeforeEach fun setup() { @@ -54,6 +61,8 @@ class TaskControllerTest : BaseIOTest() { taskFactoryMap[QueueingTask::class.java] = queueingFactory taskFactoryMap[SkippingTask::class.java] = skippingFactory + taskFactoryMap[TimeoutTask::class.java] = timeoutFactory + taskFactoryMap[TimeoutTask2::class.java] = timeoutFactory2 every { timeStamper.nowUTC } answers { Instant.now() @@ -250,7 +259,7 @@ class TaskControllerTest : BaseIOTest() { ) instance.submit(request1) - val request2 = request1.copy(id = UUID.randomUUID()) + val request2 = request1.toNewTask() instance.submit(request2) val infoPending = instance.tasks.first { emission -> @@ -411,4 +420,123 @@ class TaskControllerTest : BaseIOTest() { instance.close() } + + @Test + fun `tasks are timed out according to their config`() = runBlockingTest { + val instance = createInstance(scope = this) + + val request = DefaultTaskRequest( + arguments = TimeoutTaskArguments(), + type = TimeoutTask::class + ) + + instance.submit(request) + + val infoFinished = instance.tasks + .first { it.single().taskState.executionState == TaskState.ExecutionState.FINISHED } + .single() + + infoFinished.apply { + taskState.isFailed shouldBe true + taskState.error shouldBe instanceOf(TimeoutCancellationException::class) + } + + instance.tasks.first().size shouldBe 1 + + instance.close() + } + + @Test + fun `timeout starts on execution, not while pending`() = runBlockingTest { + val instance = createInstance(scope = this) + + val taskWithTimeout = DefaultTaskRequest( + arguments = TimeoutTaskArguments(), + type = TimeoutTask::class + ) + val taskWithoutTimeout = DefaultTaskRequest( + arguments = TimeoutTaskArguments(delay = 5000), + type = TimeoutTask::class + ) + val taskWithoutTimeout2 = taskWithoutTimeout.toNewTask() + + instance.submit(taskWithTimeout) + instance.submit(taskWithoutTimeout) + instance.submit(taskWithoutTimeout2) + + val finishedTasks = instance.tasks.first { tasks -> + tasks.all { it.taskState.executionState == TaskState.ExecutionState.FINISHED } + } + instance.tasks.first().size shouldBe 3 + + finishedTasks.single { it.taskState.request == taskWithTimeout }.apply { + taskState.isFailed shouldBe true + taskState.error shouldBe instanceOf(TimeoutCancellationException::class) + } + finishedTasks.single { it.taskState.request == taskWithoutTimeout }.apply { + taskState.isSuccessful shouldBe true + taskState.error shouldBe null + taskState.result shouldNotBe null + } + finishedTasks.single { it.taskState.request == taskWithoutTimeout2 }.apply { + taskState.isSuccessful shouldBe true + taskState.error shouldBe null + taskState.result shouldNotBe null + } + + instance.close() + } + + @Test + fun `parallel tasks can timeout`() = runBlockingTest { + val instance = createInstance(scope = this) + + val task1WithTimeout = DefaultTaskRequest( + arguments = TimeoutTaskArguments(), + type = TimeoutTask::class + ) + val task2WithTimeout = DefaultTaskRequest( + arguments = TimeoutTaskArguments(), + type = TimeoutTask2::class + ) + val task1WithoutTimeout = DefaultTaskRequest( + arguments = TimeoutTaskArguments(delay = 5000), + type = TimeoutTask::class + ) + val task2WithoutTimeout = DefaultTaskRequest( + arguments = TimeoutTaskArguments(delay = 5000), + type = TimeoutTask2::class + ) + + instance.submit(task1WithTimeout) + instance.submit(task2WithTimeout) + instance.submit(task1WithoutTimeout) + instance.submit(task2WithoutTimeout) + + val finishedTasks = instance.tasks.first { tasks -> + tasks.all { it.taskState.executionState == TaskState.ExecutionState.FINISHED } + } + instance.tasks.first().size shouldBe 4 + + finishedTasks.single { it.taskState.request == task1WithTimeout }.apply { + taskState.isFailed shouldBe true + taskState.error shouldBe instanceOf(TimeoutCancellationException::class) + } + finishedTasks.single { it.taskState.request == task2WithTimeout }.apply { + taskState.isFailed shouldBe true + taskState.error shouldBe instanceOf(TimeoutCancellationException::class) + } + finishedTasks.single { it.taskState.request == task1WithoutTimeout }.apply { + taskState.isSuccessful shouldBe true + taskState.error shouldBe null + taskState.result shouldNotBe null + } + finishedTasks.single { it.taskState.request == task2WithoutTimeout }.apply { + taskState.isSuccessful shouldBe true + taskState.error shouldBe null + taskState.result shouldNotBe null + } + + instance.close() + } } diff --git a/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/task/example/SkippingTask.kt b/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/task/testtasks/SkippingTask.kt similarity index 57% rename from Corona-Warn-App/src/test/java/de/rki/coronawarnapp/task/example/SkippingTask.kt rename to Corona-Warn-App/src/test/java/de/rki/coronawarnapp/task/testtasks/SkippingTask.kt index 653ea122e..3c832ad29 100644 --- a/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/task/example/SkippingTask.kt +++ b/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/task/testtasks/SkippingTask.kt @@ -1,22 +1,28 @@ -package de.rki.coronawarnapp.task.example +package de.rki.coronawarnapp.task.testtasks import de.rki.coronawarnapp.task.Task import de.rki.coronawarnapp.task.TaskFactory import de.rki.coronawarnapp.task.common.DefaultProgress +import de.rki.coronawarnapp.task.example.QueueingTask +import org.joda.time.Duration import javax.inject.Inject import javax.inject.Provider class SkippingTask : QueueingTask() { - data class Config( - override val collisionBehavior: TaskFactory.Config.CollisionBehavior = TaskFactory.Config.CollisionBehavior.SKIP_IF_SIBLING_RUNNING - ) : TaskFactory.Config + class Config : TaskFactory.Config { + override val executionTimeout: Duration = Duration.standardSeconds(10) + + override val collisionBehavior: TaskFactory.Config.CollisionBehavior = + TaskFactory.Config.CollisionBehavior.SKIP_IF_SIBLING_RUNNING + } class Factory @Inject constructor( private val taskByDagger: Provider<QueueingTask>, ) : TaskFactory<DefaultProgress, Result> { - override val config: TaskFactory.Config = Config() + override val config: TaskFactory.Config = + Config() override val taskProvider: () -> Task<DefaultProgress, Result> = { taskByDagger.get() } diff --git a/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/task/testtasks/timeout/BaseTimeoutTask.kt b/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/task/testtasks/timeout/BaseTimeoutTask.kt new file mode 100644 index 000000000..7879b015a --- /dev/null +++ b/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/task/testtasks/timeout/BaseTimeoutTask.kt @@ -0,0 +1,50 @@ +package de.rki.coronawarnapp.task.testtasks.timeout + +import de.rki.coronawarnapp.task.Task +import de.rki.coronawarnapp.task.TaskCancellationException +import de.rki.coronawarnapp.task.TaskFactory +import de.rki.coronawarnapp.task.common.DefaultProgress +import kotlinx.coroutines.channels.ConflatedBroadcastChannel +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.asFlow +import timber.log.Timber +import javax.inject.Inject +import javax.inject.Provider + +abstract class BaseTimeoutTask : Task<DefaultProgress, TimeoutTaskResult> { + + private val internalProgress = ConflatedBroadcastChannel<DefaultProgress>() + override val progress: Flow<DefaultProgress> = internalProgress.asFlow() + + private var isCanceled = false + + override suspend fun run(arguments: Task.Arguments): TimeoutTaskResult = try { + Timber.d("Running with arguments=%s", arguments) + runSafely(arguments as TimeoutTaskArguments).also { + if (isCanceled) throw TaskCancellationException() + } + } finally { + Timber.i("Finished (isCanceled=$isCanceled).") + internalProgress.close() + } + + private suspend fun runSafely(arguments: TimeoutTaskArguments): TimeoutTaskResult { + delay(arguments.delay) + return TimeoutTaskResult() + } + + override suspend fun cancel() { + Timber.w("cancel() called.") + isCanceled = true + } + + abstract class Factory @Inject constructor( + private val taskByDagger: Provider<BaseTimeoutTask> + ) : TaskFactory<DefaultProgress, TimeoutTaskResult> { + + override val config: TaskFactory.Config = TimeoutTaskConfig() + override val taskProvider: () -> Task<DefaultProgress, TimeoutTaskResult> = + { taskByDagger.get() } + } +} diff --git a/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/task/testtasks/timeout/TimeoutTask.kt b/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/task/testtasks/timeout/TimeoutTask.kt new file mode 100644 index 000000000..08d0dffb9 --- /dev/null +++ b/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/task/testtasks/timeout/TimeoutTask.kt @@ -0,0 +1,9 @@ +package de.rki.coronawarnapp.task.testtasks.timeout + +import javax.inject.Provider + +class TimeoutTask : BaseTimeoutTask() { + + class Factory constructor(taskByDagger: Provider<BaseTimeoutTask>) : + BaseTimeoutTask.Factory(taskByDagger) +} diff --git a/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/task/testtasks/timeout/TimeoutTask2.kt b/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/task/testtasks/timeout/TimeoutTask2.kt new file mode 100644 index 000000000..0f2b843f1 --- /dev/null +++ b/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/task/testtasks/timeout/TimeoutTask2.kt @@ -0,0 +1,9 @@ +package de.rki.coronawarnapp.task.testtasks.timeout + +import javax.inject.Provider + +class TimeoutTask2 : BaseTimeoutTask() { + + class Factory constructor(taskByDagger: Provider<BaseTimeoutTask>) : + BaseTimeoutTask.Factory(taskByDagger) +} diff --git a/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/task/testtasks/timeout/TimeoutTaskArguments.kt b/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/task/testtasks/timeout/TimeoutTaskArguments.kt new file mode 100644 index 000000000..219500918 --- /dev/null +++ b/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/task/testtasks/timeout/TimeoutTaskArguments.kt @@ -0,0 +1,7 @@ +package de.rki.coronawarnapp.task.testtasks.timeout + +import de.rki.coronawarnapp.task.Task + +@Suppress("MagicNumber") +data class TimeoutTaskArguments(val delay: Long = 15 * 1000L) : + Task.Arguments diff --git a/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/task/testtasks/timeout/TimeoutTaskConfig.kt b/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/task/testtasks/timeout/TimeoutTaskConfig.kt new file mode 100644 index 000000000..18165d9dd --- /dev/null +++ b/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/task/testtasks/timeout/TimeoutTaskConfig.kt @@ -0,0 +1,11 @@ +package de.rki.coronawarnapp.task.testtasks.timeout + +import de.rki.coronawarnapp.task.TaskFactory +import org.joda.time.Duration + +class TimeoutTaskConfig : TaskFactory.Config { + override val executionTimeout: Duration = Duration.standardSeconds(10) + + override val collisionBehavior: TaskFactory.Config.CollisionBehavior = + TaskFactory.Config.CollisionBehavior.ENQUEUE +} diff --git a/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/task/testtasks/timeout/TimeoutTaskResult.kt b/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/task/testtasks/timeout/TimeoutTaskResult.kt new file mode 100644 index 000000000..eb735b048 --- /dev/null +++ b/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/task/testtasks/timeout/TimeoutTaskResult.kt @@ -0,0 +1,5 @@ +package de.rki.coronawarnapp.task.testtasks.timeout + +import de.rki.coronawarnapp.task.Task + +class TimeoutTaskResult : Task.Result -- GitLab