diff --git a/Modules/Examples/TodoMVC/src/main/scala/todo/Todolist.scala b/Modules/Examples/TodoMVC/src/main/scala/todo/Todolist.scala index a7e6fe866..3dc36076e 100644 --- a/Modules/Examples/TodoMVC/src/main/scala/todo/Todolist.scala +++ b/Modules/Examples/TodoMVC/src/main/scala/todo/Todolist.scala @@ -42,7 +42,7 @@ object Todolist { document.body.appendChild: all.div.render.reattach(TodoDataManager.receivedCallback.map(_ => - val state = TodoDataManager.dataManager.allDeltas.reduceOption(Lattice.merge) + val state = TodoDataManager.dataManager.allDeltas.map(_.data).reduceOption(Lattice.merge) all.pre(all.stringFrag(pprint.apply(state).plainText)).render ).hold(all.span.render)) diff --git a/Modules/Replication/shared/src/main/scala/replication/DeltaDissemination.scala b/Modules/Replication/shared/src/main/scala/replication/DeltaDissemination.scala index 14d5cfced..a00607510 100644 --- a/Modules/Replication/shared/src/main/scala/replication/DeltaDissemination.scala +++ b/Modules/Replication/shared/src/main/scala/replication/DeltaDissemination.scala @@ -23,7 +23,15 @@ object ProtocolMessage { /** Guarantees that for two payloads a and b, that if a.dots <= b.dots, * then a.data <= b.data according to the lattice of T */ - case class Payload[T](sender: Uid, dots: Dots, data: T) extends ProtocolMessage[T] + case class Payload[+T](senders: Set[Uid], dots: Dots, data: T) extends ProtocolMessage[T] { + def addSender(s: Uid) = copy(senders = senders + s) + } + object Payload { + def apply[T](sender: Uid, dots: Dots, data: T): Payload[T] = Payload(Set(sender), dots, data) + + // this kinda makes sense, but kinda does not + // given [T: Lattice]: Lattice[Payload[T]] = Lattice.derived + } case class Ping(time: Long) extends ProtocolMessage[Nothing] case class Pong(time: Long) extends ProtocolMessage[Nothing] @@ -62,8 +70,6 @@ class DeltaDissemination[State]( type ConnectionContext = Connection[ProtocolMessage[State]] - type TransferState = ProtocolDots[State] - val globalAbort = Abort() var connections: List[ConnectionContext] = Nil @@ -108,25 +114,25 @@ class DeltaDissemination[State]( } // note that deltas are not guaranteed to be ordered the same in the buffers - val lock: AnyRef = new {} - private var localDeltas: List[TransferState] = Nil - private var localBuffer: List[TransferState] = Nil - private var remoteDeltas: List[TransferState] = Nil + val lock: AnyRef = new {} + private var localDeltas: List[Payload[State]] = Nil + private var remoteDeltas: List[Payload[State]] = Nil private var contexts: Map[Uid, Dots] = Map.empty def selfContext: Dots = contexts.getOrElse(replicaId.uid, Dots.empty) - def applyDelta(delta: State): Unit = lock.synchronized { - val nextDot = selfContext.nextDot(replicaId.uid) - val dotted = ProtocolDots(delta, Dots.single(nextDot)) - localBuffer = dotted :: localBuffer - updateContext(replicaId.uid, dotted.context) - disseminateLocalBuffer() - } + def applyDelta(delta: State): Unit = + val payload = lock.synchronized { + val nextDot = selfContext.nextDot(replicaId.uid) + val payload = Payload(replicaId.uid, Dots.single(nextDot), delta) + updateContext(replicaId.uid, payload.dots) + payload + } + disseminate(payload) - def allDeltas: List[ProtocolDots[State]] = lock.synchronized { - List(localBuffer, remoteDeltas, localDeltas).flatten + def allDeltas: List[Payload[State]] = lock.synchronized { + List(remoteDeltas, localDeltas).flatten } def updateContext(rr: Uid, dots: Dots): Unit = lock.synchronized { @@ -144,37 +150,27 @@ class DeltaDissemination[State]( case Pong(time) => println(s"ping took ${(System.nanoTime() - time.toLong).doubleValue / 1000_000}ms") case Request(uid, knows) => - val relevant = allDeltas.filterNot { dt => dt.context <= knows } + val relevant = allDeltas.filterNot { dt => dt.dots <= knows } relevant.foreach: msg => - biChan.send(Payload(replicaId.uid, msg.context, msg.data)).run(using ())(debugCallbackAndRemoveCon(biChan)) + biChan.send(msg.addSender(replicaId.uid)).run(using ())(debugCallbackAndRemoveCon(biChan)) updateContext(uid, selfContext `merge` knows) - case Payload(uid, context, data) => + case payload @ Payload(uid, context, data) => if context <= selfContext then return - val internalized = ProtocolDots[State](data, context) lock.synchronized { - updateContext(uid, context) + uid.foreach { uid => + updateContext(uid, context) + } updateContext(replicaId.uid, context) - remoteDeltas = internalized :: remoteDeltas + remoteDeltas = payload :: remoteDeltas } receiveCallback(data) - if immediateForward then disseminateDeltas(List(internalized), List(biChan)) - - } + if immediateForward then disseminate(payload, Set(biChan)) - def disseminateLocalBuffer(): Unit = { - val deltas: Seq[TransferState] = lock.synchronized { - val deltas = localBuffer - localBuffer = Nil - localDeltas = deltas ::: localDeltas - deltas - } - disseminateDeltas(deltas, Nil) } - def disseminateDeltas(deltas: Seq[TransferState], except: Seq[ConnectionContext]): Unit = { + def disseminate(payload: Payload[State], except: Set[ConnectionContext] = Set.empty): Unit = { connections.filterNot(con => except.contains(con)).foreach: con => - deltas.foreach: delta => - con.send(Payload(replicaId.uid, delta.context, delta.data)).run(using ())(debugCallbackAndRemoveCon(con)) + con.send(payload).run(using ())(debugCallbackAndRemoveCon(con)) } }