Skip to content

Commit

Permalink
Merge pull request #510 from ScalaConsultants/remove-old-akka-cluster…
Browse files Browse the repository at this point in the history
…-support

Remove old cluster metrics collection code. It was replaced in otel-e…
  • Loading branch information
lgajowy authored Sep 26, 2022
2 parents 115ff6f + c699556 commit 1474266
Show file tree
Hide file tree
Showing 12 changed files with 22 additions and 800 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import io.opentelemetry.api.GlobalOpenTelemetry
import io.opentelemetry.api.metrics.Meter
import org.slf4j.LoggerFactory

import scala.reflect.ClassTag
import scala.reflect.classTag
import scala.util.Try

import io.scalac.mesmer.core.AkkaDispatcher
Expand Down Expand Up @@ -47,9 +45,7 @@ final class AkkaMonitoring(system: ActorSystem[_]) extends Extension {
private val meter: Meter = GlobalOpenTelemetry.getMeter("mesmer-akka")
private val actorSystemConfig: Config = system.settings.config
private val config: AkkaMonitoringConfig = AkkaMonitoringConfig.fromConfig(system.settings.config)
private val openTelemetryClusterMetricsMonitor: OpenTelemetryClusterMetricsMonitor =
OpenTelemetryClusterMetricsMonitor(meter, AkkaClusterModule.enabled, actorSystemConfig)
private val dispatcher = AkkaDispatcher.safeDispatcherSelector(system)
private val dispatcher = AkkaDispatcher.safeDispatcherSelector(system)

private def reflectiveIsInstanceOf(fqcn: String, ref: Any): Either[String, Unit] =
Try(Class.forName(fqcn)).toEither.left.map {
Expand All @@ -65,13 +61,6 @@ final class AkkaMonitoring(system: ActorSystem[_]) extends Extension {

startStreamMonitor()
}

if (autoStartConfig.akkaCluster) {
log.debug("Start akka cluster service")
startClusterEventsMonitor()
startClusterRegionsMonitor()
startSelfMemberMonitor()
}
}

private def startStreamMonitor(): Unit = {
Expand Down Expand Up @@ -99,30 +88,6 @@ final class AkkaMonitoring(system: ActorSystem[_]) extends Extension {
}
}

private def startSelfMemberMonitor(): Unit = startClusterMonitor(ClusterSelfNodeEventsActor)

private def startClusterEventsMonitor(): Unit = startClusterMonitor(ClusterEventsMonitor)

private def startClusterMonitor[T <: ClusterMonitorActor: ClassTag](
actor: T
): Unit = {
val name = classTag[T].runtimeClass.getSimpleName
clusterNodeName.fold {
log.error("ActorSystem is not properly configured to start cluster monitor of type {}", name)
} { _ =>
log.debug("Starting cluster monitor of type {}", name)
system.systemActorOf(
Behaviors
.supervise(actor(openTelemetryClusterMetricsMonitor))
.onFailure[Exception](SupervisorStrategy.restart),
name,
dispatcher
)
}
}

private def startClusterRegionsMonitor(): Unit = startClusterMonitor(ClusterRegionsMonitorActor)

private def startWithConfig[M <: Module](module: M, config: M#All[Boolean])(startUp: M#All[Boolean] => Unit)(implicit
traverse: Traverse[M#All]
): Unit =
Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

Loading

0 comments on commit 1474266

Please sign in to comment.