diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/Processing.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/Processing.scala index 9799307..cc611f1 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/Processing.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/Processing.scala @@ -8,8 +8,9 @@ package com.snowplowanalytics.snowplow.lakes.processing import cats.implicits._ +import cats.effect.implicits._ import cats.data.NonEmptyList -import cats.{Applicative, Functor, Monad, Monoid} +import cats.{Applicative, Functor, Monad, Monoid, Parallel} import cats.effect.{Async, Sync} import cats.effect.kernel.{Ref, Unique} import fs2.{Pipe, Pull, Stream} @@ -117,12 +118,12 @@ object Processing { // cannot make 100% use of the available cpu. // // The computation is wrapped in Applicative[F].pure() so the Cats Effect runtime can cede to other fibers - private def parseBytes[F[_]: Applicative]( + private def parseBytes[F[_]: Applicative: Parallel]( processor: BadRowProcessor ): Pipe[F, List[Array[Byte]], (List[BadRow], List[Parsed])] = _.evalMap { list => list - .traverse { bytes => + .parTraverse { bytes => Applicative[F].pure { val stringified = new String(bytes, StandardCharsets.UTF_8) Event @@ -180,12 +181,12 @@ object Processing { // available cpu. // // The computation is wrapped in Applicative[F].pure() so the Cats Effect runtime can cede to other fibers - private def transformToSpark[F[_]: Applicative]( + private def transformToSpark[F[_]: Applicative: Parallel]( processor: BadRowProcessor ): Pipe[F, BatchWithTypes, (List[BadRow], RowsWithSchema)] = _.evalMap { case BatchWithTypes(events, entities) => events - .traverse { event => + .parTraverse { event => Applicative[F].pure { Transform.eventToRow(processor, event, entities) }