Skip to content

Commit

Permalink
make SynchronousLocalConnection work with multiple clients
Browse files Browse the repository at this point in the history
  • Loading branch information
rmgk committed Nov 24, 2024
1 parent e067144 commit 36370bb
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,44 +3,49 @@ package channels
import de.rmgk.delay.{Async, Callback, Promise, Sync}

/** Allows establishing a single direct synchronous connection. */
// You like callback hell? Definitely callback hell.
class SynchronousLocalConnection[T] {
object client extends LatentConnection[T] {
def client(id: String): LatentConnection[T] = new LatentConnection[T] {

object connection extends Connection[T] {
val toServerMessages: Promise[Callback[T]] = Promise()

object toServer extends Connection[T] {
def send(msg: T): Async[Any, Unit] = Async {
val cb = server.incomingMessageCallback.async.bind
val cb = toServerMessages.async.bind
cb.succeed(msg)
}
override def close(): Unit = ()

override def toString: String = s"From[$id]"
}

def prepare(incomingHandler: Handler[T]): Async[Abort, Connection[T]] = Async {
val callback = incomingHandler.getCallbackFor(connection)
val callback = incomingHandler.getCallbackFor(toServer)

val serverConn = new Connection[T] {
val toClient = new Connection[T] {
override def close(): Unit = ()
override def send(message: T): Async[Any, Unit] = Sync {
callback.succeed(message)
}
override def toString: String = s"To[$id]"
}

val established = server.connectionEstablished.async.bind
established.succeed(serverConn)
connection
established.succeed(server.Establish(toClient, toServerMessages))
toServer
}
}

object server extends LatentConnection[T] {

val incomingMessageCallback: Promise[Callback[T]] = Promise()

val connectionEstablished: Promise[Callback[Connection[T]]] = Promise()
case class Establish(serverSendsOn: Connection[T], clientConnectionSendsTo: Promise[Callback[T]])
val connectionEstablished: Promise[Callback[Establish]] = Promise()

def prepare(incomingHandler: Handler[T]): Async[Abort, Connection[T]] = Async.fromCallback[Connection[T]] {
def prepare(incomingHandler: Handler[T]): Async[Abort, Connection[T]] = Async.fromCallback[Establish] {
connectionEstablished.succeed(Async.handler)
}.map { conn =>
incomingMessageCallback.succeed(incomingHandler.getCallbackFor(conn))
conn
}.map { connChan =>
connChan.clientConnectionSendsTo.succeed(incomingHandler.getCallbackFor(connChan.serverSendsOn))
connChan.serverSendsOn
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ object TodoDataManager {
)(create: (A, Fold.Branch[DeltaBuffer[A]]) => Signal[DeltaBuffer[A]]) = {
dataManager.lock.synchronized {
dataManager.applyDelta(wrap(init))
val fullInit = dataManager.allDeltas.flatMap(v => unwrap(v.data)).foldLeft(init)(Lattice.merge)
val fullInit = dataManager.allPayloads.flatMap(v => unwrap(v.data)).foldLeft(init)(Lattice.merge)

val branch = Fold.branch[DeltaBuffer[A]] {
receivedCallback.value.flatMap(unwrap) match
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ object Todolist {

document.body.appendChild:
all.div.render.reattach(TodoDataManager.receivedCallback.map(_ =>
val state = TodoDataManager.dataManager.allDeltas.map(_.data).reduceOption(Lattice.merge)
val state = TodoDataManager.dataManager.allPayloads.map(_.data).reduceOption(Lattice.merge)
all.pre(all.stringFrag(pprint.apply(state).plainText)).render
).hold(all.span.render))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,10 @@ class DeltaDissemination[State](
}

// note that deltas are not guaranteed to be ordered the same in the buffers
private val lock: AnyRef = new {}
private var pastDeltas: List[Payload[State]] = Nil
val lock: AnyRef = new {}
private var pastPayloads: List[Payload[State]] = Nil

def allDeltas: List[Payload[State]] = pastDeltas
def allPayloads: List[Payload[State]] = pastPayloads

private var contexts: Map[Uid, Dots] = Map.empty

Expand All @@ -125,7 +125,7 @@ class DeltaDissemination[State](
val nextDot = selfContext.nextDot(replicaId.uid)
val payload = Payload(replicaId.uid, Dots.single(nextDot), delta)
updateContext(replicaId.uid, payload.dots)
pastDeltas = payload :: pastDeltas
pastPayloads = payload :: pastPayloads
payload
}
disseminate(payload)
Expand All @@ -145,7 +145,7 @@ class DeltaDissemination[State](
case Pong(time) =>
println(s"ping took ${(System.nanoTime() - time.toLong).doubleValue / 1000_000}ms")
case Request(uid, knows) =>
val relevant = pastDeltas.filterNot { dt => dt.dots <= knows }
val relevant = pastPayloads.filterNot { dt => dt.dots <= knows }
relevant.foreach: msg =>
biChan.send(msg.addSender(replicaId.uid)).run(using ())(debugCallbackAndRemoveCon(biChan))
updateContext(uid, selfContext `merge` knows)
Expand All @@ -156,10 +156,11 @@ class DeltaDissemination[State](
updateContext(uid, context)
}
updateContext(replicaId.uid, context)
pastDeltas = payload :: pastDeltas
pastPayloads = payload :: pastPayloads
}
receiveCallback(data)
if immediateForward then disseminate(payload, Set(biChan))
if immediateForward then
disseminate(payload, Set(biChan))

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,24 @@ class DeltaDisseminationTest extends munit.FunSuite {
test("basics") {

// I have no clue why this syntax is still not deprecated xD
val dd1, dd2 = DeltaDissemination[Set[String]](LocalUid.gen(), _ => ())
val dd1, dd2, dd3 = DeltaDissemination[Set[String]](LocalUid.gen(), _ => (), None, true)

val sync = SynchronousLocalConnection[ProtocolMessage[Set[String]]]()

dd2.addLatentConnection(sync.client)
dd2.addLatentConnection(sync.client("2"))
dd1.addLatentConnection(sync.server)
dd3.addLatentConnection(sync.client("3"))

dd1.pingAll()
dd2.pingAll()
dd3.pingAll()

dd1.applyDelta(Set("a"))
dd2.applyDelta(Set("b"))
dd3.applyDelta(Set("c"))

assertEquals(dd1.allDeltas.toSet, dd2.allDeltas.toSet)
assertEquals(dd1.allPayloads.map(_.data).toSet, dd2.allPayloads.map(_.data).toSet)
assertEquals(dd2.allPayloads.map(_.data).toSet, dd3.allPayloads.map(_.data).toSet)

}
}

0 comments on commit 36370bb

Please sign in to comment.