From 4f40b78fb1b31bcace7ab8f269da44a9bd1274d0 Mon Sep 17 00:00:00 2001 From: Ian Streeter Date: Sun, 13 Aug 2023 18:53:21 +0100 Subject: [PATCH] Improvement to batching --- .../processing/Processing.scala | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/Processing.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/Processing.scala index 055e3a8..8bd2c2d 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/Processing.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/Processing.scala @@ -150,7 +150,7 @@ object Processing { private def parseBytes[F[_]: Async]( env: Environment[F], processor: BadRowProcessor - ): Pipe[F, List[Array[Byte]], (List[BadRow], List[Parsed])] = + ): Pipe[F, List[Array[Byte]], (List[BadRow], Batched)] = _.parEvalMapUnordered(env.cpuParallelism) { list => list .traverse { bytes => @@ -166,6 +166,13 @@ object Processing { } } .map(_.separate) + .map { case (bad, parsed) => + val batched = parsed.foldLeft(Monoid[Batched].empty) { + case (Batched(allEvents, allEntities, allBytes), Parsed(event, bytes, entities)) => + Batched(event :: allEvents, allEntities |+| entities, allBytes + bytes.toLong) + } + (bad, batched) + } } private implicit def batchedMonoid: Monoid[Batched] = new Monoid[Batched] { @@ -174,10 +181,10 @@ object Processing { Batched(x.events |+| y.events, x.entities |+| y.entities, x.originalBytes + y.originalBytes) } - private def batchUp[F[_]](maxBytes: Long): Pipe[F, List[Parsed], Batched] = { + private def batchUp[F[_]](maxBytes: Long): Pipe[F, Batched, Batched] = { def go( - source: Stream[F, List[Parsed]], + source: Stream[F, Batched], batch: Batched ): Pull[F, Batched, Unit] = source.pull.uncons1.flatMap { @@ -185,12 +192,7 @@ object Processing { case None => Pull.done case Some((Nil, source)) => go(source, batch) case Some((pulled, source)) => - val combined = pulled - .map { case Parsed(event, originalBytes, entities) => - Batched(List(event), entities, originalBytes.toLong) - } - .foldLeft(batch)(_ |+| _) - + val combined = batch |+| pulled if (combined.originalBytes > maxBytes) Pull.output1(combined) >> go(source, Monoid[Batched].empty) else