diff --git a/Modules/Channels/shared/src/main/scala/channels/Channels.scala b/Modules/Channels/shared/src/main/scala/channels/Channels.scala index 635060aff..535b1a704 100644 --- a/Modules/Channels/shared/src/main/scala/channels/Channels.scala +++ b/Modules/Channels/shared/src/main/scala/channels/Channels.scala @@ -55,7 +55,10 @@ trait LatentConnection[T] { * That is, no messages should be lost during setup. * Similarly, the provider of the callback (the result of `incoming`) of this method should make sure that the other end of the callback is ready to receive callbacks before running the async. * - * It is not safe to call prepare multiple times. + * It is generally not assumed to be safe to run prepare twice (neither running a single async twice, nor running two different returned asyncs). + * Notably, “server” like implementations may try to bind a specific port, and immediately fail if that is not available. + * + * The async may produce multiple connections and will run [[incomingHandler]] for each of them. */ def prepare(incomingHandler: Handler[T]): Async[Abort, Connection[T]] } diff --git a/Modules/Channels/shared/src/main/scala/channels/SynchronousLocalConnection.scala b/Modules/Channels/shared/src/main/scala/channels/SynchronousLocalConnection.scala index 40bca41d8..cc2909d67 100644 --- a/Modules/Channels/shared/src/main/scala/channels/SynchronousLocalConnection.scala +++ b/Modules/Channels/shared/src/main/scala/channels/SynchronousLocalConnection.scala @@ -2,11 +2,35 @@ package channels import de.rmgk.delay.{Async, Callback, Promise, Sync} -/** Allows establishing a single direct synchronous connection. */ +/** Allows establishing direct synchronous connections between a single “server” and multiple “clients”. + * + * Synchronous here means that any `send` calls immediately execute the handler code no the receiving side (after a couple of callback indirections). There is no runtime and no threads and the send call will only return after the handling code has completed. + * + * It does not matter if the server or client is started first, connection is established immediately when the second one joins the connection. + */ // You like callback hell? Definitely callback hell. class SynchronousLocalConnection[T] { + + /** The server prepares by fullfillling the [[connectionEstablished]] promise, which contains a callback that allwows any number of clients to connect. The inner callback contains the [[Connection]] the server sends on, as well as a promise that the server completes immediately with it’s own receive handler. */ + object server extends LatentConnection[T] { + + case class Establish(serverSendsOn: Connection[T], clientConnectionSendsTo: Promise[Callback[T]]) + val connectionEstablished: Promise[Callback[Establish]] = Promise() + + def prepare(incomingHandler: Handler[T]): Async[Abort, Connection[T]] = Async.fromCallback[Establish] { + connectionEstablished.succeed(Async.handler) + }.map { connChan => + connChan.clientConnectionSendsTo.succeed(incomingHandler.getCallbackFor(connChan.serverSendsOn)) + connChan.serverSendsOn + } + } + + /** Clients create both the client side and server side connection object. */ def client(id: String): LatentConnection[T] = new LatentConnection[T] { + /** This promise is send (unfullfilled) to the server, to be completed with the callback that handles received messages on the server side. + * Thus, once completed, the inner callback directly executes whatever handler code was passed to the server. + */ val toServerMessages: Promise[Callback[T]] = Promise() object toServer extends Connection[T] { @@ -22,6 +46,7 @@ class SynchronousLocalConnection[T] { def prepare(incomingHandler: Handler[T]): Async[Abort, Connection[T]] = Async { val callback = incomingHandler.getCallbackFor(toServer) + /* This is the connection that is passed to the server, which just calls the callback defined by the handler. */ val toClient = new Connection[T] { override def close(): Unit = () override def send(message: T): Async[Any, Unit] = Sync { @@ -36,17 +61,4 @@ class SynchronousLocalConnection[T] { } } - object server extends LatentConnection[T] { - - case class Establish(serverSendsOn: Connection[T], clientConnectionSendsTo: Promise[Callback[T]]) - val connectionEstablished: Promise[Callback[Establish]] = Promise() - - def prepare(incomingHandler: Handler[T]): Async[Abort, Connection[T]] = Async.fromCallback[Establish] { - connectionEstablished.succeed(Async.handler) - }.map { connChan => - connChan.clientConnectionSendsTo.succeed(incomingHandler.getCallbackFor(connChan.serverSendsOn)) - connChan.serverSendsOn - } - } - }