Skip to content
This repository has been archived by the owner on Oct 14, 2023. It is now read-only.

Commit

Permalink
Merge pull request #1 from Liftric/feat/simple-impl
Browse files Browse the repository at this point in the history
Start of a basic implementation
  • Loading branch information
benjohnde authored Dec 9, 2022
2 parents fc41533 + 8a05e1f commit b520ec7
Show file tree
Hide file tree
Showing 28 changed files with 878 additions and 3 deletions.
65 changes: 64 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1 +1,64 @@
# Kotlin Multiplatform Persisted Queue Library
# Persisted-queue

Coroutine job scheduler inspired by `Android Work Manager` and `android-priority-jobqueue` for Kotlin Multiplatform projects. Run & repeat tasks. Rebuild them from disk. Fine tune execution with rules.

## Rules

- [x] Delay
- [x] Timeout
- [x] Retry
- [x] Periodic
- [x] Unique
- [ ] Internet

## Capabilities

- [x] Cancellation (all, by id, by tag)

## Example

Kotlin/Native doesn't have full reflection capabilities, thus we instantiate the job classes in a custom factory class.

```kotlin
class TestFactory: TaskFactory {
override fun <T : Task> create(type: KClass<T>, params: Map<String, Any>): Task = when(type) {
TestTask::class -> TestTask(params)
else -> throw Exception("Unknown job class!")
}
}
```

Create a single instance of the scheduler on app start, and then start the queue to enqueue scheduled jobs.

```kotlin
val factory = TestFactory()
val scheduler = JobScheduler(factory)
scheduler.queue.start()
```

On job schedule you can add rules, define a store, and inject parameters.

````kotlin
val data = ...

scheduler.schedule<UploadTask> {
rules {
retry(RetryLimit.Limited(3), delay = 30.seconds)
unique(data.id)
timeout(60.seconds)
}
persist(Store.Preferences)
params(
"result" to data.value,
"timestamp" to data.date
)
}
````

You can subscribe to life cycle events (e.g. for logging).

```kotlin
scheduler.onEvent.collect { event ->
Logger.info(event)
}
```
3 changes: 3 additions & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ kotlin {
dependencies {
implementation(libs.kotlinx.coroutines)
implementation(libs.kotlinx.serialization)
implementation(libs.kotlinx.datetime)
implementation(libs.kotlinx.atomicfu)
implementation(libs.multiplatform.settings)
}
}
val commonTest by getting {
Expand Down
6 changes: 4 additions & 2 deletions settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ dependencyResolutionManagement {
versionCatalogs {
create("libs") {
version("android-tools-gradle", "7.2.2")
version("kotlin", "1.7.10")
version("kotlin", "1.7.20")
library("kotlinx-coroutines", "org.jetbrains.kotlinx", "kotlinx-coroutines-core").version("1.6.4")
library("kotlinx-serialization", "org.jetbrains.kotlinx", "kotlinx-serialization-json").version("1.4.0")
library("kotlinx-atomicfu", "org.jetbrains.kotlinx", "atomicfu").version("0.18.5")
library("kotlinx-datetime", "org.jetbrains.kotlinx", "kotlinx-datetime").version("0.4.0")
library("androidx-test-core", "androidx.test", "core").version("1.4.0")
library("roboelectric", "org.robolectric", "robolectric").version("4.5.1")
library("multiplatform-settings", "com.russhwolf", "multiplatform-settings").version("0.9")
library("multiplatform-settings", "com.russhwolf", "multiplatform-settings").version("1.0.0-RC")
plugin("versioning", "net.nemerosa.versioning").version("3.0.0")
plugin("kotlin.serialization", "org.jetbrains.kotlin.plugin.serialization").versionRef("kotlin")
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package com.liftric.persisted.queue

import android.content.Context
import com.russhwolf.settings.SharedPreferencesSettings

actual class Preferences(context: Context): AbstractPreferences(SharedPreferencesSettings.Factory(context).create("com.liftric.job.scheduler"))
25 changes: 25 additions & 0 deletions src/androidMain/kotlin/com/liftric/persisted/queue/UUID.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.liftric.persisted.queue

import kotlinx.serialization.KSerializer
import kotlinx.serialization.descriptors.PrimitiveKind
import kotlinx.serialization.descriptors.PrimitiveSerialDescriptor
import kotlinx.serialization.encoding.Decoder
import kotlinx.serialization.encoding.Encoder
import java.util.UUID
import kotlin.reflect.KClass

actual typealias UUID = UUID

actual fun KClass<UUID>.instance(): UUID = UUID.randomUUID()

actual object UUIDSerializer: KSerializer<UUID> {
override val descriptor = PrimitiveSerialDescriptor("UUID", PrimitiveKind.STRING)

override fun deserialize(decoder: Decoder): UUID {
return UUID.fromString(decoder.decodeString())
}

override fun serialize(encoder: Encoder, value: UUID) {
encoder.encodeString(value.toString().lowercase())
}
}
90 changes: 90 additions & 0 deletions src/commonMain/kotlin/com/liftric/persisted/queue/Job.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package com.liftric.persisted.queue

import kotlinx.coroutines.*
import kotlinx.datetime.Clock
import kotlinx.datetime.Instant
import kotlinx.serialization.Serializable
import kotlinx.serialization.Transient
import kotlin.reflect.KClass
import kotlin.time.Duration

@Retention(AnnotationRetention.RUNTIME)
annotation class RepeatOn(val clazz: KClass<Throwable>)

@Serializable
data class Job(
@Serializable(with = UUIDSerializer::class)
override val id: UUID,
override val timeout: Duration,
override val task: Task,
override val tag: String?,
override val rules: List<JobRule>,
override val startTime: Instant = Clock.System.now()
): JobContext {
@Transient
var delegate: JobDelegate? = null

constructor(task: Task, info: JobInfo) : this (UUID::class.instance(), info.timeout, task, info.tag, info.rules)

private var cancellable: kotlinx.coroutines.Job? = null

var isCancelled: Boolean = false
private set

private var canRepeat: Boolean = false

internal suspend fun run() {
coroutineScope {
if (isCancelled) return@coroutineScope
cancellable = launch {
val event = try {
rules.forEach { it.willRun(this@Job) }

delegate?.broadcast(JobEvent.WillRun(this@Job))

task.body()

JobEvent.DidEnd(this@Job)
} catch (e: CancellationException) {
JobEvent.DidCancel(this@Job, "Cancelled during run")
} catch (e: Error) {
canRepeat = task.onRepeat(e)
JobEvent.DidFail(this@Job, e)
}

try {
delegate?.broadcast(event)

if (isCancelled) return@launch

rules.forEach { it.willRemove(this@Job, event) }
} catch (e: CancellationException) {
delegate?.broadcast(JobEvent.DidCancel(this@Job, "Cancelled after run"))
} catch (e: Error) {
delegate?.broadcast(JobEvent.DidFailOnRemove(this@Job, e))
}
}
}
}

override suspend fun cancel() {
if (isCancelled) return
isCancelled = true
cancellable?.cancel(CancellationException("Cancelled during run")) ?: run {
delegate?.broadcast(JobEvent.DidCancel(this@Job, "Cancelled before run"))
}
delegate?.exit()
}

override suspend fun repeat(id: UUID, timeout: Duration, task: Task, tag: String?, rules: List<JobRule>, startTime: Instant) {
if (canRepeat) {
delegate?.repeat(Job(id, timeout, task, tag, rules, startTime))
} else {
delegate?.broadcast(JobEvent.NotAllowedToRepeat(this@Job))
}
}

override suspend fun broadcast(event: RuleEvent) {
delegate?.broadcast(event)
}
}
16 changes: 16 additions & 0 deletions src/commonMain/kotlin/com/liftric/persisted/queue/JobContext.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.liftric.persisted.queue

import kotlinx.datetime.Instant
import kotlin.time.Duration

interface JobContext {
val id: UUID
val timeout: Duration
val task: Task
val tag: String?
val rules: List<JobRule>
val startTime: Instant
suspend fun cancel()
suspend fun repeat(id: UUID = this.id, timeout: Duration = this.timeout, task: Task = this.task, tag: String? = this.tag, rules: List<JobRule> = this.rules, startTime: Instant = this.startTime)
suspend fun broadcast(event: RuleEvent)
}
20 changes: 20 additions & 0 deletions src/commonMain/kotlin/com/liftric/persisted/queue/JobDelegate.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.liftric.persisted.queue

class JobDelegate {
var onExit: (suspend () -> Unit)? = null
var onRepeat: (suspend (Job) -> Unit)? = null
var onEvent: (suspend (JobEvent) -> Unit)? = null

suspend fun broadcast(event: JobEvent) {
onEvent?.invoke(event)
}

suspend fun exit() {
onExit?.invoke()
}

suspend fun repeat(job: Job) {
onRepeat?.invoke(job)
}
}

32 changes: 32 additions & 0 deletions src/commonMain/kotlin/com/liftric/persisted/queue/JobEvent.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.liftric.persisted.queue

sealed class JobEvent {
data class DidSchedule(val job: JobContext): JobEvent()
data class DidScheduleRepeat(val job: JobContext): JobEvent()
data class WillRun(val job: JobContext): JobEvent()
data class DidThrowOnRepeat(val error: Error): JobEvent()
data class DidThrowOnSchedule(val error: Error): JobEvent()
data class DidEnd(val job: JobContext): JobEvent()
data class DidFail(val job: JobContext, val error: Error): JobEvent()
data class DidCancel(val job: JobContext, val message: String): JobEvent()
data class DidFailOnRemove(val job: JobContext, val error: Error): JobEvent()
data class NotAllowedToRepeat(val job: JobContext): JobEvent()
}

sealed class RuleEvent(open val rule: String, open val message: String): JobEvent() {
data class OnMutate(override val rule: String, override val message: String): RuleEvent(rule, message) {
constructor(rule: JobRule, message: String) : this(rule::class.simpleName!!, message)
}

data class OnSchedule(override val rule: String, override val message: String): RuleEvent(rule, message) {
constructor(rule: JobRule, message: String) : this(rule::class.simpleName!!, message)
}

data class OnRun(override val rule: String, override val message: String): RuleEvent(rule, message) {
constructor(rule: JobRule, message: String) : this(rule::class.simpleName!!, message)
}

data class OnRemove(override val rule: String, override val message: String): RuleEvent(rule, message) {
constructor(rule: JobRule, message: String) : this(rule::class.simpleName!!, message)
}
}
36 changes: 36 additions & 0 deletions src/commonMain/kotlin/com/liftric/persisted/queue/JobInfo.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package com.liftric.persisted.queue

import kotlin.time.Duration

data class JobInfo(
var tag: String? = null,
var timeout: Duration = Duration.INFINITE
) {
var rules: List<JobRule> = listOf()
private set
var params: Map<String, String> = mapOf()
private set
var persister: JobPersister? = null
private set

fun rules(init: RuleInfo.() -> Unit): JobInfo {
val info = RuleInfo()
info.init()
rules = info.rules.distinctBy { it::class }
return this
}

fun persist(persister: JobPersister): JobInfo {
this.persister = persister
return this
}

fun params(vararg params: Pair<String, String>): JobInfo {
this.params = params.toMap()
return this
}
}

class RuleInfo {
val rules: MutableList<JobRule> = mutableListOf()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.liftric.persisted.queue

interface JobPersister {
val tag: String
fun store(job: Job)
fun retrieve(id: String): Job?
fun retrieveAll(): List<Job>
}
12 changes: 12 additions & 0 deletions src/commonMain/kotlin/com/liftric/persisted/queue/JobRule.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.liftric.persisted.queue

import kotlinx.serialization.Serializable

@Serializable
abstract class JobRule {
open suspend fun mutating(info: JobInfo) {}
@Throws(Throwable::class)
open suspend fun willSchedule(queue: Queue, context: JobContext) {}
open suspend fun willRun(context: JobContext) {}
open suspend fun willRemove(context: JobContext, result: JobEvent) {}
}
Loading

0 comments on commit b520ec7

Please sign in to comment.