Skip to content

Commit

Permalink
wip use of custom channels for todolist
Browse files Browse the repository at this point in the history
kinda works but not well
  • Loading branch information
rmgk committed Mar 3, 2024
1 parent 760e123 commit dcf3d56
Show file tree
Hide file tree
Showing 10 changed files with 325 additions and 222 deletions.

This file was deleted.

11 changes: 3 additions & 8 deletions Modules/Example Todolist/src/main/scala/todo/Codecs.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import rdts.datatypes.contextual.ReplicatedList
import rdts.dotted.Dotted
import rdts.syntax.DeltaBuffer
import rdts.time.Dot
import loci.transmitter.IdenticallyTransmittable
import reactives.extra.replication.DeltaFor

object Codecs {

Expand Down Expand Up @@ -43,14 +41,11 @@ object Codecs {
DeltaBuffer(Dotted(ReplicatedList.empty[TaskRef]))
}

implicit val transmittableList: IdenticallyTransmittable[DeltaFor[ReplicatedList[TaskRef]]] =
IdenticallyTransmittable()
implicit val codectDeltaForTasklist: JsonValueCodec[DeltaFor[ReplicatedList[TaskRef]]] = JsonCodecMaker.make
implicit val codectDeltaForTasklist: JsonValueCodec[ReplicatedList[TaskRef]] = JsonCodecMaker.make(CodecMakerConfig.withMapAsArray(true))

implicit val codecDeltaForLWW: JsonValueCodec[DeltaFor[LastWriterWins[Option[TaskData]]]] = JsonCodecMaker.make
implicit val codecDeltaForLWW: JsonValueCodec[LastWriterWins[Option[TaskData]]] = JsonCodecMaker.make
implicit val codecDottedLWW: JsonValueCodec[Dotted[LastWriterWins[Option[TaskData]]]] = JsonCodecMaker.make

implicit val transmittableDeltaForLWW: IdenticallyTransmittable[DeltaFor[LastWriterWins[Option[TaskData]]]] =
IdenticallyTransmittable()

implicit val codecLww: JsonValueCodec[DeltaBuffer[Dotted[LastWriterWins[Option[TaskData]]]]] = JsonCodecMaker.make

Expand Down
72 changes: 72 additions & 0 deletions Modules/Example Todolist/src/main/scala/todo/GlobalRegistry.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package todo

import channel.{ArrayMessageBuffer, MessageBuffer, OutChan}
import com.github.plokhotnyuk.jsoniter_scala.core.{JsonValueCodec, readFromSubArray, writeToArray}
import loci.communicator.webrtc.WebRTCConnection
import rdts.base.Lattice
import rdts.dotted.Dotted
import rdts.syntax.DeltaBuffer
import reactives.default.*

import java.nio.charset.StandardCharsets

object GlobalRegistry {

case class Listener[A](evt: Evt[Dotted[A]], codec: JsonValueCodec[Dotted[A]])

private var listeners: Map[String, Listener[?]] = Map.empty

private var channels: List[OutChan] = Nil

def addConnection(connection: WebRTCConnection): Unit = {
println(s"adding channel")
channels = connection :: channels
}

def handle(message: MessageBuffer): Unit = {
val messageArray = message.asArray
val pos = messageArray.indexOfSlice("\r\n\r\n".getBytes(StandardCharsets.UTF_8))
if pos <= 0 then println(s"Received invalid message")
else
println(s"handling received message")
val name = messageArray.slice(0, pos)
listeners.get(new String(name, StandardCharsets.UTF_8)).foreach: listener =>
val decoded = readFromSubArray(messageArray, pos + 4, messageArray.length)(listener.codec)
listener.evt.fire(decoded)
}

def subscribe[A](name: String)(using
codec: JsonValueCodec[Dotted[A]],
lattice: Lattice[Dotted[A]]
): Event[Dotted[A]] = {

val incoming: Evt[Dotted[A]] = Evt()

listeners = listeners.updated(name, Listener(incoming, codec))

incoming
}

def subscribeBranch[A](name: String)(using
codec: JsonValueCodec[Dotted[A]],
lattice: Lattice[Dotted[A]]
): Fold.Branch[DeltaBuffer[Dotted[A]]] =
subscribe(name).act[DeltaBuffer[Dotted[A]]](delta => Fold.current.clearDeltas().applyDelta(delta))

def encode[A: JsonValueCodec](name: String, value: A) =
ArrayMessageBuffer(s"$name\r\n\r\n".getBytes(StandardCharsets.UTF_8) ++ writeToArray(value))

def publish[A](name: String, reactive: Signal[DeltaBuffer[Dotted[A]]])(using JsonValueCodec[Dotted[A]]) = {
println(s"publishing $name")
val initial = encode(name, reactive.now.state)
channels.foreach(c => c.send(initial))

reactive.observe: db =>
println(s"publishing changes of $name to $channels")
db.deltaBuffer.foreach: delta =>
println(s"change is $delta")
val msg = encode(name, delta)
channels.foreach(c => c.send(msg).run(println))

}
}
Loading

0 comments on commit dcf3d56

Please sign in to comment.