Skip to content

Commit

Permalink
Add channel info collector
Browse files Browse the repository at this point in the history
This collector allows us to produce channel level metrics
  • Loading branch information
Christoph Petrausch committed Mar 25, 2020
1 parent 2b065f4 commit 49df915
Show file tree
Hide file tree
Showing 8 changed files with 286 additions and 21 deletions.
10 changes: 9 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,23 @@ func main() {
user := flag.String("user", "serveradmin", "the serverquery user of the ts3exporter")
password := flag.String("password", "", "the serverquery password of the ts3exporter")
passwordFile := flag.String("passwordfile", "/etc/ts3exporter/password", "file containing the password. Only read if -password not set. Must have 0600 permission.")
disableChannel := flag.Bool("disablechannel", false, "disables the channel collector. Disable the channel collector if it produces a too high label cardinality.")

flag.Parse()
c, err := serverquery.NewClient(*remote, *user, mustReadPassword(*password, *passwordFile))
if err != nil {
log.Fatalf("failed to init client %v\n", err)
}
sInfo := collector.NewServerInfo(c)
sqInfo := serverquery.NewVirtualServer(c)
sInfo := collector.NewServerInfo(sqInfo)
mc := collector.NewMultiCollector(sInfo)

if !*disableChannel {
cqInfo := serverquery.NewChannelView(c, sqInfo)
cInfo := collector.NewChannel(cqInfo)
mc.Add(cInfo)
}

prometheus.MustRegister(mc)
// The Handler function provides a default handler to expose metrics
// via an HTTP server. "/metrics" is the usual endpoint for that.
Expand Down
70 changes: 70 additions & 0 deletions pkg/collector/channel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package collector

import (
"github.com/hikhvar/ts3exporter/pkg/serverquery"
"github.com/prometheus/client_golang/prometheus"
)

const channelSubsystem = "channel"

var channelLabels = []string{virtualServerLabel, channelLabel}

type Channel struct {
channels ChannelInformer

ClientsOnline *prometheus.Desc
MaxClients *prometheus.Desc
Codec *prometheus.Desc
CodecQuality *prometheus.Desc
LatencyFactor *prometheus.Desc
Unencrypted *prometheus.Desc
Permanent *prometheus.Desc
SemiPermanent *prometheus.Desc
Default *prometheus.Desc
Password *prometheus.Desc
}

// A ChannelInformer knows how to collect the data from all the channels on the monitoring target
type ChannelInformer interface {
Refresh() error
All() []serverquery.Channel
}

func NewChannel(ci ChannelInformer) *Channel {
return &Channel{
channels: ci,
ClientsOnline: prometheus.NewDesc(fqdn(channelSubsystem, "clients_online"), "number of clients currently online", channelLabels, nil),
MaxClients: prometheus.NewDesc(fqdn(channelSubsystem, "max_clients"), "maximal number of clients allowed in this channel", channelLabels, nil),
Codec: prometheus.NewDesc(fqdn(channelSubsystem, "codec"), "numeric number of configured codec for this channel", channelLabels, nil),
CodecQuality: prometheus.NewDesc(fqdn(channelSubsystem, "codec_quality"), "numeric number of codec quality level chosen for this channel", channelLabels, nil),
LatencyFactor: prometheus.NewDesc(fqdn(channelSubsystem, "codec_latency_factor"), "numeric number of codec latency factor chosen for this channel", channelLabels, nil),
Unencrypted: prometheus.NewDesc(fqdn(channelSubsystem, "unencrypted"), "is the channel unencrypted", channelLabels, nil),
Permanent: prometheus.NewDesc(fqdn(channelSubsystem, "permanent"), "is the channel permanent", channelLabels, nil),
SemiPermanent: prometheus.NewDesc(fqdn(channelSubsystem, "semi_permanent"), "is the channel semi permanent", channelLabels, nil),
Default: prometheus.NewDesc(fqdn(channelSubsystem, "default"), "is the channel the default channel", channelLabels, nil),
Password: prometheus.NewDesc(fqdn(channelSubsystem, "password"), "is the channel saved by an password", channelLabels, nil),
}
}

func (c *Channel) Describe(desc chan<- *prometheus.Desc) {
prometheus.DescribeByCollect(c, desc)
}

func (c *Channel) Collect(met chan<- prometheus.Metric) {
for _, ch := range c.channels.All() {
met <- prometheus.MustNewConstMetric(c.ClientsOnline, prometheus.GaugeValue, float64(ch.ClientsOnline), ch.HostingServer.Name, ch.Name)
met <- prometheus.MustNewConstMetric(c.MaxClients, prometheus.GaugeValue, float64(ch.MaxClients), ch.HostingServer.Name, ch.Name)
met <- prometheus.MustNewConstMetric(c.Codec, prometheus.GaugeValue, float64(ch.Codec), ch.HostingServer.Name, ch.Name)
met <- prometheus.MustNewConstMetric(c.CodecQuality, prometheus.GaugeValue, float64(ch.CodecQuality), ch.HostingServer.Name, ch.Name)
met <- prometheus.MustNewConstMetric(c.LatencyFactor, prometheus.GaugeValue, float64(ch.LatencyFactor), ch.HostingServer.Name, ch.Name)
met <- prometheus.MustNewConstMetric(c.Unencrypted, prometheus.GaugeValue, float64(ch.Unencrypted), ch.HostingServer.Name, ch.Name)
met <- prometheus.MustNewConstMetric(c.Permanent, prometheus.GaugeValue, float64(ch.Permanent), ch.HostingServer.Name, ch.Name)
met <- prometheus.MustNewConstMetric(c.SemiPermanent, prometheus.GaugeValue, float64(ch.SemiPermanent), ch.HostingServer.Name, ch.Name)
met <- prometheus.MustNewConstMetric(c.Default, prometheus.GaugeValue, float64(ch.Default), ch.HostingServer.Name, ch.Name)
met <- prometheus.MustNewConstMetric(c.Password, prometheus.GaugeValue, float64(ch.Password), ch.HostingServer.Name, ch.Name)
}
}

func (c *Channel) Refresh() error {
return c.channels.Refresh()
}
8 changes: 8 additions & 0 deletions pkg/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ type TS3Collector interface {
Refresh() error
}

// A MultiCollector implements the prometheus.Collector interface. The MultiCollector triggers a Refresh on all managed
// TS3Collectors before calling their Collect function. Thus every TS3Collector should provide the current values upon
// scrape.
type MultiCollector struct {
collectors []TS3Collector
refreshErrors prometheus.Counter
Expand All @@ -28,6 +31,11 @@ func NewMultiCollector(collectors ...TS3Collector) *MultiCollector {
}
}

// Add adds a new TS3Collector to the collectors managed by this MultiCollector
func (m *MultiCollector) Add(c TS3Collector) {
m.collectors = append(m.collectors, c)
}

func (m *MultiCollector) Describe(c chan<- *prometheus.Desc) {
prometheus.DescribeByCollect(m, c)
}
Expand Down
14 changes: 10 additions & 4 deletions pkg/collector/serverinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ import (

const serverInfoSubsystem = "serverinfo"

var serverInfoLabels = []string{"virtualserver"}
var serverInfoLabels = []string{virtualServerLabel}

type ServerInfo struct {
vServerView *serverquery.VirtualServerView
vServerView ServerInfoInformer

ClientsOnline *prometheus.Desc
QueryClientsOnline *prometheus.Desc
Expand Down Expand Up @@ -39,9 +39,15 @@ type ServerInfo struct {
BytesReceivedTotal *prometheus.Desc
}

func NewServerInfo(c serverquery.Executor) *ServerInfo {
// A ServerInfoInformer knows how to collect the data from all virtual servers on the monitoring target
type ServerInfoInformer interface {
Refresh() error
All() []serverquery.VirtualServer
}

func NewServerInfo(c ServerInfoInformer) *ServerInfo {
return &ServerInfo{
vServerView: serverquery.NewVirtualServer(c),
vServerView: c,
ClientsOnline: prometheus.NewDesc(fqdn(serverInfoSubsystem, "clients_online"), "number of currently online clients", serverInfoLabels, nil),
QueryClientsOnline: prometheus.NewDesc(fqdn(serverInfoSubsystem, "query_clients_online"), "number of currently online query clients", serverInfoLabels, nil),
Online: prometheus.NewDesc(fqdn(serverInfoSubsystem, "online"), "is the virtualserver online", serverInfoLabels, nil),
Expand Down
2 changes: 2 additions & 0 deletions pkg/collector/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package collector
import "fmt"

const namespace = "ts3"
const virtualServerLabel = "virtualserver"
const channelLabel = "channel"

// fqdn generates a full qualified name of a metric. Given the subsystem and the name of the metric.
func fqdn(subsystem, name string) string {
Expand Down
127 changes: 127 additions & 0 deletions pkg/serverquery/channel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package serverquery

import "fmt"

type ChannelID int

type Channel struct {
HostingServer VirtualServer
stale bool
ID ChannelID `sq:"cid"`
PID int `sq:"pid"`
Order int `sq:"channel_order"`
Name string `sq:"channel_name"`
ClientsOnline int `sq:"total_clients"`
MaxClients int `sq:"channel_maxclients"`
Codec int `sq:"channel_codec"`
CodecQuality int `sq:"channel_codec_quality"`
LatencyFactor int `sq:"channel_codec_latency_factor"`
Unencrypted int `sq:"channel_codec_is_unencrypted"`
Permanent int `sq:"channel_flag_permanent"`
SemiPermanent int `sq:"channel_flag_semi_permanent"`
Default int `sq:"channel_flag_default"`
Password int `sq:"channel_flag_password"`
}

// VServerInventory can return all known VServer
type VServerInventory interface {
All() []VirtualServer
}

type ChannelView struct {
e Executor
channels map[ChannelID]Channel
vServer VServerInventory
}

func NewChannelView(e Executor, inventory VServerInventory) *ChannelView {
return &ChannelView{
e: e,
channels: make(map[ChannelID]Channel),
vServer: inventory,
}
}

// Refresh refreshes the internal representation of the ChannelView. It changes into all virtual server
// known by the vServer inventory
func (c *ChannelView) Refresh() error {
c.markAllStale()
defer c.deleteAllStale()
for _, vServer := range c.vServer.All() {
err := c.updateAllOnVServer(vServer)
if err != nil {
return fmt.Errorf("failed to update metrics on vServer %s: %w", vServer.Name, err)
}
}
return nil
}

// All returns all known Channels
func (c *ChannelView) All() []Channel {
ret := make([]Channel, 0, len(c.channels))
for _, ch := range c.channels {
ret = append(ret, ch)
}
return ret
}

// markAllStale marks all channels stale. They are set during scrape. If the aren't set, they will be deleted
// by deleteAllStale
func (c *ChannelView) markAllStale() {
for id, channel := range c.channels {
channel.stale = true
c.channels[id] = channel
}
}

// deleteAllStale deletes all stale channels
func (c *ChannelView) deleteAllStale() {
for id, channel := range c.channels {
if channel.stale {
delete(c.channels, id)
}
}
}

// updateAllOnVServer update all channels on the given virtual server
func (c *ChannelView) updateAllOnVServer(vServer VirtualServer) error {
_, err := c.e.Exec(fmt.Sprintf("use %d", vServer.ID))
if err != nil {
return fmt.Errorf("failed to use virtual server %d: %w", vServer.ID, err)
}
res, err := c.e.Exec("channellist")
if err != nil {
return fmt.Errorf("failed to list channels: %w", err)
}
for _, r := range res {
for _, i := range r.Items {
var ch Channel
if err := i.ReadInto(&ch); err != nil {
return fmt.Errorf("failed to parse channel from response: %w", err)
}
if err := c.getDetails(&ch); err != nil {
return fmt.Errorf("failed to parse details for channel %d: %w", ch.ID, err)
}
c.channels[ch.ID] = ch
}
}
return nil
}

// getDetails populates the given channels with details
func (c *ChannelView) getDetails(ch *Channel) error {
res, err := c.e.Exec(fmt.Sprintf("channelinfo cid=%d", ch.ID))
if err != nil {
return fmt.Errorf("failed to run channelinfo command: %w", err)
}
for _, r := range res {
if len(r.Items) != 1 {
return fmt.Errorf("expected exactly one channelinfo response got %d", len(r.Items))
}
if err = r.Items[0].ReadInto(ch); err != nil {
return fmt.Errorf("failed to parse channel response: %w", err)
}
return nil
}
return fmt.Errorf("reached unreachable code")
}
41 changes: 41 additions & 0 deletions pkg/serverquery/channel_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package serverquery

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

const channelInput = `pid=10 channel_name=Default\sChannel channel_topic=Default\sChannel\shas\sno\stopic channel_description=This\sis\sthe\sdefault\schannel channel_password channel_codec=4 channel_codec_quality=6 channel_maxclients=-1 channel_maxfamilyclients=-1 channel_order=11 channel_flag_permanent=1 channel_flag_semi_permanent=0 channel_flag_default=1 channel_flag_password=0 channel_codec_latency_factor=1 channel_codec_is_unencrypted=1 channel_security_salt channel_delete_delay=0 channel_unique_identifier=d1eb7624-664a-4809-9b7e-84596d937a6d channel_flag_maxclients_unlimited=1 channel_flag_maxfamilyclients_unlimited=1 channel_flag_maxfamilyclients_inherited=0 channel_filepath=files\/virtualserver_1\/channel_1 channel_needed_talk_power=0 channel_forced_silence=0 channel_name_phonetic channel_icon_id=0 channel_banner_gfx_url channel_banner_mode=0 seconds_empty=-1`

func TestChannelStructTags(t *testing.T) {
res, err := Parse(channelInput)
require.Nil(t, err)
require.Len(t, res.Items, 1)
ch := Channel{
ClientsOnline: 5,
ID: 1,
}
err = res.Items[0].ReadInto(&ch)
require.Nil(t, err)
expected := Channel{
HostingServer: VirtualServer{},
stale: false,
ID: 1,
PID: 10,
Order: 11,
Name: "Default Channel",
ClientsOnline: 5,
MaxClients: -1,
Codec: 4,
CodecQuality: 6,
LatencyFactor: 1,
Unencrypted: 1,
Permanent: 1,
SemiPermanent: 0,
Default: 1,
Password: 0,
}
assert.Equal(t, expected, ch)
}
35 changes: 19 additions & 16 deletions pkg/serverquery/virtualserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,22 @@ import (
"fmt"
)

type VirtualServerID int

type VirtualServer struct {
ID int `sq:"virtualserver_id"`
Port int `sq:"virtualserver_port"`
Name string `sq:"virtualserver_name"`
Status string `sq:"virtualserver_status"`
ClientsOnline int `sq:"virtualserver_clientsonline"`
QueryClientsOnline int `sq:"virtualserver_queryclientsonline"`
MaxClients int `sq:"virtualserver_maxclients"`
Uptime int `sq:"virtualserver_uptime"`
ChannelsOnline int `sq:"virtualserver_channelsonline"`
MaxDownloadTotalBandwidth float64 `sq:"virtualserver_max_download_total_bandwidth"`
MaxUploadTotalBandwidth float64 `sq:"virtualserver_max_upload_total_bandwidth"`
ClientsConnections int `sq:"virtualserver_client_connections"`
QueryClientsConnections int `sq:"virtualserver_queryclientsonline"`
ID VirtualServerID `sq:"virtualserver_id"`
Port int `sq:"virtualserver_port"`
Name string `sq:"virtualserver_name"`
Status string `sq:"virtualserver_status"`
ClientsOnline int `sq:"virtualserver_clientsonline"`
QueryClientsOnline int `sq:"virtualserver_queryclientsonline"`
MaxClients int `sq:"virtualserver_maxclients"`
Uptime int `sq:"virtualserver_uptime"`
ChannelsOnline int `sq:"virtualserver_channelsonline"`
MaxDownloadTotalBandwidth float64 `sq:"virtualserver_max_download_total_bandwidth"`
MaxUploadTotalBandwidth float64 `sq:"virtualserver_max_upload_total_bandwidth"`
ClientsConnections int `sq:"virtualserver_client_connections"`
QueryClientsConnections int `sq:"virtualserver_queryclientsonline"`

FileTransferBytesSentTotal int `sq:"connection_filetransfer_bytes_sent_total"`
FileTransferBytesReceivedTotal int `sq:"connection_filetransfer_bytes_received_total"`
Expand All @@ -37,18 +39,19 @@ type VirtualServer struct {

type VirtualServerView struct {
e Executor
vServer map[int]VirtualServer
vServer map[VirtualServerID]VirtualServer
}

func NewVirtualServer(e Executor) *VirtualServerView {
return &VirtualServerView{
e: e,
vServer: make(map[int]VirtualServer),
vServer: make(map[VirtualServerID]VirtualServer),
}
}

// Refresh refreshes the internal representation of the VirtualServerView
func (v *VirtualServerView) Refresh() error {
// FIXME: implement cleanup of stale vServers
res, err := v.e.Exec("serverlist")
if err != nil {
return fmt.Errorf("failed to list v servers: %w", err)
Expand All @@ -70,7 +73,7 @@ func (v *VirtualServerView) Refresh() error {
}

// getDetails uses the serverinfo serverquery command to get the details of the given virtualserver
func (v *VirtualServerView) getDetails(vServerID int) (VirtualServer, error) {
func (v *VirtualServerView) getDetails(vServerID VirtualServerID) (VirtualServer, error) {
_, err := v.e.Exec(fmt.Sprintf("use %d", vServerID))
if err != nil {
return VirtualServer{}, fmt.Errorf("failed to use virtual server %d: %w", vServerID, err)
Expand Down

0 comments on commit 49df915

Please sign in to comment.