Skip to content

Commit

Permalink
feat: Add runConcurrently parameter to allow or disallow parallel exe…
Browse files Browse the repository at this point in the history
…cution

Signed-off-by: starry-shivam <starry@krsh.dev>
  • Loading branch information
starry-shivam committed Jun 21, 2024
1 parent 8e779eb commit cbab848
Show file tree
Hide file tree
Showing 4 changed files with 142 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,16 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import java.util.concurrent.ConcurrentHashMap

/**
* An executor that executes jobs using coroutines.
*/
class CoroutineExecutor : Executor {

// A map of currently running jobs.
private val runningJobs = ConcurrentHashMap<String, Job>()

/**
* Executes the given job.
*
Expand All @@ -37,16 +41,23 @@ class CoroutineExecutor : Executor {
override suspend fun execute(
job: Job, onSuccess: () -> Unit, onError: (Exception) -> Unit
) {
// If the job is not allowed to run concurrently and a job with the
// same ID is already running, return.
if (!job.runConcurrently && runningJobs.containsKey(job.jobId)) {
return
}

CoroutineScope(job.dispatcher).launch {
// Add the job to the running jobs map.
runningJobs[job.jobId] = job
try {
job.callback()
withContext(Dispatchers.Default) {
onSuccess()
}
withContext(Dispatchers.Default) { onSuccess() }
} catch (exc: Exception) {
withContext(Dispatchers.Default) {
onError(exc)
}
withContext(Dispatchers.Default) { onError(exc) }
} finally {
// Remove the job from the running jobs map.
runningJobs.remove(job.jobId)
}
}
}
Expand Down
8 changes: 8 additions & 0 deletions src/main/kotlin/dev/starry/ktscheduler/job/Job.kt
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import java.time.ZonedDateTime
* @property jobId A unique identifier for the job.
* @property trigger The trigger that determines when the job should run.
* @property nextRunTime The next time the job should run.
* @property runConcurrently Whether to run multiple instances of this job concurrently.
* @property dispatcher The dispatcher to run the job on.
* @property callback The callback function to run when the job is triggered.
*/
Expand All @@ -48,8 +49,15 @@ data class Job(
*/
val nextRunTime: ZonedDateTime,

/**
* Whether to run multiple instances of this job concurrently.
* Default is true.
*/
val runConcurrently: Boolean = true,

/**
* The dispatcher to run the job on.
* Default is [Dispatchers.Default].
*/
val dispatcher: CoroutineDispatcher = Dispatchers.Default,

Expand Down
73 changes: 56 additions & 17 deletions src/test/kotlin/dev/starry/ktscheduler/CoroutineExecutorTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import dev.starry.ktscheduler.triggers.OneTimeTrigger
import junit.framework.TestCase.assertTrue
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.test.TestCoroutineScheduler
import kotlinx.coroutines.test.UnconfinedTestDispatcher
import kotlinx.coroutines.test.resetMain
import kotlinx.coroutines.test.runTest
Expand Down Expand Up @@ -54,41 +56,78 @@ class CoroutineExecutorTest {

@Test
fun testExecuteSuccess(): Unit = runTest {
val job = Job(
jobId = "testJob1",
trigger = trigger,
nextRunTime = ZonedDateTime.now(),
dispatcher = UnconfinedTestDispatcher(testScheduler)
) { /* Do nothing */ }

val job = createTestJob(scheduler = testScheduler) { }
var onSuccessCalled = false
val onSuccess: () -> Unit = { onSuccessCalled = true }
val onError: (Throwable) -> Unit = { fail("onError should not be called") }

executor.execute(job, onSuccess, onError)
Thread.sleep(100)
delay(10)
assertTrue(onSuccessCalled)
}

@Test
fun testExecuteError(): Unit = runTest {
val job = Job(
jobId = "testJob2",
trigger = trigger,
nextRunTime = ZonedDateTime.now(),
dispatcher = UnconfinedTestDispatcher(testScheduler),
callback = { throw IllegalArgumentException("Error") },
)
val job = createTestJob(scheduler = testScheduler) { throw IllegalArgumentException("Error") }

val onSuccess: () -> Unit = { fail("onSuccess should not be called") }
var exception: Throwable? = null
val onError: (Throwable) -> Unit = { exception = it }

executor.execute(job, onSuccess, onError)
Thread.sleep(100)

delay(10)
assertNotNull(exception)
assertTrue(exception is IllegalArgumentException)
assertEquals("Error", exception.message)
}

@Test
fun testConcurrentExecution(): Unit = runTest {
// Create a job that takes 100ms to execute.
val job = createTestJob(
scheduler = testScheduler, runConcurrently = true
) { delay(100) }

var onSuccessCalled = 0
val onSuccess: () -> Unit = { onSuccessCalled += 1 }
val onError: (Throwable) -> Unit = { fail("onError should not be called") }
// Execute the job 3 times concurrently.
executor.execute(job, onSuccess, onError)
executor.execute(job, onSuccess, onError)
executor.execute(job, onSuccess, onError)
// Wait for the jobs to complete.
delay(110)
assertEquals(3, onSuccessCalled)
}

@Test
fun testNonConcurrentExecution(): Unit = runTest {
// Create a job that takes 100ms to execute.
val job = createTestJob(scheduler = testScheduler, runConcurrently = false) { delay(100) }

var onSuccessCalled = 0
val onSuccess: () -> Unit = { onSuccessCalled += 1 }
val onError: (Throwable) -> Unit = { fail("onError should not be called") }
// Execute the job 3 times concurrently.
executor.execute(job, onSuccess, onError)
executor.execute(job, onSuccess, onError)
executor.execute(job, onSuccess, onError)
// Wait for the jobs to complete.
delay(110)
assertEquals(1, onSuccessCalled)
}

private fun createTestJob(
jobId: String = "job1",
runConcurrently: Boolean = true,
scheduler: TestCoroutineScheduler,
callback: suspend () -> Unit,
): Job = Job(
jobId = jobId,
trigger = trigger,
nextRunTime = ZonedDateTime.now(),
dispatcher = UnconfinedTestDispatcher(scheduler),
runConcurrently = runConcurrently,
callback = callback
)
}
61 changes: 61 additions & 0 deletions src/test/kotlin/dev/starry/ktscheduler/KtSchedulerTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,67 @@ class KtSchedulerTest {
assertEquals("longRunningJob", eventListener.completedJobs[0])
}

@Test
fun `scheduler should not execute job concurrently if runConcurrently is false`(): Unit = runTest {
val scheduler = KtScheduler()

// Create a job that takes 2 seconds to execute
val job = Job(
jobId = "longRunningJob",
trigger = IntervalTrigger(intervalSeconds = 1),
nextRunTime = ZonedDateTime.now(),
callback = { delay(2000) },
runConcurrently = false
)
val eventListener = TestJobEventListener()

scheduler.addJob(job)
scheduler.addEventListener(eventListener)
scheduler.start()
// Job should not be completed yet
assertEquals(0, eventListener.completedJobs.size)
// Wait for 3 seconds
Thread.sleep(3000)
// Assert that the job was only executed once in 3 seconds
// because the job is not run concurrently and it takes 2 seconds to execute
assertEquals(1, eventListener.completedJobs.size)
Thread.sleep(1100)
// Assert that the job was executed twice after 4 seconds
assertEquals(2, eventListener.completedJobs.size)
assertEquals("longRunningJob", eventListener.completedJobs[0])
assertEquals("longRunningJob", eventListener.completedJobs[1])
scheduler.shutdown()
}

@Test
fun `scheduler should execute job concurrently if runConcurrently is true`(): Unit = runTest {
val scheduler = KtScheduler()

// Create a job that takes 2 seconds to execute
val job = Job(
jobId = "longRunningJob",
trigger = IntervalTrigger(intervalSeconds = 1),
nextRunTime = ZonedDateTime.now(),
callback = { delay(2000) },
runConcurrently = true
)
val eventListener = TestJobEventListener()

scheduler.addJob(job)
scheduler.addEventListener(eventListener)
scheduler.start()
// Job should not be completed yet
assertEquals(0, eventListener.completedJobs.size)
// Wait for 3 seconds and shutdown the scheduler
Thread.sleep(3100)
scheduler.shutdown()
// Assert that the job was executed twice in 3 seconds
// because the job is run concurrently and it takes 2 seconds to execute
assertEquals(2, eventListener.completedJobs.size)
assertEquals("longRunningJob", eventListener.completedJobs[0])
assertEquals("longRunningJob", eventListener.completedJobs[1])
}

private fun createTestJob(
jobId: String,
runAt: ZonedDateTime = ZonedDateTime.now().plusSeconds(1),
Expand Down

0 comments on commit cbab848

Please sign in to comment.