Skip to content

Commit

Permalink
parTraverse
Browse files (browse the repository at this point in the history)
  • Loading branch information
istreeter committed Jul 5, 2023
1 parent c298f63 commit f4c4fdf
Showing 1 changed file with 6 additions and 5 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -8,8 +8,9 @@
package com.snowplowanalytics.snowplow.lakes.processing

import cats.implicits._
import cats.effect.implicits._
import cats.data.NonEmptyList
import cats.{Applicative, Functor, Monad, Monoid}
import cats.{Applicative, Functor, Monad, Monoid, Parallel}
import cats.effect.{Async, Sync}
import cats.effect.kernel.{Ref, Unique}
import fs2.{Pipe, Pull, Stream}
Expand Down Expand Up @@ -117,12 +118,12 @@ object Processing {
// cannot make 100% use of the available cpu.
//
// The computation is wrapped in Applicative[F].pure() so the Cats Effect runtime can cede to other fibers
private def parseBytes[F[_]: Applicative](
private def parseBytes[F[_]: Applicative: Parallel](
processor: BadRowProcessor
): Pipe[F, List[Array[Byte]], (List[BadRow], List[Parsed])] =
_.evalMap { list =>
list
.traverse { bytes =>
.parTraverse { bytes =>
Applicative[F].pure {
val stringified = new String(bytes, StandardCharsets.UTF_8)
Event
Expand Down Expand Up @@ -180,12 +181,12 @@ object Processing {
// available cpu.
//
// The computation is wrapped in Applicative[F].pure() so the Cats Effect runtime can cede to other fibers
private def transformToSpark[F[_]: Applicative](
private def transformToSpark[F[_]: Applicative: Parallel](
processor: BadRowProcessor
): Pipe[F, BatchWithTypes, (List[BadRow], RowsWithSchema)] =
_.evalMap { case BatchWithTypes(events, entities) =>
events
.traverse { event =>
.parTraverse { event =>
Applicative[F].pure {
Transform.eventToRow(processor, event, entities)
}
Expand Down

0 comments on commit f4c4fdf

Please sign in to comment.