Skip to content

Commit

Permalink
add RequestResponseQueue
Browse files Browse the repository at this point in the history
  • Loading branch information
Kiibou-chan committed Dec 2, 2024
1 parent 98c905e commit 32c5ba9
Show file tree
Hide file tree
Showing 2 changed files with 231 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package probench.data

import probench.data.RequestResponseQueue.{Req, Res}
import rdts.base.{Bottom, Lattice, LocalUid, Uid}
import rdts.datatypes.GrowOnlyCounter
import rdts.dotted.{Dotted, HasDots}
import rdts.time.{Dot, Dots, VectorClock}

import scala.collection.immutable.Queue

case class RequestResponseQueue[S, T](
requests: Queue[Req[S]],
responses: Queue[Res[S, T]],
processed: Map[LocalUid, VectorClock],
clock: VectorClock
) {

type Delta = Dotted[RequestResponseQueue[S, T]]

def request(value: S)(using id: LocalUid): Delta = {
val time = clock.merge(clock.inc(LocalUid.replicaId))
val dot = time.dotOf(LocalUid.replicaId)

Dotted(
RequestResponseQueue(
Queue(Req(value, id, dot, time)),
Queue.empty,
Map.empty,
time
)
)
}

def respond(request: Req[S], value: T)(using id: LocalUid): Delta = {
val time = clock.merge(clock.inc(LocalUid.replicaId))
val dot = time.dotOf(LocalUid.replicaId)

Dotted(
RequestResponseQueue(
Queue.empty,
Queue(Res(request, value, id, dot, time)),
Map.empty,
time
)
)
}

def responsesTo(req: Req[S]): Seq[Res[S, T]] =
responses.filter(res => res.request == req)

def firstUnansweredRequests: Option[Req[S]] =
requests.collectFirst { case r: Req[S] if responsesTo(r).isEmpty => r }

def complete(req: Req[S])(using id: LocalUid): Delta = {
val time = clock.merge(clock.inc(LocalUid.replicaId))

Dotted(
RequestResponseQueue(
Queue.empty,
Queue.empty,
Map(id -> req.order),
time
)
)
}

}

object RequestResponseQueue {
case class Req[+T](value: T, requester: LocalUid, dot: Dot, order: VectorClock)
case class Res[+S, +T](request: Req[S], value: T, responder: LocalUid, dot: Dot, order: VectorClock)

given reqLattice[T]: Lattice[Req[T]] with {
override def merge(left: Req[T], right: Req[T]): Req[T] =
if left.order < right.order then right else left
}

given resLattice[S, T]: Lattice[Res[S, T]] with {
override def merge(left: Res[S, T], right: Res[S, T]): Res[S, T] =
if left.order < right.order then right else left
}

def empty[S, T]: RequestResponseQueue[S, T] = RequestResponseQueue(Queue.empty, Queue.empty, Map.empty, VectorClock.zero)

given hasDots[S, T]: HasDots[RequestResponseQueue[S, T]] with {
extension (value: RequestResponseQueue[S, T])
override def dots: Dots = Dots.from(value.requests.view.map(_.dot) ++ value.responses.view.map(_.dot))

override def removeDots(dots: Dots): Option[RequestResponseQueue[S, T]] =
Some(
RequestResponseQueue(
value.requests.filter(req => !dots.contains(req.dot)),
value.responses.filter(res => !dots.contains(res.dot)),
value.processed,
value.clock
)
)
}

given procLattice: Lattice[Map[LocalUid, VectorClock]] = Lattice.mapLattice
given bottomInstance[S, T]: Bottom[RequestResponseQueue[S, T]] = Bottom.derived

given lattice[S, T]: Lattice[RequestResponseQueue[S, T]] with {
override def merge(
left: RequestResponseQueue[S, T],
right: RequestResponseQueue[S, T]
): RequestResponseQueue[S, T] =
val processed = left.processed.merge(right.processed)

RequestResponseQueue(
(left.requests concat right.requests)
.filter { req => processed.get(req.requester).forall(time => !(req.order <= time)) }
.sortBy { req => req.order }(using Ordering.fromLessThan(VectorClock.vectorClockOrdering.lt))
.distinct,
(left.responses concat right.responses)
.filter { req => processed.get(req.request.requester).forall(time => !(req.request.order <= time)) }
.sortBy { req => req.order }(using Ordering.fromLessThan(VectorClock.vectorClockOrdering.lt))
.distinct,
processed,
left.clock.merge(right.clock)
)

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package probench.data

import rdts.base.{Lattice, LocalUid}
import rdts.dotted.Dotted

import scala.util.Random

class RequestResponseQueueTest extends munit.ScalaCheckSuite {

type CUT = Dotted[RequestResponseQueue[String, String]]

def empty: CUT = Dotted.empty

test("add request works") {
given LocalUid = LocalUid.predefined("id1")

val queue = empty
val reqDelta = queue.mod(q => q.request("one"))
val merged = queue.merge(reqDelta)

assertEquals(merged.data.requests.head.value, "one")
}

test("add requests merge out of order") {
given LocalUid = LocalUid.predefined("id1")

var queue = empty

val deltas = (0 to 10).map { i =>
val delta = queue.mod(q => q.request(f"req $i"))
queue = queue.merge(delta)
delta
}

for _ <- 0 to 100 do {
assertEquals(Random.shuffle(deltas).foldLeft(empty)(Lattice.merge), queue)
}
}

test("respond to single request") {
given LocalUid = LocalUid.predefined("id1")

val queue = empty
val reqDelta = queue.mod(q => q.request("one"))
val resDelta = queue.merge(reqDelta).mod(q => q.respond(reqDelta.data.requests.head, "1"))
val merged: CUT = queue.merge(reqDelta).merge(resDelta)

assertEquals(merged.data.responsesTo(merged.data.requests.head).map(_.value), List("1"))
}

test("respond merge out of order") {
given LocalUid = LocalUid.predefined("id1")

var queue = empty

val reqDeltas = (0 to 10).map { i =>
val delta = queue.mod(q => q.request(f"req $i"))
queue = queue.merge(delta)
delta
}

val resDeltas = (0 to 10).map { i =>
val delta = queue.mod(q => q.respond(q.requests(i), f"res $i"))
queue = queue.merge(delta)
delta
}

val allDeltas = reqDeltas ++ resDeltas

for _ <- 0 to 100 do {
assertEquals(Random.shuffle(allDeltas).foldLeft(empty)(Lattice.merge), queue)
}
}

test("one uncompleted, one completed") {
given LocalUid = LocalUid.predefined("id1")

var queue = empty

def mod(f: CUT => CUT): CUT = {
val delta = f(queue)
queue = queue.merge(delta)
delta
}

val req1 = mod(_.mod(_.request("one")))
val req2 = mod(_.mod(_.request("two")))
val res1 = mod(_.mod(q => q.respond(q.requests.head, "1")))
val res2 = mod(_.mod(q => q.respond(q.requests(1), "2")))

assertEquals(queue.data.requests.map(_.value).toList, List("one", "two"))
assertEquals(queue.data.responses.map(_.value).toList, List("1", "2"))

val requestOne = queue.data.requests.head

val comp = mod(_.mod(q => q.complete(q.requests.head)))

assertEquals(queue.data.requests.map(_.value).toList, List("two"))
assertEquals(queue.data.responses.map(_.value).toList, List("2"))

mod(_.mod(q => q.respond(requestOne, "1")))

assertEquals(queue.data.responses.map(_.value).toList, List("2"))
}

}

0 comments on commit 32c5ba9

Please sign in to comment.