Skip to content

Commit

Permalink
fix: ThreadPools fixed. Removed Bound type
Browse files Browse the repository at this point in the history
  • Loading branch information
y9san9 committed Aug 18, 2024
1 parent 56be14d commit dd895bf
Show file tree
Hide file tree
Showing 7 changed files with 93 additions and 141 deletions.
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,17 @@ suspend fun main() {
key = { it % 2 },
action = { delay(1_000) }
).collect()

// This will be executed in roughly 1 second because of single-threaded pool
val singleThreadedQueue = AQueue.fixedThreadPool(numberOfThreads = 1, name = "Test")

natural.mapInAQueue(
queue = singleThreadedQueue,
action = {
Thread.sleep(100)
it
}
).collect()
}
```

Expand Down
63 changes: 5 additions & 58 deletions core/src/commonMain/kotlin/me/y9san9/aqueue/AQueue.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,70 +9,17 @@ import kotlin.coroutines.EmptyCoroutineContext
public interface AQueue {

/**
* Executes [request] with fine-grained control over concurrency
* Executes [action] with fine-grained control over concurrency
*
* @param request The request to execute
* @param key It is guaranteed that requests with the same [key] will be executed consecutively
* @param context The context that is used to launch new coroutines. You may limit parallelism using context
* @param action The action to perform with [request]
* @param action The action to perform
*/
public suspend fun <TRequest, TResponse> execute(
request: TRequest,
public suspend fun <T> execute(
key: Any? = null,
context: CoroutineContext = EmptyCoroutineContext,
action: suspend (TRequest) -> TResponse
): TResponse

/**
* Creates an Asynchronous Queue that has all parameters provided except
* of the request itself.
*/
public fun <TRequest, TResponse> bind(
key: (TRequest) -> Any? = { null },
context: CoroutineContext = EmptyCoroutineContext,
queue: AQueue = AQueue(),
action: suspend (TRequest) -> TResponse,
): Bound<TRequest, TResponse> {
return Bound(key, context, queue, action)
}

/**
* Asynchronous Queue that has all parameters provided except
* of the request itself.
*
* @param key It is guaranteed that requests with the same [key] will be executed consecutively
* @param context The context that is used to launch new coroutines. You may limit parallelism using context
* @param queue The queue that is used to parallel requests
* @param action The action to perform with request
*/
public class Bound<TRequest, TResponse>(
private val key: (TRequest) -> Any? = { null },
private val context: CoroutineContext = EmptyCoroutineContext,
private val queue: AQueue = AQueue(),
private val action: suspend (TRequest) -> TResponse,
) {
/**
* Executes [request] with fine-grained control over concurrency
*/
public suspend fun execute(
request: TRequest,
key: Any? = this.key(request),
context: CoroutineContext = this.context
): TResponse {
return queue.execute(request, key, context, action)
}

public fun copy(
key: (TRequest) -> Any? = this.key,
context: CoroutineContext = this.context,
queue: AQueue = this.queue,
action: suspend (TRequest) -> TResponse = this.action,
): Bound<TRequest, TResponse> {
return Bound(key, context, queue, action)
}

public companion object
}
action: suspend () -> T
): T

public companion object
}
38 changes: 17 additions & 21 deletions core/src/commonMain/kotlin/me/y9san9/aqueue/LinkedAQueue.kt
Original file line number Diff line number Diff line change
Expand Up @@ -23,36 +23,32 @@ public class LinkedAQueue : AQueue {
private val pendingMap = PendingMap()

/**
* Executes [request] with fine-grained control over concurrency
* Executes [action] with fine-grained control over concurrency
*
* @param request The request to execute
* @param key It is guaranteed that requests with the same [key] will be executed consecutively
* @param context The context that is used to launch new coroutines. You may limit parallelism using context
* @param action The action to perform with [request]
* @param action The action to perform
*/
override suspend fun <TRequest, TResponse> execute(
request: TRequest,
override suspend fun <T> execute(
key: Any?,
context: CoroutineContext,
action: suspend (TRequest) -> TResponse
): TResponse {
return coroutineScope {
val scope = this
action: suspend () -> T
): T = coroutineScope {
val scope = this

suspendCancellableCoroutine { continuation ->
launch(start = CoroutineStart.UNDISPATCHED) {
pendingMap.putPending(key) { pendingJob ->
launch {
pendingJob?.join()
val result = runCatching { action(request) }
pendingMap.finishPendingJob(key, coroutineContext.job)
continuation.resumeWith(result)
}
suspendCancellableCoroutine { continuation ->
launch(start = CoroutineStart.UNDISPATCHED) {
pendingMap.putPending(key) { pendingJob ->
launch(context) {
pendingJob?.join()
val result = runCatching { action() }
pendingMap.finishPendingJob(key, coroutineContext.job)
continuation.resumeWith(result)
}
}
continuation.invokeOnCancellation { cancellation ->
if (cancellation is CancellationException) scope.cancel(cancellation)
}
}
continuation.invokeOnCancellation { cancellation ->
if (cancellation is CancellationException) scope.cancel(cancellation)
}
}
}
Expand Down
38 changes: 13 additions & 25 deletions core/src/commonMain/kotlin/me/y9san9/aqueue/flow/AQueue.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,37 +9,25 @@ import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

/**
* Parallels flow with fine-grained control using [AQueue.Bound] queue.
* Parallels flow using provided [queue]
*
* @param queue The queue used to parallel given flow
* @param key It is guaranteed that requests with the same [key] will be executed consecutively
* @param context The context that is used to launch new coroutines. You may limit parallelism using context
* @param queue The queue used to parallel flow
* @param action The action to perform with request
*/
public fun <TRequest, TResponse> Flow<TRequest>.mapInAQueue(
queue: AQueue.Bound<TRequest, TResponse>
): Flow<TResponse> {
public fun <T, R> Flow<T>.mapInAQueue(
key: (T) -> Any? = { null },
queue: AQueue = AQueue(),
context: CoroutineContext = EmptyCoroutineContext,
action: suspend (T) -> R,
): Flow<R> {
return channelFlow {
collect { request ->
collect { element ->
launch(start = CoroutineStart.UNDISPATCHED) {
val result = queue.execute(request)
val result = queue.execute(key(element), context) { action(element) }
send(result)
}
}
}
}

/**
* Constructs new [AQueue.Bound] and parallels flow using it.
*
* @param key It is guaranteed that requests with the same [key] will be executed consecutively
* @param context The context that is used to launch new coroutines. You may limit parallelism using context
* @param queue The queue used to parallel flow
* @param action The action to perform with request
*/
public fun <TRequest, TResponse> Flow<TRequest>.mapInAQueue(
key: (TRequest) -> Any? = { null },
context: CoroutineContext = EmptyCoroutineContext,
queue: AQueue = AQueue(),
action: suspend (TRequest) -> TResponse,
): Flow<TResponse> {
val bound = AQueue.Bound(key, context, queue, action)
return mapInAQueue(bound)
}
52 changes: 22 additions & 30 deletions core/src/jvmMain/kotlin/me/y9san9/aqueue/AQueue.kt
Original file line number Diff line number Diff line change
Expand Up @@ -11,49 +11,41 @@ import kotlin.coroutines.EmptyCoroutineContext
*
* @param numberOfThreads The number of threads for [newFixedThreadPoolContext]
* @param name The name for [newFixedThreadPoolContext]
* @param key It is guaranteed that requests with the same [key] will be executed consecutively
* @param context The context that is used to launch new coroutines. You may limit parallelism using context
* @param queue The queue that is used to parallel requests
* @param action The action to perform with [request]
*/
@DelicateCoroutinesApi
public fun <TRequest, TResponse> AQueue.Bound.Companion.fixedThreadPool(
public fun AQueue.Companion.fixedThreadPool(
numberOfThreads: Int,
name: String,
key: (TRequest) -> Any? = { null },
context: CoroutineContext = EmptyCoroutineContext,
queue: AQueue = AQueue(),
action: suspend (TRequest) -> TResponse,
): AQueue.Bound<TRequest, TResponse> {
queue: AQueue = AQueue()
): AQueue {
val fixedContext = newFixedThreadPoolContext(numberOfThreads, name)

return AQueue.Bound(
key = key,
context = context + fixedContext,
queue = queue,
action = action
)
return object : AQueue {
override suspend fun <T> execute(key: Any?, context: CoroutineContext, action: suspend () -> T): T {
return queue.execute(
key = key,
context = context + fixedContext,
action = action
)
}
}
}

/**
* Asynchronous Queue that uses [Dispatchers.IO] to create a queue
*
* @param key It is guaranteed that requests with the same [key] will be executed consecutively
* @param context The context that is used to launch new coroutines. You may limit parallelism using context
* @param queue The queue that is used to parallel requests
* @param action The action to perform with [request]
*/
@DelicateCoroutinesApi
public fun <TRequest, TResponse> AQueue.Bound.Companion.io(
key: (TRequest) -> Any? = { null },
context: CoroutineContext = EmptyCoroutineContext,
queue: AQueue = AQueue(),
action: suspend (TRequest) -> TResponse,
): AQueue.Bound<TRequest, TResponse> {
return AQueue.Bound(
key = key,
context = context + Dispatchers.IO,
queue = queue,
action = action
)
public fun AQueue.Companion.io(queue: AQueue = AQueue()): AQueue {
return object : AQueue {
override suspend fun <T> execute(key: Any?, context: CoroutineContext, action: suspend () -> T): T {
return queue.execute(
key = key,
context = context + Dispatchers.IO,
action = action
)
}
}
}
30 changes: 24 additions & 6 deletions example/src/main/kotlin/Main.kt
Original file line number Diff line number Diff line change
@@ -1,34 +1,38 @@
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.take
import me.y9san9.aqueue.AQueue
import me.y9san9.aqueue.fixedThreadPool
import me.y9san9.aqueue.flow.mapInAQueue
import kotlin.time.measureTime

@OptIn(DelicateCoroutinesApi::class)
suspend fun main() {
val natural = flow {
var number = 0
while (true) emit(number++)
}.take(count = 10)

// This will be executed in roughly 1 second,
// This will be executed in roughly 0.1 seconds,
// because every key is unique.
// Every action will run in parallel.
val duration1 = measureTime {
natural.mapInAQueue(
key = { it },
action = { delay(1_000); it }
action = { delay(100); it }
).collect()
}
println("First: $duration1")

// This will be executed in roughly 10 seconds,
// This will be executed in roughly 1 second,
// because all keys are the same.
// Every action will run consecutively.
val duration2 = measureTime {
natural.mapInAQueue(
key = { Unit },
action = { delay(1_000); it }
action = { delay(100); it }
).collect()
}
println("Second: $duration2")
Expand All @@ -38,12 +42,26 @@ suspend fun main() {
// There would be 2 consecutive queues:
// - For even numbers
// - For odd numbers
// Two queues cut time from 10 seconds to 5 seconds
// Two queues cut time from 1 second to 0.5 seconds
val duration3 = measureTime {
natural.mapInAQueue(
key = { it % 2 },
action = { delay(1_000); it }
action = { delay(100); it }
).collect()
}
println("Third: $duration3")

// This will be executed in roughly 1 second because of single-threaded pool
val singleThreadedQueue = AQueue.fixedThreadPool(numberOfThreads = 1, name = "Test")

val duration4 = measureTime {
natural.mapInAQueue(
queue = singleThreadedQueue,
action = {
Thread.sleep(100)
it
}
).collect()
}
println("Third: $duration4")
}
2 changes: 1 addition & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ kotlin = "2.0.0"
coroutines = "1.8.0"
maven-publish = "0.29.0"

aqueue = "1.0.1"
aqueue = "1.0.2"

[libraries]

Expand Down

0 comments on commit dd895bf

Please sign in to comment.