Skip to content
This repository has been archived by the owner on Oct 14, 2023. It is now read-only.

Commit

Permalink
Merge pull request #3 from Liftric/feat/remove-factory
Browse files Browse the repository at this point in the history
Refactor: Remove factory
  • Loading branch information
benjohnde authored Dec 27, 2022
2 parents b520ec7 + 5306b81 commit f9de439
Show file tree
Hide file tree
Showing 20 changed files with 167 additions and 218 deletions.
49 changes: 23 additions & 26 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,43 +17,40 @@ Coroutine job scheduler inspired by `Android Work Manager` and `android-priority

## Example

Kotlin/Native doesn't have full reflection capabilities, thus we instantiate the job classes in a custom factory class.
Define a `DataTask<*>` or a `Task` (`DataTask<Unit>`), customize its body and limit when it should repeat.

⚠️ Make sure the data you pass into the task is serializable.

```kotlin
class TestFactory: TaskFactory {
override fun <T : Task> create(type: KClass<T>, params: Map<String, Any>): Task = when(type) {
TestTask::class -> TestTask(params)
else -> throw Exception("Unknown job class!")
}
@Serializable
data class UploadData(val id: String)

class UploadTask(data: UploadData): DataTask<UploadData>(data) {
override suspend fun body() { /* Do something */ }
override suspend fun onRepeat(cause: Throwable): Boolean { cause is NetworkException }
}
```

Create a single instance of the scheduler on app start, and then start the queue to enqueue scheduled jobs.
Create a single instance of the scheduler on app start. To start enqueuing jobs run `queue.start()`.

You can pass a `Queue.Configuration` or a custom `JobSerializer` to the scheduler.

```kotlin
val factory = TestFactory()
val scheduler = JobScheduler(factory)
val scheduler = JobScheduler()
scheduler.queue.start()
```

On job schedule you can add rules, define a store, and inject parameters.

````kotlin
val data = ...

scheduler.schedule<UploadTask> {
rules {
retry(RetryLimit.Limited(3), delay = 30.seconds)
unique(data.id)
timeout(60.seconds)
}
persist(Store.Preferences)
params(
"result" to data.value,
"timestamp" to data.date
)
You can customize the jobs life cycle during schedule by defining rules.

```kotlin
val data = UploadData(id = ...)

scheduler.schedule(UploadTask(data)) {
unique(data.id)
retry(RetryLimit.Limited(3), delay = 30.seconds)
persist()
}
````
```

You can subscribe to life cycle events (e.g. for logging).

Expand Down
27 changes: 8 additions & 19 deletions src/commonMain/kotlin/com/liftric/persisted/queue/Job.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,17 @@ package com.liftric.persisted.queue
import kotlinx.coroutines.*
import kotlinx.datetime.Clock
import kotlinx.datetime.Instant
import kotlinx.serialization.Serializable
import kotlinx.serialization.Transient
import kotlin.reflect.KClass
import kotlin.time.Duration

@Retention(AnnotationRetention.RUNTIME)
annotation class RepeatOn(val clazz: KClass<Throwable>)

@Serializable
data class Job(
@Serializable(with = UUIDSerializer::class)
class Job(
override val id: UUID,
override val timeout: Duration,
override val task: Task,
override val tag: String?,
override val rules: List<JobRule>,
override val info: JobInfo,
override val task: DataTask<*>,
override val startTime: Instant = Clock.System.now()
): JobContext {
@Transient
var delegate: JobDelegate? = null

constructor(task: Task, info: JobInfo) : this (UUID::class.instance(), info.timeout, task, info.tag, info.rules)
constructor(task: DataTask<*>, info: JobInfo) : this (UUID::class.instance(), info, task)

private var cancellable: kotlinx.coroutines.Job? = null

Expand All @@ -38,7 +27,7 @@ data class Job(
if (isCancelled) return@coroutineScope
cancellable = launch {
val event = try {
rules.forEach { it.willRun(this@Job) }
info.rules.forEach { it.willRun(this@Job) }

delegate?.broadcast(JobEvent.WillRun(this@Job))

Expand All @@ -57,7 +46,7 @@ data class Job(

if (isCancelled) return@launch

rules.forEach { it.willRemove(this@Job, event) }
info.rules.forEach { it.willRemove(this@Job, event) }
} catch (e: CancellationException) {
delegate?.broadcast(JobEvent.DidCancel(this@Job, "Cancelled after run"))
} catch (e: Error) {
Expand All @@ -76,9 +65,9 @@ data class Job(
delegate?.exit()
}

override suspend fun repeat(id: UUID, timeout: Duration, task: Task, tag: String?, rules: List<JobRule>, startTime: Instant) {
override suspend fun repeat(id: UUID, info: JobInfo, task: DataTask<*>, startTime: Instant) {
if (canRepeat) {
delegate?.repeat(Job(id, timeout, task, tag, rules, startTime))
delegate?.repeat(Job(id, info, task, startTime))
} else {
delegate?.broadcast(JobEvent.NotAllowedToRepeat(this@Job))
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
package com.liftric.persisted.queue

import kotlinx.datetime.Instant
import kotlin.time.Duration

interface JobContext {
val id: UUID
val timeout: Duration
val task: Task
val tag: String?
val rules: List<JobRule>
val info: JobInfo
val task: DataTask<*>
val startTime: Instant
suspend fun cancel()
suspend fun repeat(id: UUID = this.id, timeout: Duration = this.timeout, task: Task = this.task, tag: String? = this.tag, rules: List<JobRule> = this.rules, startTime: Instant = this.startTime)
suspend fun repeat(id: UUID = this.id, info: JobInfo = this.info, task: DataTask<*> = this.task, startTime: Instant = this.startTime)
suspend fun broadcast(event: RuleEvent)
}
34 changes: 4 additions & 30 deletions src/commonMain/kotlin/com/liftric/persisted/queue/JobInfo.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,33 +4,7 @@ import kotlin.time.Duration

data class JobInfo(
var tag: String? = null,
var timeout: Duration = Duration.INFINITE
) {
var rules: List<JobRule> = listOf()
private set
var params: Map<String, String> = mapOf()
private set
var persister: JobPersister? = null
private set

fun rules(init: RuleInfo.() -> Unit): JobInfo {
val info = RuleInfo()
info.init()
rules = info.rules.distinctBy { it::class }
return this
}

fun persist(persister: JobPersister): JobInfo {
this.persister = persister
return this
}

fun params(vararg params: Pair<String, String>): JobInfo {
this.params = params.toMap()
return this
}
}

class RuleInfo {
val rules: MutableList<JobRule> = mutableListOf()
}
var timeout: Duration = Duration.INFINITE,
var rules: MutableList<JobRule> = mutableListOf(),
var shouldPersist: Boolean = false
)
31 changes: 17 additions & 14 deletions src/commonMain/kotlin/com/liftric/persisted/queue/JobScheduler.kt
Original file line number Diff line number Diff line change
@@ -1,39 +1,42 @@
package com.liftric.persisted.queue

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.*
import kotlinx.serialization.InternalSerializationApi
import kotlinx.serialization.serializerOrNull

class JobScheduler(
val factory: TaskFactory,
configuration: Queue.Configuration? = null
configuration: Queue.Configuration? = null,
private val serializer: JobSerializer? = null
) {
val queue = JobQueue(configuration)

@PublishedApi
internal val delegate = JobDelegate()

val queue = JobQueue(configuration ?: Queue.Configuration(CoroutineScope(Dispatchers.Default), 1))
val onEvent = MutableSharedFlow<JobEvent>(extraBufferCapacity = Int.MAX_VALUE)

private val delegate = JobDelegate()

init {
delegate.onExit = { /* Do something */ }
delegate.onRepeat = { repeat(it) }
delegate.onEvent = { onEvent.emit(it) }
}

suspend inline fun <reified T: Task> schedule() {
schedule<T> { this }
suspend fun schedule(task: () -> DataTask<*>, configure: JobInfo.() -> JobInfo = { JobInfo() }) {
schedule(task(), configure)
}

suspend inline fun <reified T: Task> schedule(init: JobInfo.() -> JobInfo) = try {
val info = init(JobInfo()).apply {
@OptIn(InternalSerializationApi::class)
suspend fun schedule(task: DataTask<*>, configure: JobInfo.() -> JobInfo = { JobInfo() }) = try {
val info = configure(JobInfo()).apply {
rules.forEach { it.mutating(this) }
}

val task = factory.create(T::class, info.params)
if (task.data!!::class.serializerOrNull() == null) throw Exception("Data must be serializable")

val job = Job(task, info)
job.delegate = delegate

job.rules.forEach {
job.info.rules.forEach {
it.willSchedule(queue, job)
}

Expand All @@ -47,7 +50,7 @@ class JobScheduler(
private suspend fun repeat(job: Job) = try {
job.delegate = delegate

job.rules.forEach {
job.info.rules.forEach {
it.willSchedule(queue, job)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.liftric.persisted.queue

interface JobPersister {
interface JobSerializer {
val tag: String
fun store(job: Job)
fun retrieve(id: String): Job?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.Json

expect class Preferences: AbstractPreferences
abstract class AbstractPreferences(private val settings: Settings): JobPersister {
abstract class AbstractPreferences(private val settings: Settings): JobSerializer {
override val tag: String = ""
override fun retrieve(id: String): Job? {
val jsonString = settings.get<String>(id) ?: return null
Expand Down
68 changes: 42 additions & 26 deletions src/commonMain/kotlin/com/liftric/persisted/queue/Queue.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,46 +2,46 @@ package com.liftric.persisted.queue

import kotlinx.atomicfu.atomic
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.sync.withPermit
import kotlinx.datetime.Clock
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.coroutineContext

interface Queue {
val scope: CoroutineScope
val jobs: List<JobContext>
val maxConcurrency: Int
val configuration: Configuration

data class Configuration(
val scope: CoroutineScope,
val maxConcurrency: Int
)
}

class JobQueue(
override val scope: CoroutineScope,
override val maxConcurrency: Int
): Queue {
class JobQueue(override val configuration: Queue.Configuration): Queue {
private val cancellationQueue = MutableSharedFlow<kotlinx.coroutines.Job>(extraBufferCapacity = Int.MAX_VALUE)
private val queue = atomic(mutableListOf<Job>())
private val lock = Semaphore(maxConcurrency, 0)
private val lock = Semaphore(configuration.maxConcurrency, 0)
private val isCancelling = Mutex(false)
override val jobs: List<JobContext>
get() = queue.value

constructor(configuration: Queue.Configuration?) : this(
configuration?.scope ?: CoroutineScope(Dispatchers.Default),
configuration?.maxConcurrency ?: 1
)
init {
cancellationQueue.onEach { it.join() }
.flowOn(Dispatchers.Default)
.launchIn(configuration.scope)
}

@PublishedApi
internal fun add(job: Job) {
queue.value = queue.value.plus(listOf(job)).sortedBy { it.startTime }.toMutableList()
}

suspend fun start() {
while (scope.isActive) {
delay(1000L)
while (configuration.scope.isActive) {
if (queue.value.isEmpty()) break
if (isCancelling.isLocked) break
if (lock.availablePermits < 1) break
Expand All @@ -50,7 +50,7 @@ class JobQueue(
queue.value.remove(job)
} else if (job.startTime <= Clock.System.now()) {
lock.withPermit {
withTimeout(job.timeout) {
withTimeout(job.info.timeout) {
job.run()
queue.value.remove(job)
}
Expand All @@ -60,25 +60,41 @@ class JobQueue(
}

suspend fun cancel() {
isCancelling.withLock {
scope.coroutineContext.cancelChildren()
queue.value.clear()
submitCancellation(coroutineContext) {
isCancelling.withLock {
configuration.scope.coroutineContext.cancelChildren()
queue.value.clear()
}
}
}

suspend fun cancel(id: UUID) {
isCancelling.withLock {
val job = queue.value.first { it.id == id }
job.cancel()
queue.value.remove(job)
submitCancellation(coroutineContext) {
isCancelling.withLock {
queue.value.firstOrNull { it.id == id }?.let { job ->
job.cancel()
queue.value.remove(job)
}
}
}
}

suspend fun cancel(tag: String) {
isCancelling.withLock {
val job = queue.value.first { it.tag == tag }
job.cancel()
queue.value.remove(job)
submitCancellation(coroutineContext) {
isCancelling.withLock {
queue.value.firstOrNull { it.info.tag == tag }?.let { job ->
job.cancel()
queue.value.remove(job)
}
}
}
}

private fun submitCancellation(
context: CoroutineContext = EmptyCoroutineContext,
block: suspend CoroutineScope.() -> Unit
) {
val job = configuration.scope.launch(context, CoroutineStart.LAZY, block)
cancellationQueue.tryEmit(job)
}
}
Loading

0 comments on commit f9de439

Please sign in to comment.