Skip to content

Commit

Permalink
Improvement to batching
Browse files (browse the repository at this point in the history)
  • Loading branch information
istreeter committed Aug 13, 2023
1 parent 62be458 commit 4f40b78
Showing 1 changed file with 11 additions and 9 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -150,7 +150,7 @@ object Processing {
private def parseBytes[F[_]: Async](
env: Environment[F],
processor: BadRowProcessor
): Pipe[F, List[Array[Byte]], (List[BadRow], List[Parsed])] =
): Pipe[F, List[Array[Byte]], (List[BadRow], Batched)] =
_.parEvalMapUnordered(env.cpuParallelism) { list =>
list
.traverse { bytes =>
Expand All @@ -166,6 +166,13 @@ object Processing {
}
}
.map(_.separate)
.map { case (bad, parsed) =>
val batched = parsed.foldLeft(Monoid[Batched].empty) {
case (Batched(allEvents, allEntities, allBytes), Parsed(event, bytes, entities)) =>
Batched(event :: allEvents, allEntities |+| entities, allBytes + bytes.toLong)
}
(bad, batched)
}
}

private implicit def batchedMonoid: Monoid[Batched] = new Monoid[Batched] {
Expand All @@ -174,23 +181,18 @@ object Processing {
Batched(x.events |+| y.events, x.entities |+| y.entities, x.originalBytes + y.originalBytes)
}

private def batchUp[F[_]](maxBytes: Long): Pipe[F, List[Parsed], Batched] = {
private def batchUp[F[_]](maxBytes: Long): Pipe[F, Batched, Batched] = {

def go(
source: Stream[F, List[Parsed]],
source: Stream[F, Batched],
batch: Batched
): Pull[F, Batched, Unit] =
source.pull.uncons1.flatMap {
case None if batch.originalBytes > 0 => Pull.output1(batch) >> Pull.done
case None => Pull.done
case Some((Nil, source)) => go(source, batch)
case Some((pulled, source)) =>
val combined = pulled
.map { case Parsed(event, originalBytes, entities) =>
Batched(List(event), entities, originalBytes.toLong)
}
.foldLeft(batch)(_ |+| _)

val combined = batch |+| pulled
if (combined.originalBytes > maxBytes)
Pull.output1(combined) >> go(source, Monoid[Batched].empty)
else
Expand Down

0 comments on commit 4f40b78

Please sign in to comment.