Skip to content

Commit

Permalink
make synchronous local connection a proper non test implementation
Browse files Browse the repository at this point in the history
I am unreasonably satisfied that it just works like this
  • Loading branch information
rmgk committed Nov 24, 2024
1 parent f8d3a07 commit e067144
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 44 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package channels

import de.rmgk.delay.{Async, Callback, Promise, Sync}

/** Allows establishing a single direct synchronous connection. */
class SynchronousLocalConnection[T] {
object client extends LatentConnection[T] {

object connection extends Connection[T] {
def send(msg: T): Async[Any, Unit] = Async {
val cb = server.incomingMessageCallback.async.bind
cb.succeed(msg)
}
override def close(): Unit = ()
}

def prepare(incomingHandler: Handler[T]): Async[Abort, Connection[T]] = Async {
val callback = incomingHandler.getCallbackFor(connection)

val serverConn = new Connection[T] {
override def close(): Unit = ()
override def send(message: T): Async[Any, Unit] = Sync {
callback.succeed(message)
}
}

val established = server.connectionEstablished.async.bind
established.succeed(serverConn)
connection
}
}

object server extends LatentConnection[T] {

val incomingMessageCallback: Promise[Callback[T]] = Promise()

val connectionEstablished: Promise[Callback[Connection[T]]] = Promise()

def prepare(incomingHandler: Handler[T]): Async[Abort, Connection[T]] = Async.fromCallback[Connection[T]] {
connectionEstablished.succeed(Async.handler)
}.map { conn =>
incomingMessageCallback.succeed(incomingHandler.getCallbackFor(conn))
conn
}
}

}
Original file line number Diff line number Diff line change
@@ -1,60 +1,22 @@
package replication.example

import channels.{Abort, Connection, Handler, LatentConnection}
import de.rmgk.delay.{Async, Callback, Sync}
import channels.SynchronousLocalConnection
import rdts.base.LocalUid
import replication.{DeltaDissemination, ProtocolMessage}

class SynchronizedConnection[T] {
object client extends LatentConnection[T] {

object connection extends Connection[T] {
def send(msg: T): Async[Any, Unit] = Sync {
server.callback.succeed(msg)
}
override def close(): Unit = ()
}

def prepare(incomingHandler: Handler[T]): Async[Abort, Connection[T]] = Sync {
val callback = incomingHandler.getCallbackFor(connection)

val serverConn = new Connection[T] {
override def close(): Unit = ()
override def send(message: T): Async[Any, Unit] = Sync {
callback.succeed(message)
}
}
server.connectionEstablished.succeed(serverConn)
connection
}
}

object server extends LatentConnection[T] {

var callback: Callback[T] = null

var connectionEstablished: Callback[Connection[T]] = null

def prepare(incomingHandler: Handler[T]): Async[Abort, Connection[T]] = Async.fromCallback[Connection[T]] {
connectionEstablished = Async.handler
}.map { conn =>
callback = incomingHandler.getCallbackFor(conn)
conn
}
}

}

class DeltaDisseminationTest extends munit.FunSuite {
test("basics") {

// I have no clue why this syntax is still not deprecated xD
val dd1, dd2 = DeltaDissemination[Set[String]](LocalUid.gen(), _ => ())

val sync = SynchronizedConnection[ProtocolMessage[Set[String]]]()
val sync = SynchronousLocalConnection[ProtocolMessage[Set[String]]]()

dd1.addLatentConnection(sync.server)
dd2.addLatentConnection(sync.client)
dd1.addLatentConnection(sync.server)

dd1.pingAll()
dd2.pingAll()

dd1.applyDelta(Set("a"))
dd2.applyDelta(Set("b"))
Expand Down

0 comments on commit e067144

Please sign in to comment.