
Commit

Merge pull request #3110 from ivan-klass/merge-sorted
Add .interleaveOrdered to combine sorted streams (i.e. "merge-sort")
diesalbla authored Jan 20, 2023
2 parents 848dbd6 + 1aa41fe commit 597102b
Showing 3 changed files with 93 additions and 2 deletions.
1 change: 1 addition & 0 deletions core/shared/src/main/scala/fs2/Pull.scala
@@ -837,6 +837,7 @@ object Pull extends PullLowPriority {

private final case class GetScope[F[_]]() extends AlgEffect[Nothing, Scope[F]]

/** Ignores the current stepLeg head and continues with the remaining data. */
private[fs2] def stepLeg[F[_], O](
leg: Stream.StepLeg[F, O]
): Pull[F, Nothing, Option[Stream.StepLeg[F, O]]] =
63 changes: 62 additions & 1 deletion core/shared/src/main/scala/fs2/Stream.scala
@@ -1979,6 +1979,67 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
): Stream[F2, O2] =
that.mergeHaltL(this)

/** Given two sorted streams, emits a single sorted stream, as in the merge step of a merge sort.
* For elements that the `Order` considers equal, the left stream's element is emitted first.
* Note: both this stream and `that` must already be sorted.
* @example {{{
* scala> Stream(1, 2, 5, 6).interleaveOrdered(Stream(0, 2, 3, 4)).toList
* res0: List[Int] = List(0, 1, 2, 2, 3, 4, 5, 6)
* }}}
*/
def interleaveOrdered[F2[x] >: F[x], O2 >: O: Order](that: Stream[F2, O2]): Stream[F2, O2] = {
val order = Order[O2].toOrdering // collections API needs Ordering, not cats.Order

def go(
leftLeg: Stream.StepLeg[F2, O2],
rightLeg: Stream.StepLeg[F2, O2]
): Pull[F2, O2, Unit] = {
val lChunk = leftLeg.head
val rChunk = rightLeg.head
if (lChunk.nonEmpty && rChunk.nonEmpty) { // the only case that needs chunk merging and sorting
val lLast = lChunk(lChunk.size - 1)
val rLast = rChunk(rChunk.size - 1)
val wholeLeftSide = Order.lteqv(lLast, rLast) // otherwise we can emit the whole right chunk
val (emitLeft, keepLeft) =
if (wholeLeftSide) (lChunk, Chunk.empty)
else
lChunk.splitAt(
lChunk.indexWhere(order.gt(_, rLast)).getOrElse(lChunk.size)
)
val (emitRight, keepRight) =
if (!wholeLeftSide) (rChunk, Chunk.empty)
else
rChunk.splitAt( // don't emit right-side elements equal to lLast, to keep the sort stable
rChunk.indexWhere(order.gteq(_, lLast)).getOrElse(rChunk.size)
)
Pull.output(
Chunk.vector((emitLeft ++ emitRight).toVector.sorted(order))
) >> go(leftLeg.setHead(keepLeft), rightLeg.setHead(keepRight))
} else { // otherwise, advance whichever leg has an empty head
if (lChunk.isEmpty) {
leftLeg.stepLeg.flatMap {
case Some(nextLl) => go(nextLl, rightLeg)
case None => Pull.output(rChunk) >> rightLeg.next
}
} else {
rightLeg.stepLeg.flatMap {
case Some(nextRl) => go(leftLeg, nextRl)
case None => Pull.output(lChunk) >> leftLeg.next
}
}
}
}

val thisPull = covaryAll[F2, O2].pull
val thatPull = that.pull

(thisPull.stepLeg, thatPull.stepLeg).tupled.flatMap {
case (Some(leg1), Some(leg2)) => go(leg1, leg2)
case (_, None) => thisPull.echo
case (None, _) => thatPull.echo
}.stream
}
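// Editorial sketch (not part of this change): for elements that the Order considers
// equal, interleaveOrdered emits the left stream's elements first. The tuple element
// type, the byKey order and the sample values below are assumptions for this sketch.
import cats.Order

implicit val byKey: Order[(Int, String)] = Order.by(_._1) // compare by the Int key only

val left  = Stream((1, "L"), (2, "L"), (2, "L"))
val right = Stream((1, "R"), (2, "R"), (3, "R"))

val merged = left.interleaveOrdered(right).toList
// merged == List((1,L), (1,R), (2,L), (2,L), (2,R), (3,R)): left precedes right for equal keys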

/** Emits each output wrapped in a `Some` and emits a `None` at the end of the stream.
*
* `s.noneTerminate.unNoneTerminate == s`
@@ -5020,7 +5081,7 @@ object Stream extends StreamLowPriority {
def setHead[O2 >: O](nextHead: Chunk[O2]): StepLeg[F, O2] =
new StepLeg[F, O2](nextHead, scopeId, next)

/** Provides an `uncons`-like operation on this leg of the stream. */
/** Provides an `uncons`-like operation on this leg of the stream, dropping the current `head`. */
def stepLeg: Pull[F, Nothing, Option[StepLeg[F, O]]] =
Pull.stepLeg(self)
}
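As the updated `StepLeg.stepLeg` comment above notes, stepping a leg drops its current `head`, so a caller must first emit (or re-attach via `setHead`) whatever it has not yet consumed. A minimal sketch of that pattern, using a hypothetical `drainLeg` helper that is not part of this diff:

import fs2.{Pull, Stream}

// Emit the current head, then step to the next leg; without the Pull.output,
// the elements still sitting in `head` would be skipped by stepLeg.
def drainLeg[F[_], O](leg: Stream.StepLeg[F, O]): Pull[F, O, Unit] =
  Pull.output(leg.head) >> leg.stepLeg.flatMap {
    case Some(next) => drainLeg(next)
    case None       => Pull.done
  }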
31 changes: 30 additions & 1 deletion core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala
@@ -29,7 +29,7 @@ import cats.effect.{IO, SyncIO}
import cats.syntax.all._
import fs2.concurrent.SignallingRef
import org.scalacheck.effect.PropF.forAllF
import org.scalacheck.Gen
import org.scalacheck.{Arbitrary, Gen}
import org.scalacheck.Prop.forAll

import scala.concurrent.duration._
@@ -890,6 +890,35 @@ class StreamCombinatorsSuite extends Fs2Suite {
)
}
}

property("interleaveOrdered for ordered streams emits stable-sorted stream with same data") {
// element type and ordering chosen so that sort stability is observable (the ordering ignores the Byte payload)
type Elem = (Int, Byte)
implicit val ordering: Ordering[Elem] = Ordering.by(_._1)
implicit val order: cats.Order[Elem] = cats.Order.fromOrdering

type SortedData = Vector[Chunk[Elem]]
implicit val arbSortedData: Arbitrary[SortedData] = Arbitrary(
for {
sortedData <- Arbitrary.arbContainer[Array, Elem].arbitrary.map(_.sorted)
splitIdxs <- Gen.someOf(sortedData.indices).map(_.sorted)
borders = (0 +: splitIdxs).zip(splitIdxs :+ sortedData.length)
} yield borders.toVector
.map { case (from, to) =>
Chunk.array(sortedData, from, to - from)
}
)

def mkStream(parts: SortedData): Stream[Pure, Elem] = parts.map(Stream.chunk).combineAll

forAll { (sortedL: SortedData, sortedR: SortedData) =>
mkStream(sortedL)
.interleaveOrdered(mkStream(sortedR))
.assertEmits(
(sortedL ++ sortedR).toList.flatMap(_.toList).sorted // std .sorted is stable
)
}
}
}

property("intersperse") {
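For intuition, the `SortedData` generator in the new test splits one sorted array at arbitrary indices into contiguous chunks, so concatenating the chunks always reproduces a sorted stream, possibly with empty chunks in between. A hand-written value of the same shape (the numbers are assumed for illustration, not generated data):

import fs2.Chunk

// One sorted sequence of (key, payload) pairs split into contiguous chunks;
// repeated keys with different payloads are what make sort stability observable.
val sample: Vector[Chunk[(Int, Byte)]] = Vector(
  Chunk((1, 10: Byte), (2, 20: Byte)),
  Chunk.empty,
  Chunk((2, 21: Byte), (5, 50: Byte))
)
// mkStream(sample) emits (1,10), (2,20), (2,21), (5,50), in that order.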
