Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add prometheus metrics. #540

Merged
merged 7 commits into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,12 @@
<version>1.2.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jitsi</groupId>
<artifactId>jicoco-metrics</artifactId>
<version>1.1-133-g768ef2e</version>
<scope>compile</scope>
</dependency>
</dependencies>

<build>
Expand Down
55 changes: 19 additions & 36 deletions src/main/kotlin/org/jitsi/jibri/JibriManager.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.jitsi.jibri
import org.jitsi.jibri.config.Config
import org.jitsi.jibri.config.XmppCredentials
import org.jitsi.jibri.health.EnvironmentContext
import org.jitsi.jibri.metrics.JibriMetrics
import org.jitsi.jibri.selenium.CallParams
import org.jitsi.jibri.service.JibriService
import org.jitsi.jibri.service.JibriServiceStatusHandler
Expand All @@ -30,14 +31,6 @@ import org.jitsi.jibri.service.impl.SipGatewayJibriService
import org.jitsi.jibri.service.impl.SipGatewayServiceParams
import org.jitsi.jibri.service.impl.StreamingJibriService
import org.jitsi.jibri.service.impl.StreamingParams
import org.jitsi.jibri.statsd.ASPECT_BUSY
import org.jitsi.jibri.statsd.ASPECT_ERROR
import org.jitsi.jibri.statsd.ASPECT_START
import org.jitsi.jibri.statsd.ASPECT_STOP
import org.jitsi.jibri.statsd.JibriStatsDClient
import org.jitsi.jibri.statsd.TAG_SERVICE_LIVE_STREAM
import org.jitsi.jibri.statsd.TAG_SERVICE_RECORDING
import org.jitsi.jibri.statsd.TAG_SERVICE_SIP_GATEWAY
import org.jitsi.jibri.status.ComponentBusyStatus
import org.jitsi.jibri.status.ComponentHealthStatus
import org.jitsi.jibri.status.ComponentState
Expand Down Expand Up @@ -99,40 +92,23 @@ class JibriManager : StatusPublisher<Any>() {
private var pendingIdleFunc: () -> Unit = {}
private var serviceTimeoutTask: ScheduledFuture<*>? = null

private val enableStatsD: Boolean by config {
"JibriConfig::enableStatsD" { Config.legacyConfigSource.enabledStatsD!! }
"jibri.stats.enable-stats-d".from(Config.configSource)
}

private val statsdHost: String by config {
"jibri.stats.host".from(Config.configSource)
}

private val statsdPort: Int by config {
"jibri.stats.port".from(Config.configSource)
}

private val singleUseMode: Boolean by config {
"JibriConfig::singleUseMode" { Config.legacyConfigSource.singleUseMode!! }
"jibri.single-use-mode".from(Config.configSource)
}

val statsDClient: JibriStatsDClient? = if (enableStatsD) {
JibriStatsDClient(statsdHost, statsdPort)
} else {
null
}
val jibriMetrics = JibriMetrics()

/**
* Note: should only be called if the instance-wide lock is held (i.e. called from
* one of the synchronized methods)
* TODO: instead of the synchronized decorators, use a synchronized(this) block
* which we can also use here
*/
private fun throwIfBusy() {
private fun throwIfBusy(sinkType: RecordingSinkType) {
if (busy()) {
logger.info("Jibri is busy, can't start service")
statsDClient?.incrementCounter(ASPECT_BUSY, TAG_SERVICE_RECORDING)
jibriMetrics.requestWhileBusy(sinkType)
throw JibriBusyException()
}
}
Expand All @@ -148,7 +124,7 @@ class JibriManager : StatusPublisher<Any>() {
environmentContext: EnvironmentContext? = null,
serviceStatusHandler: JibriServiceStatusHandler? = null
) {
throwIfBusy()
throwIfBusy(RecordingSinkType.FILE)
logger.info("Starting a file recording with params: $fileRecordingRequestParams")
val service = FileRecordingJibriService(
FileRecordingParams(
Expand All @@ -158,7 +134,7 @@ class JibriManager : StatusPublisher<Any>() {
serviceParams.appData?.fileRecordingMetadata
)
)
statsDClient?.incrementCounter(ASPECT_START, TAG_SERVICE_RECORDING)
jibriMetrics.start(RecordingSinkType.FILE)
startService(service, serviceParams, environmentContext, serviceStatusHandler)
}

Expand All @@ -174,9 +150,9 @@ class JibriManager : StatusPublisher<Any>() {
serviceStatusHandler: JibriServiceStatusHandler? = null
) {
logger.info("Starting a stream with params: $serviceParams $streamingParams")
throwIfBusy()
throwIfBusy(RecordingSinkType.STREAM)
val service = StreamingJibriService(streamingParams)
statsDClient?.incrementCounter(ASPECT_START, TAG_SERVICE_LIVE_STREAM)
jibriMetrics.start(RecordingSinkType.STREAM)
startService(service, serviceParams, environmentContext, serviceStatusHandler)
}

Expand All @@ -188,15 +164,15 @@ class JibriManager : StatusPublisher<Any>() {
serviceStatusHandler: JibriServiceStatusHandler? = null
) {
logger.info("Starting a SIP gateway with params: $serviceParams $sipGatewayServiceParams")
throwIfBusy()
throwIfBusy(RecordingSinkType.GATEWAY)
val service = SipGatewayJibriService(
SipGatewayServiceParams(
sipGatewayServiceParams.callParams,
sipGatewayServiceParams.callLoginParams,
sipGatewayServiceParams.sipClientParams
)
)
statsDClient?.incrementCounter(ASPECT_START, TAG_SERVICE_SIP_GATEWAY)
jibriMetrics.start(RecordingSinkType.GATEWAY)
return startService(service, serviceParams, environmentContext, serviceStatusHandler)
}

Expand All @@ -219,7 +195,7 @@ class JibriManager : StatusPublisher<Any>() {
when (it) {
is ComponentState.Error -> {
if (it.error.scope == ErrorScope.SYSTEM) {
statsDClient?.incrementCounter(ASPECT_ERROR, JibriStatsDClient.getTagForService(jibriService))
jibriMetrics.error(jibriService.getSinkType())
publishStatus(ComponentHealthStatus.UNHEALTHY)
}
stopService()
Expand Down Expand Up @@ -270,7 +246,7 @@ class JibriManager : StatusPublisher<Any>() {
logger.info("No service active, ignoring stop")
return
}
statsDClient?.incrementCounter(ASPECT_STOP, JibriStatsDClient.getTagForService(currentService))
jibriMetrics.stop(currentService.getSinkType())
logger.info("Stopping the current service")
serviceTimeoutTask?.cancel(false)
// Note that this will block until the service is completely stopped
Expand Down Expand Up @@ -309,3 +285,10 @@ class JibriManager : StatusPublisher<Any>() {
}
}
}

private fun JibriService.getSinkType() = when (this) {
is FileRecordingJibriService -> RecordingSinkType.FILE
is StreamingJibriService -> RecordingSinkType.GATEWAY
is SipGatewayJibriService -> RecordingSinkType.GATEWAY
else -> throw IllegalArgumentException("JibriService of unsupported type: ${JibriService::class.java.name}")
}
6 changes: 4 additions & 2 deletions src/main/kotlin/org/jitsi/jibri/Main.kt
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ fun main(args: Array<String>) {
jibriStatusManager.addStatusHandler {
webhookClient.updateStatus(it)
}
jibriStatusManager.addStatusHandler {
jibriManager.jibriMetrics.updateStatus(it)
}
webhookSubscribers.forEach(webhookClient::addSubscriber)
val statusUpdaterTask = TaskPools.recurringTasksPool.scheduleAtFixedRate(
1,
Expand Down Expand Up @@ -144,8 +147,7 @@ fun main(args: Array<String>) {
val xmppApi = XmppApi(
jibriManager = jibriManager,
xmppConfigs = xmppEnvironments,
jibriStatusManager = jibriStatusManager,
jibriManager.statsDClient
jibriStatusManager = jibriStatusManager
)
xmppApi.start()

Expand Down
30 changes: 30 additions & 0 deletions src/main/kotlin/org/jitsi/jibri/api/http/HttpApi.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.jitsi.jibri.api.http

import io.ktor.http.ContentType
import io.ktor.http.HttpStatusCode
import io.ktor.serialization.jackson.jackson
import io.ktor.server.application.Application
Expand All @@ -24,10 +25,12 @@ import io.ktor.server.application.install
import io.ktor.server.plugins.contentnegotiation.ContentNegotiation
import io.ktor.server.request.receive
import io.ktor.server.response.respond
import io.ktor.server.response.respondText
import io.ktor.server.routing.get
import io.ktor.server.routing.post
import io.ktor.server.routing.route
import io.ktor.server.routing.routing
import io.prometheus.client.exporter.common.TextFormat
import jakarta.ws.rs.core.Response
import org.jitsi.jibri.FileRecordingRequestParams
import org.jitsi.jibri.JibriBusyException
Expand All @@ -36,6 +39,8 @@ import org.jitsi.jibri.RecordingSinkType
import org.jitsi.jibri.config.Config
import org.jitsi.jibri.config.XmppCredentials
import org.jitsi.jibri.health.JibriHealth
import org.jitsi.jibri.metrics.JibriMetricsContainer
import org.jitsi.jibri.metrics.StatsConfig
import org.jitsi.jibri.selenium.CallParams
import org.jitsi.jibri.service.JibriServiceStatusHandler
import org.jitsi.jibri.service.ServiceParams
Expand Down Expand Up @@ -130,6 +135,31 @@ class HttpApi(
call.respond(HttpStatusCode.OK)
}
}
if (StatsConfig.enablePrometheus) {
logger.info("Enabling prometheus interface at :$port/metrics")
get("/metrics") {
val accept = call.request.headers["Accept"]
when {
accept?.startsWith("application/openmetrics-text") == true ->
call.respondText(
JibriMetricsContainer.getPrometheusMetrics(TextFormat.CONTENT_TYPE_OPENMETRICS_100),
contentType = ContentType.parse(TextFormat.CONTENT_TYPE_OPENMETRICS_100)
)

accept?.startsWith("text/plain") == true ->
call.respondText(
JibriMetricsContainer.getPrometheusMetrics(TextFormat.CONTENT_TYPE_004),
contentType = ContentType.parse(TextFormat.CONTENT_TYPE_004)
)

else ->
call.respondText(
JibriMetricsContainer.jsonString,
contentType = ContentType.parse("application/json")
)
}
}
}
}
}

Expand Down
23 changes: 7 additions & 16 deletions src/main/kotlin/org/jitsi/jibri/api/xmpp/XmppApi.kt
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,6 @@ import org.jitsi.jibri.service.impl.SipGatewayServiceParams
import org.jitsi.jibri.service.impl.StreamingParams
import org.jitsi.jibri.service.impl.YOUTUBE_URL
import org.jitsi.jibri.sipgateway.SipClientParams
import org.jitsi.jibri.statsd.JibriStatsDClient
import org.jitsi.jibri.statsd.STOPPED_ON_XMPP_CLOSED
import org.jitsi.jibri.statsd.XMPP_CLOSED
import org.jitsi.jibri.statsd.XMPP_CLOSED_ON_ERROR
import org.jitsi.jibri.statsd.XMPP_CONNECTED
import org.jitsi.jibri.statsd.XMPP_PING_FAILED
import org.jitsi.jibri.statsd.XMPP_RECONNECTING
import org.jitsi.jibri.statsd.XMPP_RECONNECTION_FAILED
import org.jitsi.jibri.status.ComponentState
import org.jitsi.jibri.status.JibriStatus
import org.jitsi.jibri.status.JibriStatusManager
Expand Down Expand Up @@ -78,22 +70,21 @@ class XmppApi(
private val jibriManager: JibriManager,
private val xmppConfigs: List<XmppEnvironmentConfig>,
private val jibriStatusManager: JibriStatusManager,
private val statsDClient: JibriStatsDClient? = null
) : IQListener {
private val logger = createLogger()

private val connectionStateListener = object : ConnectionStateListener {
override fun connected(mucClient: MucClient) {
statsDClient?.incrementCounter(XMPP_CONNECTED, mucClient.tags())
jibriManager.jibriMetrics.xmppConnected(mucClient.tags())
}
override fun reconnecting(mucClient: MucClient) {
statsDClient?.incrementCounter(XMPP_RECONNECTING, mucClient.tags())
jibriManager.jibriMetrics.xmppReconnecting(mucClient.tags())
}
override fun reconnectionFailed(mucClient: MucClient) {
statsDClient?.incrementCounter(XMPP_RECONNECTION_FAILED, mucClient.tags())
jibriManager.jibriMetrics.xmppReconnectionFailed(mucClient.tags())
}
override fun pingFailed(mucClient: MucClient) {
statsDClient?.incrementCounter(XMPP_PING_FAILED, mucClient.tags())
jibriManager.jibriMetrics.xmppPingFailed(mucClient.tags())
}

/**
Expand All @@ -102,7 +93,7 @@ class XmppApi(
* recording is stopped.
*/
override fun closed(mucClient: MucClient) {
statsDClient?.incrementCounter(XMPP_CLOSED, mucClient.tags())
jibriManager.jibriMetrics.xmppClosed(mucClient.tags())
maybeStop(mucClient)
}

Expand All @@ -112,7 +103,7 @@ class XmppApi(
* recording is stopped.
*/
override fun closedOnError(mucClient: MucClient) {
statsDClient?.incrementCounter(XMPP_CLOSED_ON_ERROR, mucClient.tags())
jibriManager.jibriMetrics.xmppClosedOnError(mucClient.tags())
maybeStop(mucClient)
}

Expand All @@ -121,7 +112,7 @@ class XmppApi(
val environmentContext = createEnvironmentContext(xmppEnvironment, mucClient)
if (jibriManager.currentEnvironmentContext == environmentContext) {
logger.warn("XMPP disconnected, stopping.")
statsDClient?.incrementCounter(STOPPED_ON_XMPP_CLOSED, mucClient.tags())
jibriManager.jibriMetrics.stoppedOnXmppClosed(mucClient.tags())
jibriManager.stopService()
}
}
Expand Down
Loading
Loading