diff --git a/README.md b/README.md index 213b046..89185f2 100644 --- a/README.md +++ b/README.md @@ -17,43 +17,40 @@ Coroutine job scheduler inspired by `Android Work Manager` and `android-priority ## Example -Kotlin/Native doesn't have full reflection capabilities, thus we instantiate the job classes in a custom factory class. +Define a `DataTask<*>` or a `Task` (`DataTask`), customize its body and limit when it should repeat. + +⚠️ Make sure the data you pass into the task is serializable. ```kotlin -class TestFactory: TaskFactory { - override fun create(type: KClass, params: Map): Task = when(type) { - TestTask::class -> TestTask(params) - else -> throw Exception("Unknown job class!") - } +@Serializable +data class UploadData(val id: String) + +class UploadTask(data: UploadData): DataTask(data) { + override suspend fun body() { /* Do something */ } + override suspend fun onRepeat(cause: Throwable): Boolean { cause is NetworkException } } ``` -Create a single instance of the scheduler on app start, and then start the queue to enqueue scheduled jobs. +Create a single instance of the scheduler on app start. To start enqueuing jobs run `queue.start()`. + +You can pass a `Queue.Configuration` or a custom `JobSerializer` to the scheduler. ```kotlin -val factory = TestFactory() -val scheduler = JobScheduler(factory) +val scheduler = JobScheduler() scheduler.queue.start() ``` -On job schedule you can add rules, define a store, and inject parameters. - -````kotlin -val data = ... - -scheduler.schedule { - rules { - retry(RetryLimit.Limited(3), delay = 30.seconds) - unique(data.id) - timeout(60.seconds) - } - persist(Store.Preferences) - params( - "result" to data.value, - "timestamp" to data.date - ) +You can customize the jobs life cycle during schedule by defining rules. + +```kotlin +val data = UploadData(id = ...) + +scheduler.schedule(UploadTask(data)) { + unique(data.id) + retry(RetryLimit.Limited(3), delay = 30.seconds) + persist() } -```` +``` You can subscribe to life cycle events (e.g. for logging). diff --git a/src/commonMain/kotlin/com/liftric/persisted/queue/Job.kt b/src/commonMain/kotlin/com/liftric/persisted/queue/Job.kt index 2fb2fd9..b4ad5fb 100644 --- a/src/commonMain/kotlin/com/liftric/persisted/queue/Job.kt +++ b/src/commonMain/kotlin/com/liftric/persisted/queue/Job.kt @@ -3,28 +3,17 @@ package com.liftric.persisted.queue import kotlinx.coroutines.* import kotlinx.datetime.Clock import kotlinx.datetime.Instant -import kotlinx.serialization.Serializable -import kotlinx.serialization.Transient -import kotlin.reflect.KClass import kotlin.time.Duration -@Retention(AnnotationRetention.RUNTIME) -annotation class RepeatOn(val clazz: KClass) - -@Serializable -data class Job( - @Serializable(with = UUIDSerializer::class) +class Job( override val id: UUID, - override val timeout: Duration, - override val task: Task, - override val tag: String?, - override val rules: List, + override val info: JobInfo, + override val task: DataTask<*>, override val startTime: Instant = Clock.System.now() ): JobContext { - @Transient var delegate: JobDelegate? = null - constructor(task: Task, info: JobInfo) : this (UUID::class.instance(), info.timeout, task, info.tag, info.rules) + constructor(task: DataTask<*>, info: JobInfo) : this (UUID::class.instance(), info, task) private var cancellable: kotlinx.coroutines.Job? = null @@ -38,7 +27,7 @@ data class Job( if (isCancelled) return@coroutineScope cancellable = launch { val event = try { - rules.forEach { it.willRun(this@Job) } + info.rules.forEach { it.willRun(this@Job) } delegate?.broadcast(JobEvent.WillRun(this@Job)) @@ -57,7 +46,7 @@ data class Job( if (isCancelled) return@launch - rules.forEach { it.willRemove(this@Job, event) } + info.rules.forEach { it.willRemove(this@Job, event) } } catch (e: CancellationException) { delegate?.broadcast(JobEvent.DidCancel(this@Job, "Cancelled after run")) } catch (e: Error) { @@ -76,9 +65,9 @@ data class Job( delegate?.exit() } - override suspend fun repeat(id: UUID, timeout: Duration, task: Task, tag: String?, rules: List, startTime: Instant) { + override suspend fun repeat(id: UUID, info: JobInfo, task: DataTask<*>, startTime: Instant) { if (canRepeat) { - delegate?.repeat(Job(id, timeout, task, tag, rules, startTime)) + delegate?.repeat(Job(id, info, task, startTime)) } else { delegate?.broadcast(JobEvent.NotAllowedToRepeat(this@Job)) } diff --git a/src/commonMain/kotlin/com/liftric/persisted/queue/JobContext.kt b/src/commonMain/kotlin/com/liftric/persisted/queue/JobContext.kt index 14d33bf..fe806eb 100644 --- a/src/commonMain/kotlin/com/liftric/persisted/queue/JobContext.kt +++ b/src/commonMain/kotlin/com/liftric/persisted/queue/JobContext.kt @@ -1,16 +1,13 @@ package com.liftric.persisted.queue import kotlinx.datetime.Instant -import kotlin.time.Duration interface JobContext { val id: UUID - val timeout: Duration - val task: Task - val tag: String? - val rules: List + val info: JobInfo + val task: DataTask<*> val startTime: Instant suspend fun cancel() - suspend fun repeat(id: UUID = this.id, timeout: Duration = this.timeout, task: Task = this.task, tag: String? = this.tag, rules: List = this.rules, startTime: Instant = this.startTime) + suspend fun repeat(id: UUID = this.id, info: JobInfo = this.info, task: DataTask<*> = this.task, startTime: Instant = this.startTime) suspend fun broadcast(event: RuleEvent) } diff --git a/src/commonMain/kotlin/com/liftric/persisted/queue/JobInfo.kt b/src/commonMain/kotlin/com/liftric/persisted/queue/JobInfo.kt index d6472ac..baa6251 100644 --- a/src/commonMain/kotlin/com/liftric/persisted/queue/JobInfo.kt +++ b/src/commonMain/kotlin/com/liftric/persisted/queue/JobInfo.kt @@ -4,33 +4,7 @@ import kotlin.time.Duration data class JobInfo( var tag: String? = null, - var timeout: Duration = Duration.INFINITE -) { - var rules: List = listOf() - private set - var params: Map = mapOf() - private set - var persister: JobPersister? = null - private set - - fun rules(init: RuleInfo.() -> Unit): JobInfo { - val info = RuleInfo() - info.init() - rules = info.rules.distinctBy { it::class } - return this - } - - fun persist(persister: JobPersister): JobInfo { - this.persister = persister - return this - } - - fun params(vararg params: Pair): JobInfo { - this.params = params.toMap() - return this - } -} - -class RuleInfo { - val rules: MutableList = mutableListOf() -} + var timeout: Duration = Duration.INFINITE, + var rules: MutableList = mutableListOf(), + var shouldPersist: Boolean = false +) diff --git a/src/commonMain/kotlin/com/liftric/persisted/queue/JobScheduler.kt b/src/commonMain/kotlin/com/liftric/persisted/queue/JobScheduler.kt index c8a99d9..a831fcd 100644 --- a/src/commonMain/kotlin/com/liftric/persisted/queue/JobScheduler.kt +++ b/src/commonMain/kotlin/com/liftric/persisted/queue/JobScheduler.kt @@ -1,39 +1,42 @@ package com.liftric.persisted.queue +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.flow.* +import kotlinx.serialization.InternalSerializationApi +import kotlinx.serialization.serializerOrNull class JobScheduler( - val factory: TaskFactory, - configuration: Queue.Configuration? = null + configuration: Queue.Configuration? = null, + private val serializer: JobSerializer? = null ) { - val queue = JobQueue(configuration) - - @PublishedApi - internal val delegate = JobDelegate() - + val queue = JobQueue(configuration ?: Queue.Configuration(CoroutineScope(Dispatchers.Default), 1)) val onEvent = MutableSharedFlow(extraBufferCapacity = Int.MAX_VALUE) + private val delegate = JobDelegate() + init { delegate.onExit = { /* Do something */ } delegate.onRepeat = { repeat(it) } delegate.onEvent = { onEvent.emit(it) } } - suspend inline fun schedule() { - schedule { this } + suspend fun schedule(task: () -> DataTask<*>, configure: JobInfo.() -> JobInfo = { JobInfo() }) { + schedule(task(), configure) } - suspend inline fun schedule(init: JobInfo.() -> JobInfo) = try { - val info = init(JobInfo()).apply { + @OptIn(InternalSerializationApi::class) + suspend fun schedule(task: DataTask<*>, configure: JobInfo.() -> JobInfo = { JobInfo() }) = try { + val info = configure(JobInfo()).apply { rules.forEach { it.mutating(this) } } - val task = factory.create(T::class, info.params) + if (task.data!!::class.serializerOrNull() == null) throw Exception("Data must be serializable") val job = Job(task, info) job.delegate = delegate - job.rules.forEach { + job.info.rules.forEach { it.willSchedule(queue, job) } @@ -47,7 +50,7 @@ class JobScheduler( private suspend fun repeat(job: Job) = try { job.delegate = delegate - job.rules.forEach { + job.info.rules.forEach { it.willSchedule(queue, job) } diff --git a/src/commonMain/kotlin/com/liftric/persisted/queue/JobPersister.kt b/src/commonMain/kotlin/com/liftric/persisted/queue/JobSerializer.kt similarity index 85% rename from src/commonMain/kotlin/com/liftric/persisted/queue/JobPersister.kt rename to src/commonMain/kotlin/com/liftric/persisted/queue/JobSerializer.kt index 4639a0a..b1773c7 100644 --- a/src/commonMain/kotlin/com/liftric/persisted/queue/JobPersister.kt +++ b/src/commonMain/kotlin/com/liftric/persisted/queue/JobSerializer.kt @@ -1,6 +1,6 @@ package com.liftric.persisted.queue -interface JobPersister { +interface JobSerializer { val tag: String fun store(job: Job) fun retrieve(id: String): Job? diff --git a/src/commonMain/kotlin/com/liftric/persisted/queue/Preferences.kt b/src/commonMain/kotlin/com/liftric/persisted/queue/Preferences.kt index 8eb7a08..9dc8184 100644 --- a/src/commonMain/kotlin/com/liftric/persisted/queue/Preferences.kt +++ b/src/commonMain/kotlin/com/liftric/persisted/queue/Preferences.kt @@ -8,7 +8,7 @@ import kotlinx.serialization.encodeToString import kotlinx.serialization.json.Json expect class Preferences: AbstractPreferences -abstract class AbstractPreferences(private val settings: Settings): JobPersister { +abstract class AbstractPreferences(private val settings: Settings): JobSerializer { override val tag: String = "" override fun retrieve(id: String): Job? { val jsonString = settings.get(id) ?: return null diff --git a/src/commonMain/kotlin/com/liftric/persisted/queue/Queue.kt b/src/commonMain/kotlin/com/liftric/persisted/queue/Queue.kt index dbbf580..94efd79 100644 --- a/src/commonMain/kotlin/com/liftric/persisted/queue/Queue.kt +++ b/src/commonMain/kotlin/com/liftric/persisted/queue/Queue.kt @@ -2,16 +2,19 @@ package com.liftric.persisted.queue import kotlinx.atomicfu.atomic import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.Semaphore import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.sync.withPermit import kotlinx.datetime.Clock +import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.EmptyCoroutineContext +import kotlin.coroutines.coroutineContext interface Queue { - val scope: CoroutineScope val jobs: List - val maxConcurrency: Int + val configuration: Configuration data class Configuration( val scope: CoroutineScope, @@ -19,29 +22,26 @@ interface Queue { ) } -class JobQueue( - override val scope: CoroutineScope, - override val maxConcurrency: Int -): Queue { +class JobQueue(override val configuration: Queue.Configuration): Queue { + private val cancellationQueue = MutableSharedFlow(extraBufferCapacity = Int.MAX_VALUE) private val queue = atomic(mutableListOf()) - private val lock = Semaphore(maxConcurrency, 0) + private val lock = Semaphore(configuration.maxConcurrency, 0) private val isCancelling = Mutex(false) override val jobs: List get() = queue.value - constructor(configuration: Queue.Configuration?) : this( - configuration?.scope ?: CoroutineScope(Dispatchers.Default), - configuration?.maxConcurrency ?: 1 - ) + init { + cancellationQueue.onEach { it.join() } + .flowOn(Dispatchers.Default) + .launchIn(configuration.scope) + } - @PublishedApi internal fun add(job: Job) { queue.value = queue.value.plus(listOf(job)).sortedBy { it.startTime }.toMutableList() } suspend fun start() { - while (scope.isActive) { - delay(1000L) + while (configuration.scope.isActive) { if (queue.value.isEmpty()) break if (isCancelling.isLocked) break if (lock.availablePermits < 1) break @@ -50,7 +50,7 @@ class JobQueue( queue.value.remove(job) } else if (job.startTime <= Clock.System.now()) { lock.withPermit { - withTimeout(job.timeout) { + withTimeout(job.info.timeout) { job.run() queue.value.remove(job) } @@ -60,25 +60,41 @@ class JobQueue( } suspend fun cancel() { - isCancelling.withLock { - scope.coroutineContext.cancelChildren() - queue.value.clear() + submitCancellation(coroutineContext) { + isCancelling.withLock { + configuration.scope.coroutineContext.cancelChildren() + queue.value.clear() + } } } suspend fun cancel(id: UUID) { - isCancelling.withLock { - val job = queue.value.first { it.id == id } - job.cancel() - queue.value.remove(job) + submitCancellation(coroutineContext) { + isCancelling.withLock { + queue.value.firstOrNull { it.id == id }?.let { job -> + job.cancel() + queue.value.remove(job) + } + } } } suspend fun cancel(tag: String) { - isCancelling.withLock { - val job = queue.value.first { it.tag == tag } - job.cancel() - queue.value.remove(job) + submitCancellation(coroutineContext) { + isCancelling.withLock { + queue.value.firstOrNull { it.info.tag == tag }?.let { job -> + job.cancel() + queue.value.remove(job) + } + } } } + + private fun submitCancellation( + context: CoroutineContext = EmptyCoroutineContext, + block: suspend CoroutineScope.() -> Unit + ) { + val job = configuration.scope.launch(context, CoroutineStart.LAZY, block) + cancellationQueue.tryEmit(job) + } } diff --git a/src/commonMain/kotlin/com/liftric/persisted/queue/Task.kt b/src/commonMain/kotlin/com/liftric/persisted/queue/Task.kt index 964c07f..aeb5208 100644 --- a/src/commonMain/kotlin/com/liftric/persisted/queue/Task.kt +++ b/src/commonMain/kotlin/com/liftric/persisted/queue/Task.kt @@ -1,8 +1,13 @@ package com.liftric.persisted.queue -interface Task { - val params: Map +import kotlinx.serialization.Serializable + +@Serializable +abstract class DataTask(@Serializable val data: Data) { @Throws(Throwable::class) - suspend fun body() - suspend fun onRepeat(cause: Throwable): Boolean = false + abstract suspend fun body() + open suspend fun onRepeat(cause: Throwable): Boolean = false } + +@Serializable +abstract class Task: DataTask(Unit) diff --git a/src/commonMain/kotlin/com/liftric/persisted/queue/TaskFactory.kt b/src/commonMain/kotlin/com/liftric/persisted/queue/TaskFactory.kt deleted file mode 100644 index 8513ee0..0000000 --- a/src/commonMain/kotlin/com/liftric/persisted/queue/TaskFactory.kt +++ /dev/null @@ -1,7 +0,0 @@ -package com.liftric.persisted.queue - -import kotlin.reflect.KClass - -interface TaskFactory { - fun create(type: KClass, params: Map): Task -} diff --git a/src/commonMain/kotlin/com/liftric/persisted/queue/rules/DelayRule.kt b/src/commonMain/kotlin/com/liftric/persisted/queue/rules/DelayRule.kt index 30272e4..19a240e 100644 --- a/src/commonMain/kotlin/com/liftric/persisted/queue/rules/DelayRule.kt +++ b/src/commonMain/kotlin/com/liftric/persisted/queue/rules/DelayRule.kt @@ -14,7 +14,7 @@ data class DelayRule(val duration: Duration = 0.seconds): JobRule() { } } -fun RuleInfo.delay(duration: Duration = 0.seconds): RuleInfo { +fun JobInfo.delay(duration: Duration = 0.seconds): JobInfo { val rule = DelayRule(duration) rules.add(rule) return this diff --git a/src/commonMain/kotlin/com/liftric/persisted/queue/rules/PeriodicRule.kt b/src/commonMain/kotlin/com/liftric/persisted/queue/rules/PeriodicRule.kt index 92e4096..2ec6610 100644 --- a/src/commonMain/kotlin/com/liftric/persisted/queue/rules/PeriodicRule.kt +++ b/src/commonMain/kotlin/com/liftric/persisted/queue/rules/PeriodicRule.kt @@ -15,7 +15,7 @@ data class PeriodicRule(val interval: Duration = 0.seconds): JobRule() { } } -fun RuleInfo.repeat(interval: Duration = 0.seconds): RuleInfo { +fun JobInfo.repeat(interval: Duration = 0.seconds): JobInfo { val rule = PeriodicRule(interval) rules.add(rule) return this diff --git a/src/commonMain/kotlin/com/liftric/persisted/queue/rules/PersistenceRule.kt b/src/commonMain/kotlin/com/liftric/persisted/queue/rules/PersistenceRule.kt new file mode 100644 index 0000000..3030829 --- /dev/null +++ b/src/commonMain/kotlin/com/liftric/persisted/queue/rules/PersistenceRule.kt @@ -0,0 +1,18 @@ +package com.liftric.persisted.queue.rules + +import com.liftric.persisted.queue.JobInfo +import com.liftric.persisted.queue.JobRule +import kotlinx.serialization.Serializable + +@Serializable +data class PersistenceRule(val shouldPersist: Boolean): JobRule() { + override suspend fun mutating(info: JobInfo) { + info.shouldPersist = shouldPersist + } +} + +fun JobInfo.persist(shouldPersist: Boolean = true): JobInfo { + val rule = PersistenceRule(shouldPersist) + rules.add(rule) + return this +} diff --git a/src/commonMain/kotlin/com/liftric/persisted/queue/rules/RetryRule.kt b/src/commonMain/kotlin/com/liftric/persisted/queue/rules/RetryRule.kt index e09f19f..7576ba1 100644 --- a/src/commonMain/kotlin/com/liftric/persisted/queue/rules/RetryRule.kt +++ b/src/commonMain/kotlin/com/liftric/persisted/queue/rules/RetryRule.kt @@ -16,9 +16,9 @@ data class RetryRule(val limit: RetryLimit, val delay: Duration = 0.seconds): Jo } is RetryLimit.Limited -> { if (limit.count > 0) { - val rules = context.rules.minus(this).plus(RetryRule(RetryLimit.Limited((limit.count + 1) - 2), delay)) + val rules = context.info.rules.minus(this).plus(RetryRule(RetryLimit.Limited((limit.count + 1) - 2), delay)) context.broadcast(RuleEvent.OnRemove(this, "Attempting to retry task=$context")) - context.repeat(rules = rules, startTime = Clock.System.now().plus(delay)) + context.repeat(info = context.info.copy(rules = rules.toMutableList()), startTime = Clock.System.now().plus(delay)) } } } @@ -32,7 +32,7 @@ sealed class RetryLimit { object Unlimited: RetryLimit() } -fun RuleInfo.retry(limit: RetryLimit, delay: Duration = 0.seconds): RuleInfo { +fun JobInfo.retry(limit: RetryLimit, delay: Duration = 0.seconds): JobInfo { val rule = RetryRule(limit, delay) rules.add(rule) return this diff --git a/src/commonMain/kotlin/com/liftric/persisted/queue/rules/TimeoutRule.kt b/src/commonMain/kotlin/com/liftric/persisted/queue/rules/TimeoutRule.kt index 2cff1e7..6b1c685 100644 --- a/src/commonMain/kotlin/com/liftric/persisted/queue/rules/TimeoutRule.kt +++ b/src/commonMain/kotlin/com/liftric/persisted/queue/rules/TimeoutRule.kt @@ -2,7 +2,6 @@ package com.liftric.persisted.queue.rules import com.liftric.persisted.queue.JobInfo import com.liftric.persisted.queue.JobRule -import com.liftric.persisted.queue.RuleInfo import kotlin.time.Duration data class TimeoutRule(val timeout: Duration): JobRule() { @@ -11,7 +10,7 @@ data class TimeoutRule(val timeout: Duration): JobRule() { } } -fun RuleInfo.timeout(timeout: Duration): RuleInfo { +fun JobInfo.timeout(timeout: Duration): JobInfo { val rule = TimeoutRule(timeout) rules.add(rule) return this diff --git a/src/commonMain/kotlin/com/liftric/persisted/queue/rules/UniqueRule.kt b/src/commonMain/kotlin/com/liftric/persisted/queue/rules/UniqueRule.kt index ad45567..f6bf8e3 100644 --- a/src/commonMain/kotlin/com/liftric/persisted/queue/rules/UniqueRule.kt +++ b/src/commonMain/kotlin/com/liftric/persisted/queue/rules/UniqueRule.kt @@ -11,14 +11,14 @@ data class UniqueRule(private val tag: String? = null): JobRule() { override suspend fun willSchedule(queue: Queue, context: JobContext) { for (item in queue.jobs) { - if (item.tag == tag || item.id == context.id) { + if (item.info.tag == tag || item.id == context.id) { throw Error("Job with id=${item.id} already exists") } } } } -fun RuleInfo.unique(tag: String? = null): RuleInfo { +fun JobInfo.unique(tag: String? = null): JobInfo { val rule = UniqueRule(tag) rules.add(rule) return this diff --git a/src/commonTest/kotlin/com/liftric/persisted/queue/JobSchedulerTests.kt b/src/commonTest/kotlin/com/liftric/persisted/queue/JobSchedulerTests.kt index a0c2861..ddeb48a 100644 --- a/src/commonTest/kotlin/com/liftric/persisted/queue/JobSchedulerTests.kt +++ b/src/commonTest/kotlin/com/liftric/persisted/queue/JobSchedulerTests.kt @@ -8,8 +8,7 @@ import kotlin.time.Duration.Companion.seconds class JobSchedulerTests { @Test fun testSchedule() = runBlocking { - val factory = TestFactory() - val scheduler = JobScheduler(factory) + val scheduler = JobScheduler() val id = UUID::class.instance().toString() val job = async { scheduler.onEvent.collect { @@ -17,19 +16,13 @@ class JobSchedulerTests { } } - scheduler.schedule { - rules { - delay(1.seconds) - unique(id) - } - params("testResultId" to id) + scheduler.schedule(TestTask(TestData(id))) { + delay(1.seconds) + unique(id) } - scheduler.schedule { - rules { - unique(id) - } - params("testResultId" to id) + scheduler.schedule(TestTask(TestData(id))) { + unique(id) } assertEquals(1, scheduler.queue.jobs.count()) @@ -45,8 +38,7 @@ class JobSchedulerTests { @Test fun testRetry() = runBlocking { - val factory = TestFactory() - val scheduler = JobScheduler(factory) + val scheduler = JobScheduler() var count = 0 val job = launch { @@ -58,10 +50,8 @@ class JobSchedulerTests { } } - scheduler.schedule { - rules { - retry(RetryLimit.Limited(3), delay = 1.seconds) - } + scheduler.schedule(TestErrorTask()) { + retry(RetryLimit.Limited(3), delay = 1.seconds) } scheduler.queue.start() @@ -72,14 +62,11 @@ class JobSchedulerTests { @Test fun testCancelDuringRun() { - val factory = TestFactory() - val scheduler = JobScheduler(factory) + val scheduler = JobScheduler() runBlocking { - scheduler.schedule { - rules { - delay(10.seconds) - } + scheduler.schedule(LongRunningTask()) { + delay(10.seconds) } launch { @@ -102,18 +89,17 @@ class JobSchedulerTests { @Test fun testCancelByIdBeforeEnqueue() { - val factory = TestFactory() - val scheduler = JobScheduler(factory) + val scheduler = JobScheduler() runBlocking { val completable = CompletableDeferred() launch { scheduler.onEvent.collect { + println(it) if (it is JobEvent.DidEnd || it is JobEvent.DidFail) fail("Continued after run") if (it is JobEvent.DidSchedule) { completable.complete(it.job.id) - cancel() } if (it is JobEvent.DidCancel) { assertTrue(scheduler.queue.jobs.isEmpty()) @@ -124,32 +110,19 @@ class JobSchedulerTests { delay(1000L) - scheduler.schedule { - rules { - delay(2.seconds) - } + scheduler.schedule(::LongRunningTask) { + delay(2.seconds) } scheduler.queue.cancel(completable.await()) - - assertTrue(scheduler.queue.jobs.isEmpty()) } } @Test fun testCancelByIdAfterEnqueue() { - val factory = TestFactory() - val scheduler = JobScheduler(factory) + val scheduler = JobScheduler() runBlocking { - launch { - scheduler.schedule { - rules { - delay(0.seconds) - } - } - } - launch { scheduler.onEvent.collect { println(it) @@ -163,12 +136,12 @@ class JobSchedulerTests { } } + delay(1000L) + scheduler.queue.start() - scheduler.schedule { - rules { - delay(30.seconds) - } + scheduler.schedule(::LongRunningTask) { + delay(10.seconds) } } } diff --git a/src/commonTest/kotlin/com/liftric/persisted/queue/TestFactory.kt b/src/commonTest/kotlin/com/liftric/persisted/queue/TestFactory.kt deleted file mode 100644 index 027d244..0000000 --- a/src/commonTest/kotlin/com/liftric/persisted/queue/TestFactory.kt +++ /dev/null @@ -1,12 +0,0 @@ -package com.liftric.persisted.queue - -import kotlin.reflect.KClass - -class TestFactory: TaskFactory { - override fun create(type: KClass, params: Map): Task = when(type) { - TestTask::class -> TestTask(params) - LongRunningTask::class -> LongRunningTask(params) - TestErrorTask::class -> TestErrorTask(params) - else -> throw Exception("Unknown job!") - } -} diff --git a/src/commonTest/kotlin/com/liftric/persisted/queue/TestTask.kt b/src/commonTest/kotlin/com/liftric/persisted/queue/TestTask.kt index fda2836..f697853 100644 --- a/src/commonTest/kotlin/com/liftric/persisted/queue/TestTask.kt +++ b/src/commonTest/kotlin/com/liftric/persisted/queue/TestTask.kt @@ -1,25 +1,20 @@ package com.liftric.persisted.queue import kotlinx.coroutines.delay +import kotlinx.serialization.Serializable -class TestTask(override val params: Map): Task { - private val testResultId: String by params +@Serializable +data class TestData(val testResultId: String) - override suspend fun body() { } +class TestTask(data: TestData): DataTask(data) { + override suspend fun body() { } } -class TestErrorTask(override val params: Map): Task { - override suspend fun body() { - throw Error("Oh shoot!") - } - - override suspend fun onRepeat(cause: Throwable): Boolean { - return cause is Error - } +class TestErrorTask: Task() { + override suspend fun body() { throw Error("Oh shoot!") } + override suspend fun onRepeat(cause: Throwable): Boolean = cause is Error } -class LongRunningTask(override val params: Map): Task { - override suspend fun body() { - delay(10000L) - } +class LongRunningTask: Task() { + override suspend fun body() { delay(10000L) } } diff --git a/src/iosMain/kotlin/com/liftric/persisted/queue/UUID.kt b/src/iosMain/kotlin/com/liftric/persisted/queue/UUID.kt index 7fbc02a..fe555fe 100644 --- a/src/iosMain/kotlin/com/liftric/persisted/queue/UUID.kt +++ b/src/iosMain/kotlin/com/liftric/persisted/queue/UUID.kt @@ -5,7 +5,9 @@ import kotlinx.serialization.descriptors.PrimitiveKind import kotlinx.serialization.descriptors.PrimitiveSerialDescriptor import kotlinx.serialization.encoding.Decoder import kotlinx.serialization.encoding.Encoder +import platform.Foundation.NSObjectHashCallBacks import platform.Foundation.NSUUID +import platform.darwin.NSObject import kotlin.reflect.KClass actual typealias UUID = NSUUID