diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e7478f6..85c0688 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -1,5 +1,4 @@ name: Build & test - on: pull_request: types: [ opened, reopened, synchronize ] @@ -8,19 +7,19 @@ on: push: branches: - main - jobs: test: runs-on: macOS-latest steps: - name: Checkout - uses: actions/checkout@v2 + uses: actions/checkout@v3 - name: set up JDK 11 - uses: actions/setup-java@v1 + uses: actions/setup-java@v3 with: - java-version: 11 + java-version: '11' + distribution: 'adopt' - name: Build and test - run: region=${{ secrets.region }} clientId=${{ secrets.clientid }} ./gradlew build test + run: ./gradlew build test - name: Upload test result if: ${{ always() }} uses: actions/upload-artifact@v2 diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index 4730b10..877120e 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -1,27 +1,26 @@ -name: Publish to OSSRH +name: Publish to Github on: release: types: [published] - jobs: publish: runs-on: macOS-latest - steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Set up JDK 11 - uses: actions/setup-java@v1 + uses: actions/setup-java@v3 with: - java-version: 11 + java-version: '11' + distribution: 'adopt' + - name: Validate Gradle wrapper + uses: gradle/wrapper-validation-action@e6e38bacfdf1a337459f332974bb2327a31aaf4b - name: Grant Permission to Execute run: chmod +x gradlew - name: New version run: ./gradlew versionDisplay - - name: Publish Library + - name: Publish package + uses: gradle/gradle-build-action@67421db6bd0bf253fb4bd25b31ebb98943c375e1 + with: + arguments: publish env: - ORG_GRADLE_PROJECT_signingKey: ${{ secrets.OSSRH_GPG_SECRET_KEY }} - ORG_GRADLE_PROJECT_signingPassword: ${{ secrets.OSSRH_GPG_SECRET_KEY_PASSWORD }} - ORG_GRADLE_PROJECT_ossrhUsername: ${{ secrets.OSSRH_USERNAME }} - ORG_GRADLE_PROJECT_ossrhPassword: ${{ secrets.OSSRH_PASSWORD }} - ORG_GRADLE_PROJECT_npmAccessKey: ${{ secrets.NPMJS_ACCESS_KEY }} - run: region=${{ secrets.region }} clientId=${{ secrets.clientid }} ./gradlew publishAllPublicationsToSonatypeRepository + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git a/README.md b/README.md index 89185f2..c6cb020 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,10 @@ -# Persisted-queue +# kmm-job-queue -Coroutine job scheduler inspired by `Android Work Manager` and `android-priority-jobqueue` for Kotlin Multiplatform projects. Run & repeat tasks. Rebuild them from disk. Fine tune execution with rules. +Coroutine job scheduler for Kotlin Multiplatform projects. Run & repeat tasks. Rebuild them from disk. Fine tune execution with rules. + +The library depends on `kotlinx-serialization` for the persistence of the jobs. + +⚠️ The project is still work in progress and shouldn't be used in a production project. ## Rules @@ -9,15 +13,17 @@ Coroutine job scheduler inspired by `Android Work Manager` and `android-priority - [x] Retry - [x] Periodic - [x] Unique -- [ ] Internet +- [ ] Network ## Capabilities -- [x] Cancellation (all, by id, by tag) +- [x] Cancellation (all, by id) +- [x] Start & stop scheduling +- [x] Restore from disk (after start) ## Example -Define a `DataTask<*>` or a `Task` (`DataTask`), customize its body and limit when it should repeat. +Define a `Task` (or `DataTask`), customize its body and limit when it should repeat. ⚠️ Make sure the data you pass into the task is serializable. @@ -27,25 +33,31 @@ data class UploadData(val id: String) class UploadTask(data: UploadData): DataTask(data) { override suspend fun body() { /* Do something */ } - override suspend fun onRepeat(cause: Throwable): Boolean { cause is NetworkException } + override suspend fun onRepeat(cause: Throwable): Boolean { cause is NetworkException } // Won't retry if false } ``` -Create a single instance of the scheduler on app start. To start enqueuing jobs run `queue.start()`. +Create a single instance of the job queue on app start. To start enqueuing jobs run `jobQueue.start()`. + +⚠️ You have to provide the polymorphic serializer of your custom task **if you want to persist it**. -You can pass a `Queue.Configuration` or a custom `JobSerializer` to the scheduler. +You can pass a custom `Queue.Configuration` or `JsonStorage` to the job queue. ```kotlin -val scheduler = JobScheduler() -scheduler.queue.start() +val jobQueue = JobQueue(serializers = SerializersModule { + polymorphic(Task::class) { + subclass(UploadTask::class, UploadTask.serializer()) + } +}) +jobQueue.start() ``` You can customize the jobs life cycle during schedule by defining rules. ```kotlin -val data = UploadData(id = ...) +val data = UploadData(id = "123456") -scheduler.schedule(UploadTask(data)) { +jobQueue.schedule(UploadTask(data)) { unique(data.id) retry(RetryLimit.Limited(3), delay = 30.seconds) persist() @@ -55,7 +67,10 @@ scheduler.schedule(UploadTask(data)) { You can subscribe to life cycle events (e.g. for logging). ```kotlin -scheduler.onEvent.collect { event -> - Logger.info(event) +jobQueue.listener.collect { event -> + when (event) { + is JobEvent.DidFail -> Logger.error(event.error) + else -> Logger.info(event) + } } ``` diff --git a/build.gradle.kts b/build.gradle.kts index e590de3..ac734b9 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -1,3 +1,5 @@ +import org.jetbrains.kotlin.gradle.targets.native.tasks.KotlinNativeSimulatorTest + plugins { id("com.android.library") version libs.versions.android.tools.gradle kotlin("multiplatform") version libs.versions.kotlin @@ -7,22 +9,13 @@ plugins { id("signing") } -repositories { - mavenCentral() - google() - gradlePluginPortal() -} - -java { - sourceCompatibility = JavaVersion.VERSION_1_8 - targetCompatibility = JavaVersion.VERSION_1_8 +group = "com.liftric" +version = with(versioning.info) { + if (branch == "HEAD" && dirty.not()) tag else full } kotlin { - ios { - binaries.framework() - } - + ios() iosSimulatorArm64() android { @@ -44,6 +37,8 @@ kotlin { implementation(kotlin("test")) implementation(kotlin("test-common")) implementation(kotlin("test-annotations-common")) + implementation(libs.multiplatform.settings.test) + implementation(libs.kotlinx.coroutines.test) } } val androidMain by getting @@ -53,6 +48,9 @@ kotlin { implementation(kotlin("test")) implementation(kotlin("test-junit")) implementation(libs.androidx.test.core) + implementation(libs.androidx.test.runner) + implementation(libs.androidx.test.ext) + } } val iosMain by getting @@ -73,11 +71,13 @@ kotlin { } android { - compileSdk = 30 + compileSdk = 33 + + namespace = "com.liftric.job.queue" defaultConfig { minSdk = 21 - targetSdk = 30 + targetSdk = 33 testInstrumentationRunner = "androidx.test.runner" } @@ -85,22 +85,26 @@ android { sourceCompatibility = JavaVersion.VERSION_11 targetCompatibility = JavaVersion.VERSION_11 } - testOptions { - unitTests.apply { + unitTests { isReturnDefaultValues = true } } + publishing { + multipleVariants { + withSourcesJar() + withJavadocJar() + allVariants() + } + } } -group = "com.liftric" -version = with(versioning.info) { - if (branch == "HEAD" && dirty.not()) tag else full -} - -afterEvaluate { - project.publishing.publications.withType(MavenPublication::class.java).forEach { - it.groupId = group.toString() +tasks { + withType { + deviceId = "iPhone 14" + } + withType(JavaCompile::class) { + options.release.set(11) } } @@ -114,11 +118,11 @@ val javadocJar by tasks.registering(Jar::class) { publishing { repositories { maven { - name = "sonatype" - setUrl("https://s01.oss.sonatype.org/service/local/staging/deploy/maven2/") + name = "GitHubPackages" + setUrl("https://maven.pkg.github.com/Liftric/kmm-job-queue") credentials { - username = ossrhUsername - password = ossrhPassword + username = System.getenv("GITHUB_ACTOR") + password = System.getenv("GITHUB_TOKEN") } } } @@ -128,13 +132,13 @@ publishing { pom { name.set(project.name) - description.set("Kotlin Multiplatform persisted queue library.") - url.set("https://github.com/liftric/cognito-idp") + description.set("Persistable coroutine job queue for Kotlin Multiplatform projects.") + url.set("https://github.com/Liftric/kmm-job-queue") licenses { license { name.set("MIT") - url.set("https://github.com/liftric/cognito-idp/blob/master/LICENSE") + url.set("https://github.com/Liftric/kmm-job-queue/blob/master/LICENSE") } } developers { @@ -145,12 +149,18 @@ publishing { } } scm { - url.set("https://github.com/liftric/persisted-queue") + url.set("https://github.com/Liftric/kmm-job-queue") } } } } +afterEvaluate { + project.publishing.publications.withType(MavenPublication::class.java).forEach { + it.groupId = group.toString() + } +} + signing { val signingKey: String? by project val signingPassword: String? by project diff --git a/gradle.properties b/gradle.properties index 081cad3..ab72ab0 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,10 +1,10 @@ android.useAndroidX=true +android.disableAutomaticComponentCreation=true org.gradle.parallel=true org.gradle.jvmargs=-Xmx4096m org.gradle.vfs.watch=true -kotlin.native.enableDependencyPropagation=false -kotlin.mpp.enableGranularSourceSetsMetadata=true kotlin.mpp.enableCompatibilityMetadataVariant=true +kotlin.mpp.androidSourceSetLayoutVersion1.nowarn=true kotlin.incremental=true kotlin.incremental.multiplatform=true kotlin.caching.enabled=true diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 8fad3f5..f42e62f 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,5 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-7.5.1-all.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-7.6-all.zip zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists diff --git a/settings.gradle.kts b/settings.gradle.kts index 4b10170..43f8a79 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -1,4 +1,4 @@ -rootProject.name = "persisted-queue" +rootProject.name = "job-queue" pluginManagement { repositories { @@ -16,15 +16,19 @@ dependencyResolutionManagement { versionCatalogs { create("libs") { - version("android-tools-gradle", "7.2.2") - version("kotlin", "1.7.20") + version("android-tools-gradle", "7.3.0") + version("kotlin", "1.8.0") library("kotlinx-coroutines", "org.jetbrains.kotlinx", "kotlinx-coroutines-core").version("1.6.4") - library("kotlinx-serialization", "org.jetbrains.kotlinx", "kotlinx-serialization-json").version("1.4.0") - library("kotlinx-atomicfu", "org.jetbrains.kotlinx", "atomicfu").version("0.18.5") + library("kotlinx-coroutines-test", "org.jetbrains.kotlinx", "kotlinx-coroutines-test").version("1.6.4") + library("kotlinx-serialization", "org.jetbrains.kotlinx", "kotlinx-serialization-json").version("1.4.1") + library("kotlinx-atomicfu", "org.jetbrains.kotlinx", "atomicfu").version("0.19.0") library("kotlinx-datetime", "org.jetbrains.kotlinx", "kotlinx-datetime").version("0.4.0") library("androidx-test-core", "androidx.test", "core").version("1.4.0") + library("androidx-test-runner", "androidx.test", "runner").version("1.4.0") + library("androidx-test-ext", "androidx.test.ext", "junit").version("1.1.3") library("roboelectric", "org.robolectric", "robolectric").version("4.5.1") library("multiplatform-settings", "com.russhwolf", "multiplatform-settings").version("1.0.0-RC") + library("multiplatform-settings-test", "com.russhwolf", "multiplatform-settings-test").version("1.0.0-RC") plugin("versioning", "net.nemerosa.versioning").version("3.0.0") plugin("kotlin.serialization", "org.jetbrains.kotlin.plugin.serialization").versionRef("kotlin") } diff --git a/src/androidMain/kotlin/com/liftric/job/queue/JobQueue.kt b/src/androidMain/kotlin/com/liftric/job/queue/JobQueue.kt new file mode 100644 index 0000000..4a72fa0 --- /dev/null +++ b/src/androidMain/kotlin/com/liftric/job/queue/JobQueue.kt @@ -0,0 +1,16 @@ +package com.liftric.job.queue + +import android.content.Context +import com.russhwolf.settings.SharedPreferencesSettings +import kotlinx.serialization.modules.SerializersModule + +actual class JobQueue( + context: Context, + serializers: SerializersModule = SerializersModule {}, + configuration: Queue.Configuration = Queue.DefaultConfiguration, + store: JsonStorage = SettingsStorage(SharedPreferencesSettings.Factory(context).create("com.liftric.persisted.queue")) +) : AbstractJobQueue( + serializers, + configuration, + store +) diff --git a/src/androidMain/kotlin/com/liftric/persisted/queue/UUID.kt b/src/androidMain/kotlin/com/liftric/job/queue/UUID.kt similarity index 73% rename from src/androidMain/kotlin/com/liftric/persisted/queue/UUID.kt rename to src/androidMain/kotlin/com/liftric/job/queue/UUID.kt index d1be5fa..f813735 100644 --- a/src/androidMain/kotlin/com/liftric/persisted/queue/UUID.kt +++ b/src/androidMain/kotlin/com/liftric/job/queue/UUID.kt @@ -1,16 +1,17 @@ -package com.liftric.persisted.queue +package com.liftric.job.queue import kotlinx.serialization.KSerializer import kotlinx.serialization.descriptors.PrimitiveKind import kotlinx.serialization.descriptors.PrimitiveSerialDescriptor import kotlinx.serialization.encoding.Decoder import kotlinx.serialization.encoding.Encoder -import java.util.UUID -import kotlin.reflect.KClass -actual typealias UUID = UUID +actual typealias UUID = java.util.UUID -actual fun KClass.instance(): UUID = UUID.randomUUID() +internal actual object UUIDFactory { + actual fun create(): UUID = UUID.randomUUID() + actual fun fromString(string: String): UUID = UUID.fromString(string) +} actual object UUIDSerializer: KSerializer { override val descriptor = PrimitiveSerialDescriptor("UUID", PrimitiveKind.STRING) diff --git a/src/androidMain/kotlin/com/liftric/persisted/queue/Preferences.kt b/src/androidMain/kotlin/com/liftric/persisted/queue/Preferences.kt deleted file mode 100644 index 02c2822..0000000 --- a/src/androidMain/kotlin/com/liftric/persisted/queue/Preferences.kt +++ /dev/null @@ -1,6 +0,0 @@ -package com.liftric.persisted.queue - -import android.content.Context -import com.russhwolf.settings.SharedPreferencesSettings - -actual class Preferences(context: Context): AbstractPreferences(SharedPreferencesSettings.Factory(context).create("com.liftric.job.scheduler")) diff --git a/src/androidTest/kotlin/com/liftric/job/queue/JobQueueTests.kt b/src/androidTest/kotlin/com/liftric/job/queue/JobQueueTests.kt new file mode 100644 index 0000000..cc1776d --- /dev/null +++ b/src/androidTest/kotlin/com/liftric/job/queue/JobQueueTests.kt @@ -0,0 +1,18 @@ +package com.liftric.job.queue + +import androidx.test.ext.junit.runners.AndroidJUnit4 +import androidx.test.platform.app.InstrumentationRegistry +import kotlinx.serialization.modules.SerializersModule +import kotlinx.serialization.modules.polymorphic +import org.junit.runner.RunWith + +@RunWith(AndroidJUnit4::class) +actual class JobQueueTests: AbstractJobQueueTests(JobQueue( + context = InstrumentationRegistry.getInstrumentation().targetContext, + serializers = SerializersModule { + polymorphic(Task::class) { + subclass(TestTask::class, TestTask.serializer()) + } + }, + store = MapStorage() +)) diff --git a/src/commonMain/kotlin/com/liftric/job/queue/Job.kt b/src/commonMain/kotlin/com/liftric/job/queue/Job.kt new file mode 100644 index 0000000..3b826b7 --- /dev/null +++ b/src/commonMain/kotlin/com/liftric/job/queue/Job.kt @@ -0,0 +1,65 @@ +package com.liftric.job.queue + +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.withTimeout +import kotlinx.datetime.Clock +import kotlinx.datetime.Instant +import kotlinx.serialization.Serializable +import kotlinx.serialization.Transient + +internal class JobDelegate { + val onEvent = MutableSharedFlow(extraBufferCapacity = Int.MAX_VALUE) +} + +@Serializable +data class Job( + @Serializable(with = UUIDSerializer::class) + override val id: UUID, + override val info: JobInfo, + override val task: Task, + override val startTime: Instant +): JobContext { + @Transient internal var delegate: JobDelegate? = null + + constructor(task: Task, info: JobInfo) : this (UUIDFactory.create(), info, task, Clock.System.now()) + + private var canRepeat: Boolean = true + + suspend fun run(): JobEvent { + return withTimeout(info.timeout) { + val event = try { + info.rules.forEach { it.willRun(this@Job) } + + task.body() + + JobEvent.DidSucceed(this@Job) + } catch (e: CancellationException) { + throw e + } catch (e: Throwable) { + canRepeat = task.onRepeat(e) + JobEvent.DidFail(this@Job, e) + } + + try { + info.rules.forEach { it.willRemove(this@Job, event) } + + event + } catch (e: CancellationException) { + throw e + } catch (e: Throwable) { + JobEvent.DidFailOnRemove(this@Job, e) + } + } + } + + override suspend fun cancel() { + delegate?.onEvent?.emit(JobEvent.DidCancel(this@Job)) + } + + override suspend fun repeat(id: UUID, info: JobInfo, task: Task, startTime: Instant) { + if (canRepeat) { + delegate?.onEvent?.emit(JobEvent.ShouldRepeat(Job(id, info, task, startTime))) + } + } +} diff --git a/src/commonMain/kotlin/com/liftric/job/queue/JobContext.kt b/src/commonMain/kotlin/com/liftric/job/queue/JobContext.kt new file mode 100644 index 0000000..537d691 --- /dev/null +++ b/src/commonMain/kotlin/com/liftric/job/queue/JobContext.kt @@ -0,0 +1,15 @@ +package com.liftric.job.queue + +import kotlinx.datetime.Instant + +interface JobContext: JobData { + suspend fun cancel() + suspend fun repeat(id: UUID = this.id, info: JobInfo = this.info, task: Task = this.task, startTime: Instant = this.startTime) +} + +interface JobData { + val id: UUID + val info: JobInfo + val task: Task + val startTime: Instant +} diff --git a/src/commonMain/kotlin/com/liftric/job/queue/JobEvent.kt b/src/commonMain/kotlin/com/liftric/job/queue/JobEvent.kt new file mode 100644 index 0000000..e1dc6ea --- /dev/null +++ b/src/commonMain/kotlin/com/liftric/job/queue/JobEvent.kt @@ -0,0 +1,13 @@ +package com.liftric.job.queue + +sealed class JobEvent { + data class DidSchedule(val job: JobContext): JobEvent() + data class DidScheduleRepeat(val job: JobContext): JobEvent() + data class WillRun(val job: JobContext): JobEvent() + data class DidThrowOnSchedule(val error: Throwable): JobEvent() + data class DidSucceed(val job: JobContext): JobEvent() + data class DidFail(val job: JobContext, val error: Throwable): JobEvent() + data class ShouldRepeat(val job: Job): JobEvent() + data class DidCancel(val job: JobContext): JobEvent() + data class DidFailOnRemove(val job: JobContext, val error: Throwable): JobEvent() +} diff --git a/src/commonMain/kotlin/com/liftric/persisted/queue/JobInfo.kt b/src/commonMain/kotlin/com/liftric/job/queue/JobInfo.kt similarity index 72% rename from src/commonMain/kotlin/com/liftric/persisted/queue/JobInfo.kt rename to src/commonMain/kotlin/com/liftric/job/queue/JobInfo.kt index baa6251..c19e15c 100644 --- a/src/commonMain/kotlin/com/liftric/persisted/queue/JobInfo.kt +++ b/src/commonMain/kotlin/com/liftric/job/queue/JobInfo.kt @@ -1,7 +1,9 @@ -package com.liftric.persisted.queue +package com.liftric.job.queue import kotlin.time.Duration +import kotlinx.serialization.Serializable +@Serializable data class JobInfo( var tag: String? = null, var timeout: Duration = Duration.INFINITE, diff --git a/src/commonMain/kotlin/com/liftric/job/queue/JobQueue.kt b/src/commonMain/kotlin/com/liftric/job/queue/JobQueue.kt new file mode 100644 index 0000000..e99f351 --- /dev/null +++ b/src/commonMain/kotlin/com/liftric/job/queue/JobQueue.kt @@ -0,0 +1,204 @@ +package com.liftric.job.queue + +import com.liftric.job.queue.rules.* +import kotlinx.atomicfu.atomic +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.Semaphore +import kotlinx.coroutines.sync.withLock +import kotlinx.datetime.Clock +import kotlinx.datetime.serializers.InstantIso8601Serializer +import kotlinx.serialization.decodeFromString +import kotlinx.serialization.encodeToString +import kotlinx.serialization.json.Json +import kotlinx.serialization.modules.SerializersModule +import kotlinx.serialization.modules.contextual +import kotlinx.serialization.modules.plus +import kotlinx.serialization.modules.polymorphic +import kotlin.time.Duration.Companion.seconds + +expect class JobQueue: AbstractJobQueue +abstract class AbstractJobQueue( + serializers: SerializersModule, + final override val configuration: Queue.Configuration, + private val store: JsonStorage +): Queue { + private val module = SerializersModule { + contextual(InstantIso8601Serializer) + polymorphic(JobRule::class) { + subclass(DelayRule::class, DelayRule.serializer()) + subclass(PeriodicRule::class, PeriodicRule.serializer()) + subclass(RetryRule::class, RetryRule.serializer()) + subclass(TimeoutRule::class, TimeoutRule.serializer()) + subclass(UniqueRule::class, UniqueRule.serializer()) + subclass(PersistenceRule::class, PersistenceRule.serializer()) + } + } + private val format = Json { serializersModule = module + serializers } + + val listener = MutableSharedFlow(extraBufferCapacity = Int.MAX_VALUE) + + /** + * Scheduled jobs + */ + private val running = atomic(mutableMapOf()) + private val queue = atomic(mutableListOf()) + + override val jobs: List + get() = queue.value + override val numberOfJobs: Int + get() = jobs.count() + + /** + * Semaphore to limit concurrency + */ + private val lock = Semaphore(configuration.maxConcurrency, 0) + + /** + * Mutex to suspend queue operations during cancellation + */ + private val isCancelling = Mutex() + + private var scheduler: kotlinx.coroutines.Job? = null + + init { + if (configuration.startsAutomatically) { + start() + } + } + + suspend fun schedule(task: () -> Task, configure: JobInfo.() -> JobInfo = { JobInfo() }) { + schedule(task(), configure) + } + + suspend fun schedule(data: Data, task: (Data) -> DataTask, configure: JobInfo.() -> JobInfo = { JobInfo() }) { + schedule(task(data), configure) + } + + suspend fun schedule(task: Task, configure: JobInfo.() -> JobInfo = { JobInfo() }) { + val info = configure(JobInfo()).apply { + rules.forEach { it.mutating(this) } + } + + val job = Job(task, info) + + schedule(job).apply { + listener.emit(JobEvent.DidSchedule(job)) + } + } + + private suspend fun schedule(job: Job) = try { + job.info.rules.forEach { + it.willSchedule(this, job) + } + + if (job.info.shouldPersist) { + store.set(job.id.toString(), format.encodeToString(job)) + } + + queue.value = queue.value.plus(listOf(job)).sortedBy { it.startTime }.toMutableList() + } catch (e: Throwable) { + listener.emit(JobEvent.DidThrowOnSchedule(e)) + } + + private val delegate = JobDelegate() + + /** + * Starts enqueuing scheduled jobs + */ + fun start() { + if (scheduler != null) return + scheduler = CoroutineScope(Dispatchers.Default).launch { + launch { + delegate.onEvent.collect { event -> + when (event) { + is JobEvent.DidCancel -> { + cancel(event.job.id) + } + is JobEvent.ShouldRepeat -> { + schedule(event.job).apply { + listener.emit(JobEvent.DidScheduleRepeat(event.job)) + } + } + else -> listener.emit(event) + } + } + } + restore() + while (isActive) { + if (isCancelling.isLocked) continue + if (queue.value.isEmpty()) continue + if (queue.value.first().startTime.minus(Clock.System.now()) > 0.seconds) continue + lock.acquire() + val job = queue.value.removeFirst() + job.delegate = delegate + running.value[job.id] = configuration.scope.launch { + try { + listener.emit(JobEvent.WillRun(job)) + val result = job.run() + listener.emit(result) + } catch (e: CancellationException) { + listener.emit(JobEvent.DidCancel(job)) + } finally { + if (job.info.shouldPersist) { + store.remove(job.id.toString()) + } + running.value[job.id]?.cancel() + running.value.remove(job.id) + lock.release() + } + } + } + } + } + + /** + * Stops enqueuing scheduled jobs + */ + fun stop() { + scheduler?.cancel() + scheduler = null + } + + /** + * Removes all scheduled jobs + */ + suspend fun clear() { + clear(true) + } + + internal suspend fun clear(clearStore: Boolean = true) { + isCancelling.withLock { + queue.value.clear() + running.value.clear() + configuration.scope.coroutineContext.cancelChildren() + if (clearStore) { store.clear() } + } + } + + /** + * Cancels jobs + * @param id Unique identifier of the job + */ + suspend fun cancel(id: UUID) { + isCancelling.withLock { + queue.value.firstOrNull { it.id == id }?.let { job -> + queue.value.remove(job) + listener.emit(JobEvent.DidCancel(job)) + } ?: running.value[id]?.cancel() + } + } + + /** + * Restores all persisted jobs. Ensures job not already in queue. + */ + internal suspend fun restore() { + store.keys.forEach { key -> + val job: Job = format.decodeFromString(store.get(key)) + if (queue.value.none { it.id == job.id }) { + schedule(job) + } + } + } +} diff --git a/src/commonMain/kotlin/com/liftric/persisted/queue/JobRule.kt b/src/commonMain/kotlin/com/liftric/job/queue/JobRule.kt similarity index 90% rename from src/commonMain/kotlin/com/liftric/persisted/queue/JobRule.kt rename to src/commonMain/kotlin/com/liftric/job/queue/JobRule.kt index b1fbafe..5d512ab 100644 --- a/src/commonMain/kotlin/com/liftric/persisted/queue/JobRule.kt +++ b/src/commonMain/kotlin/com/liftric/job/queue/JobRule.kt @@ -1,4 +1,4 @@ -package com.liftric.persisted.queue +package com.liftric.job.queue import kotlinx.serialization.Serializable diff --git a/src/commonMain/kotlin/com/liftric/job/queue/JsonStorage.kt b/src/commonMain/kotlin/com/liftric/job/queue/JsonStorage.kt new file mode 100644 index 0000000..727f00c --- /dev/null +++ b/src/commonMain/kotlin/com/liftric/job/queue/JsonStorage.kt @@ -0,0 +1,47 @@ +package com.liftric.job.queue + +import com.russhwolf.settings.Settings +import com.russhwolf.settings.set + +interface JsonStorage { + val keys: Set + fun get(id: String): String + fun set(id: String, json: String) + fun clear() + fun remove(id: String) +} + +class MapStorage: JsonStorage { + private val store = mutableMapOf() + override val keys: Set + get() = store.keys + override fun get(id: String): String { + return store.getValue(id) + } + override fun set(id: String, json: String) { + store[id] = json + } + override fun clear() { + store.clear() + } + override fun remove(id: String) { + store.remove(id) + } +} + +internal class SettingsStorage(private val store: Settings): JsonStorage { + override val keys: Set + get() = store.keys + override fun get(id: String): String { + return store.getString(id, "") + } + override fun set(id: String, json: String) { + store[id] = json + } + override fun clear() { + store.clear() + } + override fun remove(id: String) { + store.remove(id) + } +} diff --git a/src/commonMain/kotlin/com/liftric/job/queue/Queue.kt b/src/commonMain/kotlin/com/liftric/job/queue/Queue.kt new file mode 100644 index 0000000..b13d10a --- /dev/null +++ b/src/commonMain/kotlin/com/liftric/job/queue/Queue.kt @@ -0,0 +1,23 @@ +package com.liftric.job.queue + +import kotlinx.coroutines.* + +interface Queue { + val jobs: List + val numberOfJobs: Int + val configuration: Configuration + + data class Configuration( + val scope: CoroutineScope, + val maxConcurrency: Int, + val startsAutomatically: Boolean + ) + + companion object { + val DefaultConfiguration = Configuration( + scope = CoroutineScope(Dispatchers.Default), + maxConcurrency = 1, + startsAutomatically = false + ) + } +} diff --git a/src/commonMain/kotlin/com/liftric/job/queue/Task.kt b/src/commonMain/kotlin/com/liftric/job/queue/Task.kt new file mode 100644 index 0000000..372ea0a --- /dev/null +++ b/src/commonMain/kotlin/com/liftric/job/queue/Task.kt @@ -0,0 +1,11 @@ +package com.liftric.job.queue + +interface Task { + @Throws(Throwable::class) + suspend fun body() + suspend fun onRepeat(cause: Throwable): Boolean = false +} + +interface DataTask: Task { + val data: Data +} diff --git a/src/commonMain/kotlin/com/liftric/job/queue/UUID.kt b/src/commonMain/kotlin/com/liftric/job/queue/UUID.kt new file mode 100644 index 0000000..4422756 --- /dev/null +++ b/src/commonMain/kotlin/com/liftric/job/queue/UUID.kt @@ -0,0 +1,12 @@ +package com.liftric.job.queue + +import kotlinx.serialization.KSerializer + +expect class UUID + +internal expect object UUIDFactory { + fun create(): UUID + fun fromString(string: String): UUID +} + +expect object UUIDSerializer: KSerializer diff --git a/src/commonMain/kotlin/com/liftric/persisted/queue/rules/DelayRule.kt b/src/commonMain/kotlin/com/liftric/job/queue/rules/DelayRule.kt similarity index 71% rename from src/commonMain/kotlin/com/liftric/persisted/queue/rules/DelayRule.kt rename to src/commonMain/kotlin/com/liftric/job/queue/rules/DelayRule.kt index 19a240e..a200305 100644 --- a/src/commonMain/kotlin/com/liftric/persisted/queue/rules/DelayRule.kt +++ b/src/commonMain/kotlin/com/liftric/job/queue/rules/DelayRule.kt @@ -1,15 +1,14 @@ -package com.liftric.persisted.queue.rules +package com.liftric.job.queue.rules -import com.liftric.persisted.queue.* +import com.liftric.job.queue.* import kotlinx.coroutines.delay -import kotlinx.serialization.Serializable import kotlin.time.Duration import kotlin.time.Duration.Companion.seconds +import kotlinx.serialization.Serializable @Serializable data class DelayRule(val duration: Duration = 0.seconds): JobRule() { override suspend fun willRun(context: JobContext) { - context.broadcast(RuleEvent.OnRun(this, "Delaying job=${context.id} by duration=$duration")) delay(duration) } } diff --git a/src/commonMain/kotlin/com/liftric/persisted/queue/rules/PeriodicRule.kt b/src/commonMain/kotlin/com/liftric/job/queue/rules/PeriodicRule.kt similarity index 81% rename from src/commonMain/kotlin/com/liftric/persisted/queue/rules/PeriodicRule.kt rename to src/commonMain/kotlin/com/liftric/job/queue/rules/PeriodicRule.kt index 2ec6610..8dc68d1 100644 --- a/src/commonMain/kotlin/com/liftric/persisted/queue/rules/PeriodicRule.kt +++ b/src/commonMain/kotlin/com/liftric/job/queue/rules/PeriodicRule.kt @@ -1,6 +1,6 @@ -package com.liftric.persisted.queue.rules +package com.liftric.job.queue.rules -import com.liftric.persisted.queue.* +import com.liftric.job.queue.* import kotlinx.datetime.Clock import kotlinx.serialization.Serializable import kotlin.time.Duration @@ -9,7 +9,7 @@ import kotlin.time.Duration.Companion.seconds @Serializable data class PeriodicRule(val interval: Duration = 0.seconds): JobRule() { override suspend fun willRemove(context: JobContext, result: JobEvent) { - if (result is JobEvent.DidEnd) { + if (result is JobEvent.DidSucceed) { context.repeat(startTime = Clock.System.now().plus(interval)) } } diff --git a/src/commonMain/kotlin/com/liftric/persisted/queue/rules/PersistenceRule.kt b/src/commonMain/kotlin/com/liftric/job/queue/rules/PersistenceRule.kt similarity index 74% rename from src/commonMain/kotlin/com/liftric/persisted/queue/rules/PersistenceRule.kt rename to src/commonMain/kotlin/com/liftric/job/queue/rules/PersistenceRule.kt index 3030829..186a777 100644 --- a/src/commonMain/kotlin/com/liftric/persisted/queue/rules/PersistenceRule.kt +++ b/src/commonMain/kotlin/com/liftric/job/queue/rules/PersistenceRule.kt @@ -1,7 +1,7 @@ -package com.liftric.persisted.queue.rules +package com.liftric.job.queue.rules -import com.liftric.persisted.queue.JobInfo -import com.liftric.persisted.queue.JobRule +import com.liftric.job.queue.JobInfo +import com.liftric.job.queue.JobRule import kotlinx.serialization.Serializable @Serializable diff --git a/src/commonMain/kotlin/com/liftric/persisted/queue/rules/RetryRule.kt b/src/commonMain/kotlin/com/liftric/job/queue/rules/RetryRule.kt similarity index 82% rename from src/commonMain/kotlin/com/liftric/persisted/queue/rules/RetryRule.kt rename to src/commonMain/kotlin/com/liftric/job/queue/rules/RetryRule.kt index 7576ba1..8396c44 100644 --- a/src/commonMain/kotlin/com/liftric/persisted/queue/rules/RetryRule.kt +++ b/src/commonMain/kotlin/com/liftric/job/queue/rules/RetryRule.kt @@ -1,6 +1,6 @@ -package com.liftric.persisted.queue.rules +package com.liftric.job.queue.rules -import com.liftric.persisted.queue.* +import com.liftric.job.queue.* import kotlinx.datetime.Clock import kotlinx.serialization.Serializable import kotlin.time.Duration @@ -16,8 +16,7 @@ data class RetryRule(val limit: RetryLimit, val delay: Duration = 0.seconds): Jo } is RetryLimit.Limited -> { if (limit.count > 0) { - val rules = context.info.rules.minus(this).plus(RetryRule(RetryLimit.Limited((limit.count + 1) - 2), delay)) - context.broadcast(RuleEvent.OnRemove(this, "Attempting to retry task=$context")) + val rules = context.info.rules.minus(this).plus(RetryRule(RetryLimit.Limited(limit.count - 1), delay)) context.repeat(info = context.info.copy(rules = rules.toMutableList()), startTime = Clock.System.now().plus(delay)) } } diff --git a/src/commonMain/kotlin/com/liftric/persisted/queue/rules/TimeoutRule.kt b/src/commonMain/kotlin/com/liftric/job/queue/rules/TimeoutRule.kt similarity index 64% rename from src/commonMain/kotlin/com/liftric/persisted/queue/rules/TimeoutRule.kt rename to src/commonMain/kotlin/com/liftric/job/queue/rules/TimeoutRule.kt index 6b1c685..96a4bf0 100644 --- a/src/commonMain/kotlin/com/liftric/persisted/queue/rules/TimeoutRule.kt +++ b/src/commonMain/kotlin/com/liftric/job/queue/rules/TimeoutRule.kt @@ -1,9 +1,11 @@ -package com.liftric.persisted.queue.rules +package com.liftric.job.queue.rules -import com.liftric.persisted.queue.JobInfo -import com.liftric.persisted.queue.JobRule +import com.liftric.job.queue.JobInfo +import com.liftric.job.queue.JobRule import kotlin.time.Duration +import kotlinx.serialization.Serializable +@Serializable data class TimeoutRule(val timeout: Duration): JobRule() { override suspend fun mutating(info: JobInfo) { info.timeout = timeout diff --git a/src/commonMain/kotlin/com/liftric/persisted/queue/rules/UniqueRule.kt b/src/commonMain/kotlin/com/liftric/job/queue/rules/UniqueRule.kt similarity index 59% rename from src/commonMain/kotlin/com/liftric/persisted/queue/rules/UniqueRule.kt rename to src/commonMain/kotlin/com/liftric/job/queue/rules/UniqueRule.kt index f6bf8e3..31966ec 100644 --- a/src/commonMain/kotlin/com/liftric/persisted/queue/rules/UniqueRule.kt +++ b/src/commonMain/kotlin/com/liftric/job/queue/rules/UniqueRule.kt @@ -1,6 +1,6 @@ -package com.liftric.persisted.queue.rules +package com.liftric.job.queue.rules -import com.liftric.persisted.queue.* +import com.liftric.job.queue.* import kotlinx.serialization.Serializable @Serializable @@ -11,8 +11,11 @@ data class UniqueRule(private val tag: String? = null): JobRule() { override suspend fun willSchedule(queue: Queue, context: JobContext) { for (item in queue.jobs) { - if (item.info.tag == tag || item.id == context.id) { - throw Error("Job with id=${item.id} already exists") + if (item.info.tag == tag) { + throw Throwable("Job with tag=${item.info.tag} already exists") + } + if (item.id == context.id) { + throw Throwable("Job with id=${item.id} already exists") } } } diff --git a/src/commonMain/kotlin/com/liftric/persisted/queue/Job.kt b/src/commonMain/kotlin/com/liftric/persisted/queue/Job.kt deleted file mode 100644 index b4ad5fb..0000000 --- a/src/commonMain/kotlin/com/liftric/persisted/queue/Job.kt +++ /dev/null @@ -1,79 +0,0 @@ -package com.liftric.persisted.queue - -import kotlinx.coroutines.* -import kotlinx.datetime.Clock -import kotlinx.datetime.Instant -import kotlin.time.Duration - -class Job( - override val id: UUID, - override val info: JobInfo, - override val task: DataTask<*>, - override val startTime: Instant = Clock.System.now() -): JobContext { - var delegate: JobDelegate? = null - - constructor(task: DataTask<*>, info: JobInfo) : this (UUID::class.instance(), info, task) - - private var cancellable: kotlinx.coroutines.Job? = null - - var isCancelled: Boolean = false - private set - - private var canRepeat: Boolean = false - - internal suspend fun run() { - coroutineScope { - if (isCancelled) return@coroutineScope - cancellable = launch { - val event = try { - info.rules.forEach { it.willRun(this@Job) } - - delegate?.broadcast(JobEvent.WillRun(this@Job)) - - task.body() - - JobEvent.DidEnd(this@Job) - } catch (e: CancellationException) { - JobEvent.DidCancel(this@Job, "Cancelled during run") - } catch (e: Error) { - canRepeat = task.onRepeat(e) - JobEvent.DidFail(this@Job, e) - } - - try { - delegate?.broadcast(event) - - if (isCancelled) return@launch - - info.rules.forEach { it.willRemove(this@Job, event) } - } catch (e: CancellationException) { - delegate?.broadcast(JobEvent.DidCancel(this@Job, "Cancelled after run")) - } catch (e: Error) { - delegate?.broadcast(JobEvent.DidFailOnRemove(this@Job, e)) - } - } - } - } - - override suspend fun cancel() { - if (isCancelled) return - isCancelled = true - cancellable?.cancel(CancellationException("Cancelled during run")) ?: run { - delegate?.broadcast(JobEvent.DidCancel(this@Job, "Cancelled before run")) - } - delegate?.exit() - } - - override suspend fun repeat(id: UUID, info: JobInfo, task: DataTask<*>, startTime: Instant) { - if (canRepeat) { - delegate?.repeat(Job(id, info, task, startTime)) - } else { - delegate?.broadcast(JobEvent.NotAllowedToRepeat(this@Job)) - } - } - - override suspend fun broadcast(event: RuleEvent) { - delegate?.broadcast(event) - } -} diff --git a/src/commonMain/kotlin/com/liftric/persisted/queue/JobContext.kt b/src/commonMain/kotlin/com/liftric/persisted/queue/JobContext.kt deleted file mode 100644 index fe806eb..0000000 --- a/src/commonMain/kotlin/com/liftric/persisted/queue/JobContext.kt +++ /dev/null @@ -1,13 +0,0 @@ -package com.liftric.persisted.queue - -import kotlinx.datetime.Instant - -interface JobContext { - val id: UUID - val info: JobInfo - val task: DataTask<*> - val startTime: Instant - suspend fun cancel() - suspend fun repeat(id: UUID = this.id, info: JobInfo = this.info, task: DataTask<*> = this.task, startTime: Instant = this.startTime) - suspend fun broadcast(event: RuleEvent) -} diff --git a/src/commonMain/kotlin/com/liftric/persisted/queue/JobDelegate.kt b/src/commonMain/kotlin/com/liftric/persisted/queue/JobDelegate.kt deleted file mode 100644 index b20943f..0000000 --- a/src/commonMain/kotlin/com/liftric/persisted/queue/JobDelegate.kt +++ /dev/null @@ -1,20 +0,0 @@ -package com.liftric.persisted.queue - -class JobDelegate { - var onExit: (suspend () -> Unit)? = null - var onRepeat: (suspend (Job) -> Unit)? = null - var onEvent: (suspend (JobEvent) -> Unit)? = null - - suspend fun broadcast(event: JobEvent) { - onEvent?.invoke(event) - } - - suspend fun exit() { - onExit?.invoke() - } - - suspend fun repeat(job: Job) { - onRepeat?.invoke(job) - } -} - diff --git a/src/commonMain/kotlin/com/liftric/persisted/queue/JobEvent.kt b/src/commonMain/kotlin/com/liftric/persisted/queue/JobEvent.kt deleted file mode 100644 index 07737ed..0000000 --- a/src/commonMain/kotlin/com/liftric/persisted/queue/JobEvent.kt +++ /dev/null @@ -1,32 +0,0 @@ -package com.liftric.persisted.queue - -sealed class JobEvent { - data class DidSchedule(val job: JobContext): JobEvent() - data class DidScheduleRepeat(val job: JobContext): JobEvent() - data class WillRun(val job: JobContext): JobEvent() - data class DidThrowOnRepeat(val error: Error): JobEvent() - data class DidThrowOnSchedule(val error: Error): JobEvent() - data class DidEnd(val job: JobContext): JobEvent() - data class DidFail(val job: JobContext, val error: Error): JobEvent() - data class DidCancel(val job: JobContext, val message: String): JobEvent() - data class DidFailOnRemove(val job: JobContext, val error: Error): JobEvent() - data class NotAllowedToRepeat(val job: JobContext): JobEvent() -} - -sealed class RuleEvent(open val rule: String, open val message: String): JobEvent() { - data class OnMutate(override val rule: String, override val message: String): RuleEvent(rule, message) { - constructor(rule: JobRule, message: String) : this(rule::class.simpleName!!, message) - } - - data class OnSchedule(override val rule: String, override val message: String): RuleEvent(rule, message) { - constructor(rule: JobRule, message: String) : this(rule::class.simpleName!!, message) - } - - data class OnRun(override val rule: String, override val message: String): RuleEvent(rule, message) { - constructor(rule: JobRule, message: String) : this(rule::class.simpleName!!, message) - } - - data class OnRemove(override val rule: String, override val message: String): RuleEvent(rule, message) { - constructor(rule: JobRule, message: String) : this(rule::class.simpleName!!, message) - } -} diff --git a/src/commonMain/kotlin/com/liftric/persisted/queue/JobScheduler.kt b/src/commonMain/kotlin/com/liftric/persisted/queue/JobScheduler.kt deleted file mode 100644 index a831fcd..0000000 --- a/src/commonMain/kotlin/com/liftric/persisted/queue/JobScheduler.kt +++ /dev/null @@ -1,63 +0,0 @@ -package com.liftric.persisted.queue - -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.flow.* -import kotlinx.serialization.InternalSerializationApi -import kotlinx.serialization.serializerOrNull - -class JobScheduler( - configuration: Queue.Configuration? = null, - private val serializer: JobSerializer? = null -) { - val queue = JobQueue(configuration ?: Queue.Configuration(CoroutineScope(Dispatchers.Default), 1)) - val onEvent = MutableSharedFlow(extraBufferCapacity = Int.MAX_VALUE) - - private val delegate = JobDelegate() - - init { - delegate.onExit = { /* Do something */ } - delegate.onRepeat = { repeat(it) } - delegate.onEvent = { onEvent.emit(it) } - } - - suspend fun schedule(task: () -> DataTask<*>, configure: JobInfo.() -> JobInfo = { JobInfo() }) { - schedule(task(), configure) - } - - @OptIn(InternalSerializationApi::class) - suspend fun schedule(task: DataTask<*>, configure: JobInfo.() -> JobInfo = { JobInfo() }) = try { - val info = configure(JobInfo()).apply { - rules.forEach { it.mutating(this) } - } - - if (task.data!!::class.serializerOrNull() == null) throw Exception("Data must be serializable") - - val job = Job(task, info) - job.delegate = delegate - - job.info.rules.forEach { - it.willSchedule(queue, job) - } - - queue.add(job).apply { - onEvent.emit(JobEvent.DidSchedule(job)) - } - } catch (error: Error) { - onEvent.emit(JobEvent.DidThrowOnSchedule(error)) - } - - private suspend fun repeat(job: Job) = try { - job.delegate = delegate - - job.info.rules.forEach { - it.willSchedule(queue, job) - } - - onEvent.emit(JobEvent.DidScheduleRepeat(job)) - - queue.add(job) - } catch (error: Error) { - onEvent.emit(JobEvent.DidThrowOnRepeat(error)) - } -} diff --git a/src/commonMain/kotlin/com/liftric/persisted/queue/JobSerializer.kt b/src/commonMain/kotlin/com/liftric/persisted/queue/JobSerializer.kt deleted file mode 100644 index b1773c7..0000000 --- a/src/commonMain/kotlin/com/liftric/persisted/queue/JobSerializer.kt +++ /dev/null @@ -1,8 +0,0 @@ -package com.liftric.persisted.queue - -interface JobSerializer { - val tag: String - fun store(job: Job) - fun retrieve(id: String): Job? - fun retrieveAll(): List -} diff --git a/src/commonMain/kotlin/com/liftric/persisted/queue/Preferences.kt b/src/commonMain/kotlin/com/liftric/persisted/queue/Preferences.kt deleted file mode 100644 index 9dc8184..0000000 --- a/src/commonMain/kotlin/com/liftric/persisted/queue/Preferences.kt +++ /dev/null @@ -1,25 +0,0 @@ -package com.liftric.persisted.queue - -import com.russhwolf.settings.Settings -import com.russhwolf.settings.get -import com.russhwolf.settings.set -import kotlinx.serialization.decodeFromString -import kotlinx.serialization.encodeToString -import kotlinx.serialization.json.Json - -expect class Preferences: AbstractPreferences -abstract class AbstractPreferences(private val settings: Settings): JobSerializer { - override val tag: String = "" - override fun retrieve(id: String): Job? { - val jsonString = settings.get(id) ?: return null - return Json.decodeFromString(jsonString) - } - - override fun retrieveAll(): List { - return settings.keys.mapNotNull { retrieve(it) } - } - - override fun store(job: Job) { - settings[job.id.toString()] = Json.encodeToString(job) - } -} diff --git a/src/commonMain/kotlin/com/liftric/persisted/queue/Queue.kt b/src/commonMain/kotlin/com/liftric/persisted/queue/Queue.kt deleted file mode 100644 index 94efd79..0000000 --- a/src/commonMain/kotlin/com/liftric/persisted/queue/Queue.kt +++ /dev/null @@ -1,100 +0,0 @@ -package com.liftric.persisted.queue - -import kotlinx.atomicfu.atomic -import kotlinx.coroutines.* -import kotlinx.coroutines.flow.* -import kotlinx.coroutines.sync.Mutex -import kotlinx.coroutines.sync.Semaphore -import kotlinx.coroutines.sync.withLock -import kotlinx.coroutines.sync.withPermit -import kotlinx.datetime.Clock -import kotlin.coroutines.CoroutineContext -import kotlin.coroutines.EmptyCoroutineContext -import kotlin.coroutines.coroutineContext - -interface Queue { - val jobs: List - val configuration: Configuration - - data class Configuration( - val scope: CoroutineScope, - val maxConcurrency: Int - ) -} - -class JobQueue(override val configuration: Queue.Configuration): Queue { - private val cancellationQueue = MutableSharedFlow(extraBufferCapacity = Int.MAX_VALUE) - private val queue = atomic(mutableListOf()) - private val lock = Semaphore(configuration.maxConcurrency, 0) - private val isCancelling = Mutex(false) - override val jobs: List - get() = queue.value - - init { - cancellationQueue.onEach { it.join() } - .flowOn(Dispatchers.Default) - .launchIn(configuration.scope) - } - - internal fun add(job: Job) { - queue.value = queue.value.plus(listOf(job)).sortedBy { it.startTime }.toMutableList() - } - - suspend fun start() { - while (configuration.scope.isActive) { - if (queue.value.isEmpty()) break - if (isCancelling.isLocked) break - if (lock.availablePermits < 1) break - val job = queue.value.first() - if (job.isCancelled) { - queue.value.remove(job) - } else if (job.startTime <= Clock.System.now()) { - lock.withPermit { - withTimeout(job.info.timeout) { - job.run() - queue.value.remove(job) - } - } - } - } - } - - suspend fun cancel() { - submitCancellation(coroutineContext) { - isCancelling.withLock { - configuration.scope.coroutineContext.cancelChildren() - queue.value.clear() - } - } - } - - suspend fun cancel(id: UUID) { - submitCancellation(coroutineContext) { - isCancelling.withLock { - queue.value.firstOrNull { it.id == id }?.let { job -> - job.cancel() - queue.value.remove(job) - } - } - } - } - - suspend fun cancel(tag: String) { - submitCancellation(coroutineContext) { - isCancelling.withLock { - queue.value.firstOrNull { it.info.tag == tag }?.let { job -> - job.cancel() - queue.value.remove(job) - } - } - } - } - - private fun submitCancellation( - context: CoroutineContext = EmptyCoroutineContext, - block: suspend CoroutineScope.() -> Unit - ) { - val job = configuration.scope.launch(context, CoroutineStart.LAZY, block) - cancellationQueue.tryEmit(job) - } -} diff --git a/src/commonMain/kotlin/com/liftric/persisted/queue/Task.kt b/src/commonMain/kotlin/com/liftric/persisted/queue/Task.kt deleted file mode 100644 index aeb5208..0000000 --- a/src/commonMain/kotlin/com/liftric/persisted/queue/Task.kt +++ /dev/null @@ -1,13 +0,0 @@ -package com.liftric.persisted.queue - -import kotlinx.serialization.Serializable - -@Serializable -abstract class DataTask(@Serializable val data: Data) { - @Throws(Throwable::class) - abstract suspend fun body() - open suspend fun onRepeat(cause: Throwable): Boolean = false -} - -@Serializable -abstract class Task: DataTask(Unit) diff --git a/src/commonMain/kotlin/com/liftric/persisted/queue/UUID.kt b/src/commonMain/kotlin/com/liftric/persisted/queue/UUID.kt deleted file mode 100644 index c8f6a31..0000000 --- a/src/commonMain/kotlin/com/liftric/persisted/queue/UUID.kt +++ /dev/null @@ -1,10 +0,0 @@ -package com.liftric.persisted.queue - -import kotlinx.serialization.KSerializer -import kotlin.reflect.KClass - -expect class UUID - -expect fun KClass.instance(): UUID - -expect object UUIDSerializer: KSerializer diff --git a/src/commonTest/kotlin/com/liftric/job/queue/JobQueueTests.kt b/src/commonTest/kotlin/com/liftric/job/queue/JobQueueTests.kt new file mode 100644 index 0000000..8a6791e --- /dev/null +++ b/src/commonTest/kotlin/com/liftric/job/queue/JobQueueTests.kt @@ -0,0 +1,171 @@ +package com.liftric.job.queue + +import com.liftric.job.queue.rules.* +import kotlinx.coroutines.* +import kotlin.test.* +import kotlin.time.Duration.Companion.seconds + +expect class JobQueueTests: AbstractJobQueueTests +abstract class AbstractJobQueueTests(private val queue: JobQueue) { + @AfterTest + fun tearDown() = runBlocking { + queue.stop() + queue.clear() + } + + @Test + fun testSchedule() { + runBlocking { + val id = UUIDFactory.create().toString() + val job = async { + queue.listener.collect { + println(it) + } + } + + queue.schedule(TestData(id), ::TestTask) { + delay(1.seconds) + unique(id) + } + + queue.schedule(TestTask(TestData(id))) { + unique(id) + } + + assertEquals(1, queue.numberOfJobs) + + queue.start() + + delay(2000L) + + assertEquals(0, queue.numberOfJobs) + + job.cancel() + + } + } + + @Test + fun testRetry() = runBlocking { + var count = 0 + val job = launch { + queue.listener.collect { + println(it) + if (it is JobEvent.DidScheduleRepeat) { + count += 1 + } + } + } + + delay(1000L) + + queue.schedule(TestErrorTask()) { + retry(RetryLimit.Limited(3), delay = 1.seconds) + } + + queue.start() + delay(15000L) + job.cancel() + assertEquals(3, count) + } + + @Test + fun testCancelDuringRun() { + runBlocking { + val listener = launch { + queue.listener.collect { + println(it) + if (it is JobEvent.DidSucceed || it is JobEvent.DidFail) fail("Continued after run") + if (it is JobEvent.WillRun) { + queue.cancel(it.job.id) + } + if (it is JobEvent.DidCancel) { + assertTrue(queue.numberOfJobs == 0) + } + } + } + + queue.start() + + delay(1000L) + + queue.schedule(::LongRunningTask) + + delay(10000L) + + listener.cancel() + } + } + + @Test + fun testCancelByIdBeforeEnqueue() { + runBlocking { + val completable = CompletableDeferred() + + launch { + queue.listener.collect { + println(it) + if (it is JobEvent.DidSucceed || it is JobEvent.DidFail) fail("Continued after run") + if (it is JobEvent.DidSchedule) { + completable.complete(it.job.id) + } + if (it is JobEvent.DidCancel) { + assertTrue(queue.numberOfJobs == 0) + cancel() + } + } + } + + delay(1000L) + + queue.schedule(::LongRunningTask) { + delay(2.seconds) + } + + queue.cancel(completable.await()) + } + } + + @Test + fun testCancelByIdAfterEnqueue() { + runBlocking { + launch { + queue.listener.collect { + println(it) + if (it is JobEvent.DidSchedule) { + delay(3000L) + queue.cancel(it.job.id) + } + if (it is JobEvent.DidCancel) { + cancel() + } + } + } + + delay(1000L) + + queue.start() + + queue.schedule(::LongRunningTask) { + delay(10.seconds) + } + } + } + + @Test + fun testPersist() = runBlocking { + queue.schedule(TestData(UUIDFactory.create().toString()), ::TestTask) { + persist() + } + + assertEquals(1, queue.numberOfJobs) + + queue.clear(clearStore = false) + + assertEquals(0, queue.numberOfJobs) + + queue.restore() + + assertEquals(1, queue.numberOfJobs) + } +} diff --git a/src/commonTest/kotlin/com/liftric/job/queue/TestTask.kt b/src/commonTest/kotlin/com/liftric/job/queue/TestTask.kt new file mode 100644 index 0000000..fdfbc06 --- /dev/null +++ b/src/commonTest/kotlin/com/liftric/job/queue/TestTask.kt @@ -0,0 +1,24 @@ +package com.liftric.job.queue + +import kotlinx.coroutines.delay +import kotlinx.serialization.Serializable +import kotlin.time.Duration.Companion.seconds + +@Serializable +data class TestData(val testResultId: String) + +@Serializable +data class TestTask(override val data: TestData): DataTask { + override suspend fun body() { } +} + +@Serializable +class TestErrorTask: Task { + override suspend fun body() { throw Error("Oh shoot!") } + override suspend fun onRepeat(cause: Throwable): Boolean = cause is Error +} + +@Serializable +class LongRunningTask: Task { + override suspend fun body() { delay(15.seconds) } +} diff --git a/src/commonTest/kotlin/com/liftric/persisted/queue/JobSchedulerTests.kt b/src/commonTest/kotlin/com/liftric/persisted/queue/JobSchedulerTests.kt deleted file mode 100644 index ddeb48a..0000000 --- a/src/commonTest/kotlin/com/liftric/persisted/queue/JobSchedulerTests.kt +++ /dev/null @@ -1,148 +0,0 @@ -package com.liftric.persisted.queue - -import com.liftric.persisted.queue.rules.* -import kotlinx.coroutines.* -import kotlin.test.* -import kotlin.time.Duration.Companion.seconds - -class JobSchedulerTests { - @Test - fun testSchedule() = runBlocking { - val scheduler = JobScheduler() - val id = UUID::class.instance().toString() - val job = async { - scheduler.onEvent.collect { - println(it) - } - } - - scheduler.schedule(TestTask(TestData(id))) { - delay(1.seconds) - unique(id) - } - - scheduler.schedule(TestTask(TestData(id))) { - unique(id) - } - - assertEquals(1, scheduler.queue.jobs.count()) - - scheduler.queue.start() - - delay(2000L) - - assertEquals(0, scheduler.queue.jobs.count()) - - job.cancel() - } - - @Test - fun testRetry() = runBlocking { - val scheduler = JobScheduler() - - var count = 0 - val job = launch { - scheduler.onEvent.collect { - println(it) - if (it is JobEvent.DidScheduleRepeat) { - count += 1 - } - } - } - - scheduler.schedule(TestErrorTask()) { - retry(RetryLimit.Limited(3), delay = 1.seconds) - } - - scheduler.queue.start() - delay(10000L) - job.cancel() - assertEquals(3, count) - } - - @Test - fun testCancelDuringRun() { - val scheduler = JobScheduler() - - runBlocking { - scheduler.schedule(LongRunningTask()) { - delay(10.seconds) - } - - launch { - scheduler.onEvent.collect { - println(it) - if (it is JobEvent.DidEnd || it is JobEvent.DidFail) fail("Continued after run") - if (it is JobEvent.WillRun) { - scheduler.queue.cancel(it.job.id) - } - if (it is JobEvent.DidCancel) { - assertTrue(scheduler.queue.jobs.isEmpty()) - cancel() - } - } - } - - scheduler.queue.start() - } - } - - @Test - fun testCancelByIdBeforeEnqueue() { - val scheduler = JobScheduler() - - runBlocking { - val completable = CompletableDeferred() - - launch { - scheduler.onEvent.collect { - println(it) - if (it is JobEvent.DidEnd || it is JobEvent.DidFail) fail("Continued after run") - if (it is JobEvent.DidSchedule) { - completable.complete(it.job.id) - } - if (it is JobEvent.DidCancel) { - assertTrue(scheduler.queue.jobs.isEmpty()) - cancel() - } - } - } - - delay(1000L) - - scheduler.schedule(::LongRunningTask) { - delay(2.seconds) - } - - scheduler.queue.cancel(completable.await()) - } - } - - @Test - fun testCancelByIdAfterEnqueue() { - val scheduler = JobScheduler() - - runBlocking { - launch { - scheduler.onEvent.collect { - println(it) - if (it is JobEvent.DidSchedule) { - delay(3000L) - scheduler.queue.cancel(it.job.id) - } - if (it is JobEvent.DidCancel) { - cancel() - } - } - } - - delay(1000L) - - scheduler.queue.start() - - scheduler.schedule(::LongRunningTask) { - delay(10.seconds) - } - } - } -} diff --git a/src/commonTest/kotlin/com/liftric/persisted/queue/TestTask.kt b/src/commonTest/kotlin/com/liftric/persisted/queue/TestTask.kt deleted file mode 100644 index f697853..0000000 --- a/src/commonTest/kotlin/com/liftric/persisted/queue/TestTask.kt +++ /dev/null @@ -1,20 +0,0 @@ -package com.liftric.persisted.queue - -import kotlinx.coroutines.delay -import kotlinx.serialization.Serializable - -@Serializable -data class TestData(val testResultId: String) - -class TestTask(data: TestData): DataTask(data) { - override suspend fun body() { } -} - -class TestErrorTask: Task() { - override suspend fun body() { throw Error("Oh shoot!") } - override suspend fun onRepeat(cause: Throwable): Boolean = cause is Error -} - -class LongRunningTask: Task() { - override suspend fun body() { delay(10000L) } -} diff --git a/src/iosMain/kotlin/com/liftric/job/queue/JobQueue.kt b/src/iosMain/kotlin/com/liftric/job/queue/JobQueue.kt new file mode 100644 index 0000000..ab1aa64 --- /dev/null +++ b/src/iosMain/kotlin/com/liftric/job/queue/JobQueue.kt @@ -0,0 +1,15 @@ +package com.liftric.job.queue + +import com.russhwolf.settings.NSUserDefaultsSettings +import kotlinx.serialization.modules.SerializersModule +import platform.Foundation.NSUserDefaults + +actual class JobQueue( + serializers: SerializersModule = SerializersModule {}, + configuration: Queue.Configuration = Queue.DefaultConfiguration, + store: JsonStorage = SettingsStorage(NSUserDefaultsSettings(NSUserDefaults("com.liftric.persisted.queue"))) +) : AbstractJobQueue( + serializers, + configuration, + store +) diff --git a/src/iosMain/kotlin/com/liftric/persisted/queue/UUID.kt b/src/iosMain/kotlin/com/liftric/job/queue/UUID.kt similarity index 77% rename from src/iosMain/kotlin/com/liftric/persisted/queue/UUID.kt rename to src/iosMain/kotlin/com/liftric/job/queue/UUID.kt index fe555fe..a1f43a6 100644 --- a/src/iosMain/kotlin/com/liftric/persisted/queue/UUID.kt +++ b/src/iosMain/kotlin/com/liftric/job/queue/UUID.kt @@ -1,18 +1,18 @@ -package com.liftric.persisted.queue +package com.liftric.job.queue import kotlinx.serialization.KSerializer import kotlinx.serialization.descriptors.PrimitiveKind import kotlinx.serialization.descriptors.PrimitiveSerialDescriptor import kotlinx.serialization.encoding.Decoder import kotlinx.serialization.encoding.Encoder -import platform.Foundation.NSObjectHashCallBacks import platform.Foundation.NSUUID -import platform.darwin.NSObject -import kotlin.reflect.KClass actual typealias UUID = NSUUID -actual fun KClass.instance(): UUID = NSUUID() +internal actual object UUIDFactory { + actual fun create(): UUID = NSUUID() + actual fun fromString(string: String): UUID = UUID(string) +} actual object UUIDSerializer: KSerializer { override val descriptor = PrimitiveSerialDescriptor("UUID", PrimitiveKind.STRING) diff --git a/src/iosMain/kotlin/com/liftric/persisted/queue/Preferences.kt b/src/iosMain/kotlin/com/liftric/persisted/queue/Preferences.kt deleted file mode 100644 index 3b233a1..0000000 --- a/src/iosMain/kotlin/com/liftric/persisted/queue/Preferences.kt +++ /dev/null @@ -1,6 +0,0 @@ -package com.liftric.persisted.queue - -import com.russhwolf.settings.NSUserDefaultsSettings -import platform.Foundation.NSUserDefaults - -actual class Preferences: AbstractPreferences(NSUserDefaultsSettings(NSUserDefaults(suiteName = "com.liftric.job.scheduler"))) diff --git a/src/iosTest/kotlin/com/liftric/job/queue/JobQueueTests.kt b/src/iosTest/kotlin/com/liftric/job/queue/JobQueueTests.kt new file mode 100644 index 0000000..0891133 --- /dev/null +++ b/src/iosTest/kotlin/com/liftric/job/queue/JobQueueTests.kt @@ -0,0 +1,13 @@ +package com.liftric.job.queue + +import kotlinx.serialization.modules.SerializersModule +import kotlinx.serialization.modules.polymorphic + +actual class JobQueueTests: AbstractJobQueueTests(JobQueue( + serializers = SerializersModule { + polymorphic(Task::class) { + subclass(TestTask::class, TestTask.serializer()) + } + }, + store = MapStorage() +)) diff --git a/src/main/AndroidManifest.xml b/src/main/AndroidManifest.xml deleted file mode 100644 index 1a7c120..0000000 --- a/src/main/AndroidManifest.xml +++ /dev/null @@ -1 +0,0 @@ -