Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add time windowed chunking, groupedWithin #185

Open
Dogacel opened this issue Oct 14, 2023 · 2 comments
Open

Add time windowed chunking, groupedWithin #185

Dogacel opened this issue Oct 14, 2023 · 2 comments
Labels
enhancement New feature or request PRs-are-welcome

Comments

@Dogacel
Copy link

Dogacel commented Oct 14, 2023

Similar to groupedWithin in Akka, I think this is a very useful utility to have.

Akka Docs

Proposed solution

fun Flow<T>.groupedWithin(size: Int, limit: Duration): Flow<List<T>> { ... }

implementation can be based on Kotlin/kotlinx.coroutines#1290 (comment)

I have modified that code slightly, I can help work on a solution based on a channel flow.

Behavior

  • Once flow reaches size items, it emits.
  • If flow can't reach size items within limit time, it emits the items collected until now, unless there is none.

Why
This is very useful when we are bridging the gap between the regular APIs and streaming APIs. For example, assume you have an API to fetch SQS messages, traditionally you would implement it as

suspend fun main() {
  val sqsClient = getSqsClient()


  while (true) {
    val items = sqsClient.poll(10)
    process(items)
    delay(10.seconds)
  }
}

instead, we can use

suspend fun main() {
  val sqsClient = getSqsClient()

  flow {
    while (true) {
      val items = sqsClient.poll(10)
      items.forEach { emit(it) }
      delay(10.seconds)
    }
  }.groupedWithin(128, 30.seconds) {
    process(it)
  }
}
@hoc081098 hoc081098 added the enhancement New feature or request label Oct 15, 2023
@hoc081098
Copy link
Owner

Thank @Dogacel 🙏 for this issue.

Is this operator similar to

@Dogacel
Copy link
Author

Dogacel commented Oct 15, 2023

Thank @Dogacel 🙏 for this issue.

Is this operator similar to

I haven't used RxJava but from the explanation, it looks like it is the bufferTime described in RxJava.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request PRs-are-welcome
Projects
None yet
Development

No branches or pull requests

2 participants