Skip to content

Commit

Permalink
simplify delta dissemination
Browse files Browse the repository at this point in the history
  • Loading branch information
rmgk committed Nov 24, 2024
1 parent 4cb9b68 commit 8ee130f
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ object Todolist {

document.body.appendChild:
all.div.render.reattach(TodoDataManager.receivedCallback.map(_ =>
val state = TodoDataManager.dataManager.allDeltas.reduceOption(Lattice.merge)
val state = TodoDataManager.dataManager.allDeltas.map(_.data).reduceOption(Lattice.merge)
all.pre(all.stringFrag(pprint.apply(state).plainText)).render
).hold(all.span.render))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,15 @@ object ProtocolMessage {
/** Guarantees that for two payloads a and b, that if a.dots <= b.dots,
* then a.data <= b.data according to the lattice of T
*/
case class Payload[T](sender: Uid, dots: Dots, data: T) extends ProtocolMessage[T]
case class Payload[+T](senders: Set[Uid], dots: Dots, data: T) extends ProtocolMessage[T] {
def addSender(s: Uid) = copy(senders = senders + s)
}
object Payload {
def apply[T](sender: Uid, dots: Dots, data: T): Payload[T] = Payload(Set(sender), dots, data)

// this kinda makes sense, but kinda does not
// given [T: Lattice]: Lattice[Payload[T]] = Lattice.derived
}

case class Ping(time: Long) extends ProtocolMessage[Nothing]
case class Pong(time: Long) extends ProtocolMessage[Nothing]
Expand Down Expand Up @@ -62,8 +70,6 @@ class DeltaDissemination[State](

type ConnectionContext = Connection[ProtocolMessage[State]]

type TransferState = ProtocolDots[State]

val globalAbort = Abort()

var connections: List[ConnectionContext] = Nil
Expand Down Expand Up @@ -108,25 +114,25 @@ class DeltaDissemination[State](
}

// note that deltas are not guaranteed to be ordered the same in the buffers
val lock: AnyRef = new {}
private var localDeltas: List[TransferState] = Nil
private var localBuffer: List[TransferState] = Nil
private var remoteDeltas: List[TransferState] = Nil
val lock: AnyRef = new {}
private var localDeltas: List[Payload[State]] = Nil
private var remoteDeltas: List[Payload[State]] = Nil

private var contexts: Map[Uid, Dots] = Map.empty

def selfContext: Dots = contexts.getOrElse(replicaId.uid, Dots.empty)

def applyDelta(delta: State): Unit = lock.synchronized {
val nextDot = selfContext.nextDot(replicaId.uid)
val dotted = ProtocolDots(delta, Dots.single(nextDot))
localBuffer = dotted :: localBuffer
updateContext(replicaId.uid, dotted.context)
disseminateLocalBuffer()
}
def applyDelta(delta: State): Unit =
val payload = lock.synchronized {
val nextDot = selfContext.nextDot(replicaId.uid)
val payload = Payload(replicaId.uid, Dots.single(nextDot), delta)
updateContext(replicaId.uid, payload.dots)
payload
}
disseminate(payload)

def allDeltas: List[ProtocolDots[State]] = lock.synchronized {
List(localBuffer, remoteDeltas, localDeltas).flatten
def allDeltas: List[Payload[State]] = lock.synchronized {
List(remoteDeltas, localDeltas).flatten
}

def updateContext(rr: Uid, dots: Dots): Unit = lock.synchronized {
Expand All @@ -144,37 +150,27 @@ class DeltaDissemination[State](
case Pong(time) =>
println(s"ping took ${(System.nanoTime() - time.toLong).doubleValue / 1000_000}ms")
case Request(uid, knows) =>
val relevant = allDeltas.filterNot { dt => dt.context <= knows }
val relevant = allDeltas.filterNot { dt => dt.dots <= knows }
relevant.foreach: msg =>
biChan.send(Payload(replicaId.uid, msg.context, msg.data)).run(using ())(debugCallbackAndRemoveCon(biChan))
biChan.send(msg.addSender(replicaId.uid)).run(using ())(debugCallbackAndRemoveCon(biChan))
updateContext(uid, selfContext `merge` knows)
case Payload(uid, context, data) =>
case payload @ Payload(uid, context, data) =>
if context <= selfContext then return
val internalized = ProtocolDots[State](data, context)
lock.synchronized {
updateContext(uid, context)
uid.foreach { uid =>
updateContext(uid, context)
}
updateContext(replicaId.uid, context)
remoteDeltas = internalized :: remoteDeltas
remoteDeltas = payload :: remoteDeltas
}
receiveCallback(data)
if immediateForward then disseminateDeltas(List(internalized), List(biChan))

}
if immediateForward then disseminate(payload, Set(biChan))

def disseminateLocalBuffer(): Unit = {
val deltas: Seq[TransferState] = lock.synchronized {
val deltas = localBuffer
localBuffer = Nil
localDeltas = deltas ::: localDeltas
deltas
}
disseminateDeltas(deltas, Nil)
}

def disseminateDeltas(deltas: Seq[TransferState], except: Seq[ConnectionContext]): Unit = {
def disseminate(payload: Payload[State], except: Set[ConnectionContext] = Set.empty): Unit = {
connections.filterNot(con => except.contains(con)).foreach: con =>
deltas.foreach: delta =>
con.send(Payload(replicaId.uid, delta.context, delta.data)).run(using ())(debugCallbackAndRemoveCon(con))
con.send(payload).run(using ())(debugCallbackAndRemoveCon(con))
}

}

0 comments on commit 8ee130f

Please sign in to comment.