Skip to content

Commit

Permalink
Merge pull request #1843 from dedis/work-be2-daniel-rumor-handler
Browse files Browse the repository at this point in the history
Add rumor handler
  • Loading branch information
t1b00 committed May 14, 2024
2 parents 970f510 + 43639b2 commit 16f42fc
Show file tree
Hide file tree
Showing 11 changed files with 680 additions and 28 deletions.
11 changes: 7 additions & 4 deletions be2-scala/src/main/scala/ch/epfl/pop/Server.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,17 @@ package ch.epfl.pop

import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter._
import akka.actor.typed.scaladsl.adapter.*
import akka.actor.{ActorRef, Props}
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Directives.*
import akka.http.scaladsl.server.{RequestContext, RouteResult}
import akka.pattern.{AskableActorRef, ask}
import akka.util.Timeout
import ch.epfl.pop.authentication.{GetRequestHandler, PopchaWebSocketResponseHandler}
import ch.epfl.pop.config.RuntimeEnvironment
import ch.epfl.pop.config.RuntimeEnvironment._
import ch.epfl.pop.decentralized.{ConnectionMediator, Monitor}
import ch.epfl.pop.config.RuntimeEnvironment.*
import ch.epfl.pop.decentralized.{ConnectionMediator, GossipManager, Monitor}
import ch.epfl.pop.pubsub.{MessageRegistry, PubSubMediator, PublishSubscribe}
import ch.epfl.pop.storage.{DbActor, SecurityModuleActor}
import org.iq80.leveldb.Options
Expand Down Expand Up @@ -51,6 +51,7 @@ object Server {
// Create necessary actors for server-server communications
val monitorRef: ActorRef = system.actorOf(Monitor.props(dbActorRef))
val connectionMediatorRef: ActorRef = system.actorOf(ConnectionMediator.props(monitorRef, pubSubMediatorRef, dbActorRef, securityModuleActorRef, messageRegistry))
val gossipManagerRef: ActorRef = system.actorOf(GossipManager.props(dbActorRef, monitorRef, connectionMediatorRef))

// Setup routes
def publishSubscribeRoute: RequestContext => Future[RouteResult] = {
Expand All @@ -63,6 +64,7 @@ object Server {
messageRegistry,
monitorRef,
connectionMediatorRef,
gossipManagerRef,
isServer = false
)(system)
)
Expand All @@ -75,6 +77,7 @@ object Server {
messageRegistry,
monitorRef,
connectionMediatorRef,
gossipManagerRef,
isServer = true
)(system)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@ import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, Props}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.WebSocketRequest
import akka.pattern.AskableActorRef
import ch.epfl.pop.decentralized.ConnectionMediator.{NewServerConnected, ReadPeersClientAddress, ReadPeersClientAddressAck}
import ch.epfl.pop.decentralized.ConnectionMediator.{GetRandomPeerAck, NewServerConnected, ReadPeersClientAddress, ReadPeersClientAddressAck}
import ch.epfl.pop.model.network.method.{GreetServer, Heartbeat, ParamsWithMap}
import ch.epfl.pop.model.network.{JsonRpcRequest, MethodType}
import ch.epfl.pop.pubsub.ClientActor.ClientAnswer
import ch.epfl.pop.pubsub.graph.validators.RpcValidator
import ch.epfl.pop.pubsub.{AskPatternConstants, MessageRegistry, PublishSubscribe}
import akka.pattern.ask

import scala.collection.immutable.HashMap
import scala.util.Random
final case class ConnectionMediator(
monitorRef: ActorRef,
mediatorRef: ActorRef,
Expand All @@ -22,6 +24,7 @@ final case class ConnectionMediator(
implicit val system: ActorSystem = ActorSystem()

private var serverMap: HashMap[ActorRef, GreetServer] = HashMap()
private var gossipManagerRef: AskableActorRef = _

// Ping Monitor to inform it of our ActorRef
monitorRef ! ConnectionMediator.Ping()
Expand All @@ -41,6 +44,7 @@ final case class ConnectionMediator(
messageRegistry,
monitorRef,
self,
gossipManagerRef,
isServer = true,
initGreetServer = true
)
Expand Down Expand Up @@ -78,6 +82,18 @@ final case class ConnectionMediator(
))
)
)

case ConnectionMediator.GetRandomPeer(excludes) =>
if (serverMap.isEmpty)
sender() ! ConnectionMediator.NoPeer()
else
val serverRefs = serverMap.filter((k, _) => !excludes.contains(k))
val randomKey = serverRefs.keys.toList(Random.nextInt(serverRefs.size))
sender() ! ConnectionMediator.GetRandomPeerAck(randomKey, serverRefs(randomKey))

case GossipManager.Ping() =>
gossipManagerRef = sender()

}
}

Expand All @@ -92,7 +108,10 @@ object ConnectionMediator {
final case class ServerLeft(serverRef: ActorRef) extends Event
final case class Ping() extends Event
final case class ReadPeersClientAddress() extends Event
final case class GetRandomPeer(excludes: List[ActorRef] = List.empty) extends Event

sealed trait ConnectionMediatorMessage
final case class ReadPeersClientAddressAck(list: List[String]) extends ConnectionMediatorMessage
final case class GetRandomPeerAck(serverRef: ActorRef, greetServer: GreetServer) extends ConnectionMediatorMessage
final case class NoPeer() extends ConnectionMediatorMessage
}
134 changes: 134 additions & 0 deletions be2-scala/src/main/scala/ch/epfl/pop/decentralized/GossipManager.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package ch.epfl.pop.decentralized

import akka.NotUsed
import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import akka.event.slf4j.Logger
import akka.pattern.AskableActorRef
import akka.stream.scaladsl.Flow
import ch.epfl.pop.decentralized.GossipManager.MonitoredRumor
import ch.epfl.pop.decentralized.{ConnectionMediator, GossipManager}
import ch.epfl.pop.model.network.method.{GreetServer, Rumor}
import ch.epfl.pop.model.network.{JsonRpcRequest, JsonRpcResponse, MethodType}
import ch.epfl.pop.pubsub.AskPatternConstants
import ch.epfl.pop.pubsub.ClientActor.ClientAnswer
import ch.epfl.pop.pubsub.graph.{ErrorCodes, GraphMessage, PipelineError}
import ch.epfl.pop.storage.DbActor
import ch.epfl.pop.storage.DbActor.DbActorReadRumor

import scala.concurrent.Await
import scala.util.Random

final case class GossipManager(
dbActorRef: AskableActorRef,
monitorRef: ActorRef,
connectionMediator: AskableActorRef,
stopProbability: Double = 0.5
) extends Actor with AskPatternConstants with ActorLogging {

private type ServerInfos = (ActorRef, GreetServer)

monitorRef ! GossipManager.Ping()
connectionMediator ? GossipManager.Ping()

private var activeGossipProtocol: Map[JsonRpcRequest, List[ServerInfos]] = Map.empty

private def isRumorNew(rumor: Rumor): Boolean = {
val readRumorDb = dbActorRef ? DbActor.ReadRumor(rumor.senderPk -> rumor.rumorId)
Await.result(readRumorDb, duration) match
case DbActorReadRumor(foundRumors) =>
foundRumors.isEmpty
}

private def getPeersForRumor(jsonRpcRequest: JsonRpcRequest): List[ServerInfos] = {
val activeGossip = activeGossipProtocol.get(jsonRpcRequest)
activeGossip match
case Some(peersInfosList) => peersInfosList
case None => List.empty
}

private def sendRumorToRandomPeer(rumorRpc: JsonRpcRequest): Unit = {
// checks the peers to which we already forwarded the message
val activeGossip: List[ServerInfos] = getPeersForRumor(rumorRpc)
// selects a random peer from remaining peers
val randomPeer = connectionMediator ? ConnectionMediator.GetRandomPeer(activeGossip.map(_._1))
Await.result(randomPeer, duration) match {
// updates the list based on response
// if some peers are available we send
case ConnectionMediator.GetRandomPeerAck(serverRef, greetServer) =>
val alreadySent: List[ServerInfos] = activeGossip :+ (serverRef -> greetServer)
activeGossipProtocol += (rumorRpc -> alreadySent)
serverRef ! ClientAnswer(
Right(rumorRpc)
)
// else remove entry
case ConnectionMediator.NoPeer =>
activeGossipProtocol = activeGossipProtocol.removed(rumorRpc)
case _ =>
log.info(s"Actor $self received an unexpected message waiting for a random peer")
}
}

private def processResponse(response: JsonRpcResponse): Unit = {
val activeGossipPeers = activeGossipProtocol.filter((k, _) => k.id == response.id)

// response is expected because only one entry exists
if (activeGossipPeers.size == 1) {
activeGossipPeers.foreach { (rumorRpc, _) =>
if (response.result.isEmpty && Random.nextDouble() < stopProbability) {
activeGossipProtocol -= rumorRpc
} else {
sendRumorToRandomPeer(rumorRpc)
}
}
} else {
log.info(s"Unexpected match for active gossip. Response with id ${response.id} matched with ${activeGossipPeers.size} entries")
// removes duplicate entries to come back to a stable state
activeGossipPeers.foreach { (rumorRpc, _) =>
activeGossipProtocol -= rumorRpc
}
}
}

override def receive: Receive = {
case GossipManager.SendRumorToRandomPeer(rumorRpc) =>
sendRumorToRandomPeer(rumorRpc)

case GossipManager.ManageGossipResponse(jsonRpcResponse) =>
processResponse(jsonRpcResponse)

case _ =>
log.info(s"Actor $self received an unexpected message")
}

}

object GossipManager extends AskPatternConstants {
def props(dbActorRef: AskableActorRef, monitorRef: ActorRef, connectionMediatorRef: AskableActorRef): Props =
Props(new GossipManager(dbActorRef, monitorRef, connectionMediatorRef))

def gossipHandler(gossipManager: AskableActorRef): Flow[GraphMessage, GraphMessage, NotUsed] = Flow[GraphMessage].map {
case Right(jsonRpcRequest: JsonRpcRequest) =>
jsonRpcRequest.method match
case MethodType.rumor =>
gossipManager ? SendRumorToRandomPeer(jsonRpcRequest)
Right(jsonRpcRequest)
case _ => Left(PipelineError(ErrorCodes.SERVER_ERROR.id, "GossipManager received a non expected jsonRpcRequest", jsonRpcRequest.id))
case graphMessage @ _ => Left(PipelineError(ErrorCodes.SERVER_ERROR.id, "GossipManager received an unexpected message:" + graphMessage, None))
}

def monitorResponse(gossipManager: AskableActorRef): Flow[GraphMessage, GraphMessage, NotUsed] = Flow[GraphMessage].map {
case Right(jsonRpcResponse: JsonRpcResponse) =>
gossipManager ? ManageGossipResponse(jsonRpcResponse)
Right(jsonRpcResponse)
case graphMessage @ _ => graphMessage
}

sealed trait Event
final case class MonitoredRumor(jsonRpcRumor: JsonRpcRequest)
final case class SendRumorToRandomPeer(jsonRpcRequest: JsonRpcRequest)
final case class ManageGossipResponse(jsonRpcResponse: JsonRpcResponse)

sealed trait GossipManagerMessage
final case class Ping() extends GossipManagerMessage

}
13 changes: 11 additions & 2 deletions be2-scala/src/main/scala/ch/epfl/pop/decentralized/Monitor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import akka.pattern.{AskableActorRef, ask}
import akka.stream.scaladsl.Sink
import ch.epfl.pop.config.RuntimeEnvironment.{readServerPeers, serverPeersListPath}
import ch.epfl.pop.decentralized.Monitor.TriggerHeartbeat
import ch.epfl.pop.model.network.JsonRpcRequest
import ch.epfl.pop.model.network.method.{Heartbeat, ParamsWithMap}
import ch.epfl.pop.model.network.{JsonRpcRequest, MethodType}
import ch.epfl.pop.model.network.method.{Heartbeat, Params, ParamsWithMap}
import ch.epfl.pop.model.objects.{Channel, Hash}
import ch.epfl.pop.pubsub.AskPatternConstants
import ch.epfl.pop.pubsub.graph.GraphMessage
Expand Down Expand Up @@ -43,6 +43,7 @@ final case class Monitor(
// Monitor is self-contained,
// To that end it doesn't know the ref of the connectionMediator
private var connectionMediatorRef = ActorRef.noSender
private var gossipManagerRef = ActorRef.noSender

private var fileMonitor: FileMonitor = _

Expand Down Expand Up @@ -75,7 +76,11 @@ final case class Monitor(
jsonRpcMessage.getParams match {
case _: ParamsWithMap => /* Actively ignoring this specific message */
// For any other message, we schedule a single heartbeat to reduce messages propagation delay

case _ =>
if (jsonRpcMessage.method == MethodType.rumor) {
gossipManagerRef ! GossipManager.MonitoredRumor(jsonRpcMessage)
}
if (someServerConnected && !timers.isTimerActive(singleHbKey)) {
log.info(s"Scheduling single heartbeat")
timers.startSingleTimer(singleHbKey, TriggerHeartbeat, messageDelay)
Expand All @@ -88,6 +93,10 @@ final case class Monitor(
fileMonitor = new FileMonitor(this.self)
new Thread(fileMonitor).start()

case GossipManager.Ping() =>
log.info("Received GossipManager Ping")
gossipManagerRef = sender()

case msg: ConnectionMediator.ConnectTo =>
connectionMediatorRef ! msg
/* ConnectTo message is sent by FileMonitor thread, which is not an actor, we thus forward it with a real actor to
Expand Down
20 changes: 13 additions & 7 deletions be2-scala/src/main/scala/ch/epfl/pop/pubsub/PublishSubscribe.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,25 @@ import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.pattern.{AskableActorRef, ask}
import akka.stream.FlowShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Merge, Partition, Sink}
import ch.epfl.pop.decentralized.Monitor
import ch.epfl.pop.decentralized.{GossipManager, Monitor}
import ch.epfl.pop.model.network.MethodType.*
import ch.epfl.pop.model.network.{JsonRpcRequest, JsonRpcResponse, MethodType}
import ch.epfl.pop.pubsub.graph.*
import ch.epfl.pop.pubsub.graph.handlers.{GetMessagesByIdResponseHandler, ParamsHandler, ParamsWithMapHandler, ParamsWithMessageHandler}
import ch.epfl.pop.pubsub.graph.handlers.{ProcessMessagesHandler, ParamsHandler, ParamsWithMapHandler, ParamsWithMessageHandler}

object PublishSubscribe {

private var dbActorRef: AskableActorRef = _
private var securityModuleActorRef: AskableActorRef = _
private var connectionMediatorRef: AskableActorRef = _
private var mediatorActorRef: ActorRef = _
private var gossipManager: AskableActorRef = _

def getDbActorRef: AskableActorRef = dbActorRef
def getSecurityModuleActorRef: AskableActorRef = securityModuleActorRef
def getConnectionMediatorRef: AskableActorRef = connectionMediatorRef
def getMediatorActorRef: ActorRef = mediatorActorRef
def getGossipManager: AskableActorRef = gossipManager

def buildGraph(
mediatorActorRefT: ActorRef,
Expand All @@ -31,6 +33,7 @@ object PublishSubscribe {
messageRegistry: MessageRegistry,
monitorRef: ActorRef,
connectionMediatorRefT: ActorRef,
gossipManager: AskableActorRef,
isServer: Boolean,
initGreetServer: Boolean = false
)(implicit system: ActorSystem): Flow[Message, Message, NotUsed] = Flow.fromGraph(GraphDSL.create() {
Expand Down Expand Up @@ -68,7 +71,9 @@ object PublishSubscribe {
))

val requestPartition = builder.add(validateRequests(clientActorRef, messageRegistry))
val responsePartition = builder.add(GetMessagesByIdResponseHandler.responseHandler(messageRegistry))

val gossipMonitorPartition = builder.add(GossipManager.monitorResponse(gossipManager))
val getMsgByIdResponsePartition = builder.add(ProcessMessagesHandler.getMsgByIdResponseHandler(messageRegistry))

// ResponseHandler messages do not go in the merger
val merger = builder.add(Merge[GraphMessage](totalPorts - 1))
Expand All @@ -87,7 +92,7 @@ object PublishSubscribe {

methodPartitioner.out(portPipelineError) ~> merger
methodPartitioner.out(portRpcRequest) ~> requestPartition ~> merger
methodPartitioner.out(portRpcResponse) ~> responsePartition ~> droppingSink
methodPartitioner.out(portRpcResponse) ~> gossipMonitorPartition ~> getMsgByIdResponsePartition ~> droppingSink

merger ~> broadcast
broadcast ~> jsonRpcAnswerGenerator ~> jsonRpcAnswerer ~> output
Expand All @@ -98,7 +103,7 @@ object PublishSubscribe {
}
})

def validateRequests(clientActorRef: ActorRef, messageRegistry: MessageRegistry): Flow[GraphMessage, GraphMessage, NotUsed] =
def validateRequests(clientActorRef: ActorRef, messageRegistry: MessageRegistry)(implicit system: ActorSystem): Flow[GraphMessage, GraphMessage, NotUsed] =
Flow.fromGraph(GraphDSL.create() {
implicit builder: GraphDSL.Builder[NotUsed] =>
{
Expand Down Expand Up @@ -147,7 +152,8 @@ object PublishSubscribe {
val heartbeatPartition = builder.add(ParamsWithMapHandler.heartbeatHandler(dbActorRef))
val getMessagesByIdPartition = builder.add(ParamsWithMapHandler.getMessagesByIdHandler(dbActorRef))
val greetServerPartition = builder.add(ParamsHandler.greetServerHandler(clientActorRef))
val rumorPartition = builder.add(ParamsHandler.rumorHandler(dbActorRef))
val rumorPartition = builder.add(ParamsHandler.rumorHandler(dbActorRef, messageRegistry))
val gossipManagerPartition = builder.add(GossipManager.gossipHandler(gossipManager))

val merger = builder.add(Merge[GraphMessage](totalPorts))

Expand All @@ -162,7 +168,7 @@ object PublishSubscribe {
methodPartitioner.out(portHeartbeat) ~> heartbeatPartition ~> merger
methodPartitioner.out(portGetMessagesById) ~> getMessagesByIdPartition ~> merger
methodPartitioner.out(portGreetServer) ~> greetServerPartition ~> merger
methodPartitioner.out(portRumor) ~> rumorPartition ~> merger
methodPartitioner.out(portRumor) ~> gossipManagerPartition ~> rumorPartition ~> merger

/* close the shape */
FlowShape(input.in, merger.out)
Expand Down
Loading

0 comments on commit 16f42fc

Please sign in to comment.