Skip to content

Commit

Permalink
#2400 Use dedicated stream topic for stream dispatcher subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
To-om committed Jul 1, 2022
1 parent d281061 commit 0e330c7
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 6 deletions.
2 changes: 1 addition & 1 deletion thehive/app/org/thp/thehive/services/AuditSrv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ class AuditSrv @Inject() (
case Status.COMMIT =>
logger.debug("Sending audit to stream bus and to notification actor")
val auditIds = ids.map(_._2)
eventSrv.publish(StreamTopic())(AuditStreamMessage(auditIds: _*))
eventSrv.publish(StreamTopic.dispatcher)(AuditStreamMessage(auditIds: _*))
notificationActor ! AuditNotificationMessage(auditIds: _*)
case _ =>
}
Expand Down
4 changes: 2 additions & 2 deletions thehive/app/org/thp/thehive/services/FlowActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ class FlowActor extends Actor {
def fromDate: Date = new Date(System.currentTimeMillis() - maxAgeConfig.get.toMillis)

lazy val eventSrv: EventSrv = injector.getInstance(classOf[EventSrv])
override def preStart(): Unit = eventSrv.subscribe(StreamTopic(), self)
override def postStop(): Unit = eventSrv.unsubscribe(StreamTopic(), self)
override def preStart(): Unit = eventSrv.subscribe(StreamTopic.dispatcher, self)
override def postStop(): Unit = eventSrv.unsubscribe(StreamTopic.dispatcher, self)

def flowQuery(
caseId: Option[EntityIdOrName]
Expand Down
7 changes: 4 additions & 3 deletions thehive/app/org/thp/thehive/services/StreamSrv.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package org.thp.thehive.services

import akka.actor.{actorRef2Scala, Actor, ActorIdentity, ActorRef, ActorSystem, Cancellable, Identify, PoisonPill, Props}
import akka.actor.{Actor, ActorIdentity, ActorRef, ActorSystem, Cancellable, Identify, PoisonPill, Props}
import akka.pattern.{ask, AskTimeoutException}
import akka.serialization.Serializer
import akka.util.Timeout
Expand All @@ -26,7 +26,8 @@ import scala.util.{Random, Try}
sealed trait StreamMessage extends Serializable

object StreamTopic {
def apply(streamId: String = ""): String = if (streamId.isEmpty) "stream" else s"stream-$streamId"
def apply(streamId: String): String = s"stream-$streamId"
val dispatcher: String = "stream"
}

case class AuditStreamMessage(id: EntityId*) extends StreamMessage
Expand Down Expand Up @@ -192,7 +193,7 @@ class StreamSrv @Inject() (
)
logger.debug(s"Register stream actor ${streamActor.path}")
eventSrv.subscribe(StreamTopic(streamId), streamActor)
eventSrv.subscribe(StreamTopic(), streamActor)
eventSrv.subscribe(StreamTopic.dispatcher, streamActor)
streamId
}

Expand Down

0 comments on commit 0e330c7

Please sign in to comment.