From c34282ff6ff77fec626fbf8af49671bb4e519f15 Mon Sep 17 00:00:00 2001 From: Frank Schroeder Date: Sun, 11 Feb 2018 20:16:05 +0100 Subject: [PATCH] Refactor metrics This patch refactors the metrics support within fabio to achieve several goals: * support labels for DogstatD, Prometheus and others * support raw events for statsd and others without aggregation * support multiple backends simultaneously to support migration * make integration of new metrics backends and/or libraries simple * being fully backwards compatible to not break existing setups One of the main challenges is that fabio generates names for the metrics targets through templates and that these templates are configurable. A metrics backend which supports labels needs to generate a different name for the same metric than one that does not support labels. Combining this with the need to support multiple different metrics backends at runtime means that none of the existing metrics libraries like go-kit/metrics and hashicorp/go-metrics is a good fit. However, they can be used as drivers which reduces the need for testing and makes integration of new backends simpler since fabio does not need to rely on a single metrics library. This patch is work in progress with the new metrics code in the metrics4 package and the old metrics code in the 'metrics_old' package where it is kept for reference until it has been migrated or removed. The basic architecture should be good enough to add more providers and functionality so that the community can help. Right now there are two providers "flat" and "label" to demonstrate the concepts. They provide counter and timers in statsd/dogstatd format and can be configured with -metrics.target=flat[,label]. Other providers should be added in the same way. The metrics_old code should be converted to the new provider scheme. The go-kit/metrics library currently supports most of the necessary drivers and is the preferred way of integrating new drivers. The list below is a probably incomplete list of things that need to be done: * The configuration is not used except for metrics.target * The drivers from the metrics_old package need to be migrated * Additional drivers from go-kit need to be added * The old rcrowley/go-metrics code relies on 'registries' for namespacing. After the routing table has changed the registry needs to be pruned of services which no longer exist so that go-metrics stops reporting them. * The RX/TX counters in the TCP and TCP+SNI proxy are probably sub-optimal or broken. * Some counters may not have been initialized properly, e.g. WSConn * WSConn should probably be a gauge which also needs to be set to 0 on startup. * The approach of injecting a noop metrics provider if none has been set has already bitten me once since some metrics are reported while others are not. I need to think about this a bit more. Fixes #126 Fixes #165 Fixes #211 Fixes #253 Fixes #326 Fixes #327 Fixes #371 Closes #329 Closes #331 Closes #334 Closes #335 Remove 'Register' methods and create objects directly. Add Gauge and use it for websocket connections Vendoring in github.com/alexcesaro/statsd First stab at integrating raw statsd from #335 Rebase off master 10/23/2020 - NJ --- admin/api/routes.go | 4 +- go.mod | 2 + go.sum | 4 + main.go | 159 +++++++---- metrics4/flat/metrics.go | 49 ++++ metrics4/label/metrics.go | 52 ++++ metrics4/metrics.go | 117 ++++++++ metrics4/names/names.go | 43 +++ metrics4/statsdraw/statsd.go | 73 +++++ {metrics => metrics_old}/circonus.go | 0 {metrics => metrics_old}/circonus_test.go | 0 {metrics => metrics_old}/gometrics.go | 0 {metrics => metrics_old}/metrics.go | 0 {metrics => metrics_old}/metrics_test.go | 0 {metrics => metrics_old}/noop.go | 0 {metrics => metrics_old}/registry.go | 0 proxy/grpc_handler.go | 21 +- proxy/http_proxy.go | 25 +- proxy/tcp/copy_buffer.go | 6 +- proxy/tcp/sni_proxy.go | 42 +-- proxy/tcp/tcp_dynamic_proxy.go | 28 +- proxy/tcp/tcp_proxy.go | 30 +- proxy/ws_handler.go | 17 +- route/route.go | 17 +- route/table.go | 43 --- route/table_registry_test.go | 114 ++++---- route/target.go | 7 +- .../github.com/alexcesaro/statsd/.travis.yml | 9 + .../github.com/alexcesaro/statsd/CHANGELOG.md | 64 +++++ vendor/github.com/alexcesaro/statsd/LICENSE | 20 ++ vendor/github.com/alexcesaro/statsd/README.md | 50 ++++ vendor/github.com/alexcesaro/statsd/conn.go | 270 ++++++++++++++++++ vendor/github.com/alexcesaro/statsd/doc.go | 29 ++ .../github.com/alexcesaro/statsd/options.go | 250 ++++++++++++++++ vendor/github.com/alexcesaro/statsd/statsd.go | 169 +++++++++++ vendor/modules.txt | 2 + 36 files changed, 1474 insertions(+), 242 deletions(-) create mode 100644 metrics4/flat/metrics.go create mode 100644 metrics4/label/metrics.go create mode 100644 metrics4/metrics.go create mode 100644 metrics4/names/names.go create mode 100644 metrics4/statsdraw/statsd.go rename {metrics => metrics_old}/circonus.go (100%) rename {metrics => metrics_old}/circonus_test.go (100%) rename {metrics => metrics_old}/gometrics.go (100%) rename {metrics => metrics_old}/metrics.go (100%) rename {metrics => metrics_old}/metrics_test.go (100%) rename {metrics => metrics_old}/noop.go (100%) rename {metrics => metrics_old}/registry.go (100%) create mode 100644 vendor/github.com/alexcesaro/statsd/.travis.yml create mode 100644 vendor/github.com/alexcesaro/statsd/CHANGELOG.md create mode 100644 vendor/github.com/alexcesaro/statsd/LICENSE create mode 100644 vendor/github.com/alexcesaro/statsd/README.md create mode 100644 vendor/github.com/alexcesaro/statsd/conn.go create mode 100644 vendor/github.com/alexcesaro/statsd/doc.go create mode 100644 vendor/github.com/alexcesaro/statsd/options.go create mode 100644 vendor/github.com/alexcesaro/statsd/statsd.go diff --git a/admin/api/routes.go b/admin/api/routes.go index 10c2ac373..78bc40a63 100644 --- a/admin/api/routes.go +++ b/admin/api/routes.go @@ -59,8 +59,8 @@ func (h *RoutesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { Weight: tg.Weight, Tags: tg.Tags, Cmd: "route add", - Rate1: tg.Timer.Rate1(), - Pct99: tg.Timer.Percentile(0.99), + // Rate1: tg.Timer.Rate1(), + // Pct99: tg.Timer.Percentile(0.99), } routes = append(routes, ar) } diff --git a/go.mod b/go.mod index 73e873b12..ca6993045 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/DataDog/datadog-go v0.0.0-20180822151419-281ae9f2d895 // indirect github.com/Shopify/sarama v1.19.0 // indirect github.com/Shopify/toxiproxy v2.1.3+incompatible // indirect + github.com/alexcesaro/statsd v2.0.0+incompatible github.com/apache/thrift v0.0.0-20181028152738-da1169d75b15 // indirect github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da // indirect github.com/armon/go-proxyproto v0.0.0-20180202201750-5b7edb60ff5f @@ -102,6 +103,7 @@ require ( google.golang.org/appengine v1.3.0 // indirect google.golang.org/genproto v0.0.0-20190219182410-082222b4a5c5 // indirect google.golang.org/grpc v1.16.0 + gopkg.in/alexcesaro/statsd.v2 v2.0.0 // indirect gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect gopkg.in/vmihailenco/msgpack.v2 v2.9.1 // indirect labix.org/v2/mgo v0.0.0-20140701140051-000000000287 // indirect diff --git a/go.sum b/go.sum index 58bcf22b7..563bffd7e 100644 --- a/go.sum +++ b/go.sum @@ -13,6 +13,8 @@ github.com/Shopify/sarama v1.19.0 h1:9oksLxC6uxVPHPVYUmq6xhr1BOF/hHobWH2UzO67z1s github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo= github.com/Shopify/toxiproxy v2.1.3+incompatible h1:awiJqUYH4q4OmoBiRccJykjd7B+w0loJi2keSna4X/M= github.com/Shopify/toxiproxy v2.1.3+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= +github.com/alexcesaro/statsd v2.0.0+incompatible h1:HG17k1Qk8V1F4UOoq6tx+IUoAbOcI5PHzzEUGeDD72w= +github.com/alexcesaro/statsd v2.0.0+incompatible/go.mod h1:vNepIbQAiyLe1j480173M6NYYaAsGwEcvuDTU3OCUGY= github.com/apache/thrift v0.0.0-20181028152738-da1169d75b15 h1:GGI45yUJ942DZsI3uodzVabeRUbxPsioScY/EmmE5ek= github.com/apache/thrift v0.0.0-20181028152738-da1169d75b15/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da h1:8GUt8eRujhVEGZFFEjBj46YV4rDjvGrNxb0KMWYkL2I= @@ -281,6 +283,8 @@ google.golang.org/grpc v1.14.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmE google.golang.org/grpc v1.15.0/go.mod h1:0JHn/cJsOMiMfNA9+DeHDlAU7KAAB5GDlYFpa9MZMio= google.golang.org/grpc v1.16.0 h1:dz5IJGuC2BB7qXR5AyHNwAUBhZscK2xVez7mznh72sY= google.golang.org/grpc v1.16.0/go.mod h1:0JHn/cJsOMiMfNA9+DeHDlAU7KAAB5GDlYFpa9MZMio= +gopkg.in/alexcesaro/statsd.v2 v2.0.0 h1:FXkZSCZIH17vLCO5sO2UucTHsH9pc+17F6pl3JVCwMc= +gopkg.in/alexcesaro/statsd.v2 v2.0.0/go.mod h1:i0ubccKGzBVNBpdGV5MocxyA/XlLUJzA7SLonnE4drU= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/main.go b/main.go index e37fb9b30..4b160e82b 100644 --- a/main.go +++ b/main.go @@ -24,7 +24,10 @@ import ( "github.com/fabiolb/fabio/config" "github.com/fabiolb/fabio/exit" "github.com/fabiolb/fabio/logger" - "github.com/fabiolb/fabio/metrics" + "github.com/fabiolb/fabio/metrics4" + "github.com/fabiolb/fabio/metrics4/flat" + "github.com/fabiolb/fabio/metrics4/label" + "github.com/fabiolb/fabio/metrics4/statsdraw" "github.com/fabiolb/fabio/noroute" "github.com/fabiolb/fabio/proxy" "github.com/fabiolb/fabio/proxy/tcp" @@ -120,9 +123,7 @@ func main() { registry.Default.DeregisterAll() }) - // init metrics early since that create the global metric registries - // that are used by other parts of the code. - initMetrics(cfg) + metrics := initMetrics(cfg) initRuntime(cfg) initBackend(cfg) @@ -134,12 +135,12 @@ func main() { go watchNoRouteHTML(cfg) first := make(chan bool) - go watchBackend(cfg, first) + go watchBackend(cfg, metrics, first) log.Print("[INFO] Waiting for first routing table") <-first // create proxies after metrics since they use the metrics registry. - startServers(cfg) + startServers(cfg, metrics) // warn again so that it is visible in the terminal WarnIfRunAsRoot(cfg.Insecure) @@ -148,15 +149,16 @@ func main() { log.Print("[INFO] Down") } -func newGrpcProxy(cfg *config.Config, tlscfg *tls.Config) []grpc.ServerOption { +func newGrpcProxy(cfg *config.Config, tlscfg *tls.Config, stats metrics4.Provider) []grpc.ServerOption { //Init Glob Cache globCache := route.NewGlobCache(cfg.GlobCacheSize) statsHandler := &proxy.GrpcStatsHandler{ - Connect: metrics.DefaultRegistry.GetCounter("grpc.conn"), - Request: metrics.DefaultRegistry.GetTimer("grpc.requests"), - NoRoute: metrics.DefaultRegistry.GetCounter("grpc.noroute"), + Connect: stats.NewCounter("grpc.conn"), + Request: stats.NewTimer("grpc.requests"), + NoRoute: stats.NewCounter("grpc.noroute"), + Metrics: stats, } proxyInterceptor := proxy.GrpcProxyInterceptor{ @@ -175,7 +177,7 @@ func newGrpcProxy(cfg *config.Config, tlscfg *tls.Config) []grpc.ServerOption { } } -func newHTTPProxy(cfg *config.Config) http.Handler { +func newHTTPProxy(cfg *config.Config, stats metrics4.Provider) *proxy.HTTPProxy { var w io.Writer //Init Glob Cache @@ -206,7 +208,7 @@ func newHTTPProxy(cfg *config.Config) http.Handler { pick := route.Picker[cfg.Proxy.Strategy] match := route.Matcher[cfg.Proxy.Matcher] - notFound := metrics.DefaultRegistry.GetCounter("notfound") + notFound := stats.NewCounter("notfound") log.Printf("[INFO] Using routing strategy %q", cfg.Proxy.Strategy) log.Printf("[INFO] Using route matching %q", cfg.Proxy.Matcher) @@ -235,26 +237,27 @@ func newHTTPProxy(cfg *config.Config) http.Handler { Lookup: func(r *http.Request) *route.Target { t := route.GetTable().Lookup(r, r.Header.Get("trace"), pick, match, globCache, cfg.GlobMatchingDisabled) if t == nil { - notFound.Inc(1) + notFound.Count(1) log.Print("[WARN] No route for ", r.Host, r.URL) } return t }, - Requests: metrics.DefaultRegistry.GetTimer("requests"), - Noroute: metrics.DefaultRegistry.GetCounter("notfound"), + Requests: stats.NewTimer("requests"), + Noroute: stats.NewCounter("notfound"), Logger: l, TracerCfg: cfg.Tracing, AuthSchemes: authSchemes, + WSConn: stats.NewGauge("ws.conn"), + Metrics: stats, } } -func lookupHostFn(cfg *config.Config) func(string) *route.Target { +func lookupHostFn(cfg *config.Config, notFound metrics4.Counter) func(string) *route.Target { pick := route.Picker[cfg.Proxy.Strategy] - notFound := metrics.DefaultRegistry.GetCounter("notfound") return func(host string) *route.Target { t := route.GetTable().LookupHost(host, pick) if t == nil { - notFound.Inc(1) + notFound.Count(1) log.Print("[WARN] No route for ", host) } return t @@ -321,7 +324,7 @@ func startAdmin(cfg *config.Config) { }() } -func startServers(cfg *config.Config) { +func startServers(cfg *config.Config, stats metrics4.Provider) { for _, l := range cfg.Listen { l := l // capture loop var for go routines below tlscfg, err := makeTLSConfig(l) @@ -334,17 +337,20 @@ func startServers(cfg *config.Config) { log.Printf("[INFO] Client certificate authentication enabled on %s", l.Addr) } + notFound := stats.NewCounter("notfound") switch l.Proto { case "http", "https": go func() { - h := newHTTPProxy(cfg) + h := newHTTPProxy(cfg, stats) + // reset the ws.conn gauge + h.WSConn.Update(0) if err := proxy.ListenAndServeHTTP(l, h, tlscfg); err != nil { exit.Fatal("[FATAL] ", err) } }() case "grpc", "grpcs": go func() { - h := newGrpcProxy(cfg, tlscfg) + h := newGrpcProxy(cfg, tlscfg, stats) if err := proxy.ListenAndServeGRPC(l, h, tlscfg); err != nil { exit.Fatal("[FATAL] ", err) } @@ -353,10 +359,11 @@ func startServers(cfg *config.Config) { go func() { h := &tcp.Proxy{ DialTimeout: cfg.Proxy.DialTimeout, - Lookup: lookupHostFn(cfg), - Conn: metrics.DefaultRegistry.GetCounter("tcp.conn"), - ConnFail: metrics.DefaultRegistry.GetCounter("tcp.connfail"), - Noroute: metrics.DefaultRegistry.GetCounter("tcp.noroute"), + Lookup: lookupHostFn(cfg, notFound), + Conn: stats.NewCounter("tcp.conn"), + ConnFail: stats.NewCounter("tcp.connfail"), + Noroute: stats.NewCounter("tcp.noroute"), + Metrics: stats, } if err := proxy.ListenAndServeTCP(l, h, tlscfg); err != nil { exit.Fatal("[FATAL] ", err) @@ -366,10 +373,11 @@ func startServers(cfg *config.Config) { go func() { h := &tcp.SNIProxy{ DialTimeout: cfg.Proxy.DialTimeout, - Lookup: lookupHostFn(cfg), - Conn: metrics.DefaultRegistry.GetCounter("tcp_sni.conn"), - ConnFail: metrics.DefaultRegistry.GetCounter("tcp_sni.connfail"), - Noroute: metrics.DefaultRegistry.GetCounter("tcp_sni.noroute"), + Lookup: lookupHostFn(cfg, notFound), + Conn: stats.NewCounter("tcp_sni.conn"), + ConnFail: stats.NewCounter("tcp_sni.connfail"), + Noroute: stats.NewCounter("tcp_sni.noroute"), + Metrics: stats, } if err := proxy.ListenAndServeTCP(l, h, tlscfg); err != nil { exit.Fatal("[FATAL] ", err) @@ -408,10 +416,11 @@ func startServers(cfg *config.Config) { go func() { h := &tcp.DynamicProxy{ DialTimeout: cfg.Proxy.DialTimeout, - Lookup: lookupHostFn(cfg), - Conn: metrics.DefaultRegistry.GetCounter("tcp.conn"), - ConnFail: metrics.DefaultRegistry.GetCounter("tcp.connfail"), - Noroute: metrics.DefaultRegistry.GetCounter("tcp.noroute"), + Lookup: lookupHostFn(cfg, notFound), + Conn: stats.NewCounter("tcp.conn"), + ConnFail: stats.NewCounter("tcp.connfail"), + Noroute: stats.NewCounter("tcp.noroute"), + Metrics: stats, } l.Addr = port if err := proxy.ListenAndServeTCP(l, h, tlscfg); err != nil { @@ -423,13 +432,14 @@ func startServers(cfg *config.Config) { }() case "https+tcp+sni": go func() { - hp := newHTTPProxy(cfg) + hp := newHTTPProxy(cfg, stats) tp := &tcp.SNIProxy{ DialTimeout: cfg.Proxy.DialTimeout, - Lookup: lookupHostFn(cfg), - Conn: metrics.DefaultRegistry.GetCounter("tcp_sni.conn"), - ConnFail: metrics.DefaultRegistry.GetCounter("tcp_sni.connfail"), - Noroute: metrics.DefaultRegistry.GetCounter("tcp_sni.noroute"), + Lookup: lookupHostFn(cfg, notFound), + Conn: stats.NewCounter("tcp_sni.conn"), + ConnFail: stats.NewCounter("tcp_sni.connfail"), + Noroute: stats.NewCounter("tcp_sni.noroute"), + Metrics: stats, } if err := proxy.ListenAndServeHTTPSTCPSNI(l, hp, tp, tlscfg, lookupHostMatcher(cfg)); err != nil { exit.Fatal("[FATAL] ", err) @@ -441,31 +451,33 @@ func startServers(cfg *config.Config) { } } -func initMetrics(cfg *config.Config) { - if cfg.Metrics.Target == "" { - log.Printf("[INFO] Metrics disabled") - return - } - - var deadline = time.Now().Add(cfg.Metrics.Timeout) - var err error - for { - metrics.DefaultRegistry, err = metrics.NewRegistry(cfg.Metrics) - if err == nil { - route.ServiceRegistry, err = metrics.NewRegistry(cfg.Metrics) - } - if err == nil { - return - } - if time.Now().After(deadline) { - exit.Fatal("[FATAL] ", err) - } - log.Print("[WARN] Error initializing metrics. ", err) - time.Sleep(cfg.Metrics.Retry) - if atomic.LoadInt32(&shuttingDown) > 0 { - exit.Exit(1) +func initMetrics(cfg *config.Config) metrics4.Provider { + var p []metrics4.Provider + for _, x := range strings.Split(cfg.Metrics.Target, ",") { + x = strings.TrimSpace(x) + switch x { + case "flat": + p = append(p, &flat.Provider{}) + case "label": + p = append(p, &label.Provider{}) + case "statsd_raw": + // prefix := cfg.Metrics.Prefix // prefix is a template and needs to be expanded + prefix := "" + pp, err := statsdraw.NewProvider(prefix, cfg.Metrics.StatsDAddr, cfg.Metrics.Interval) + if err != nil { + exit.Fatalf("[FATAL] Cannot initialize statsd metrics: %s", err) + } + p = append(p, pp) + default: + log.Printf("[WARN] Skipping unknown metrics provider %q", x) + continue } + log.Printf("[INFO] Registering metrics provider %q", x) } + if len(p) == 0 { + log.Printf("[INFO] Metrics disabled") + } + return metrics4.NewMultiProvider(p) } func initRuntime(cfg *config.Config) { @@ -519,7 +531,7 @@ func initBackend(cfg *config.Config) { } } -func watchBackend(cfg *config.Config, first chan bool) { +func watchBackend(cfg *config.Config, p metrics4.Provider, first chan bool) { var ( nextTable string lastTable string @@ -580,6 +592,29 @@ func watchBackend(cfg *config.Config, first chan bool) { } } +func unregisterMetrics(p metrics4.Provider, oldTable, newTable route.Table) { + names := func(t route.Table) map[string]bool { + m := map[string]bool{} + for _, routes := range t { + for _, r := range routes { + for _, t := range r.Targets { + m[t.TimerName.String()] = true + } + } + } + return m + } + + oldNames := names(oldTable) + newNames := names(newTable) + for n := range oldNames { + if !newNames[n] { + log.Printf("[INFO] Unregistering metric %s", n) + p.Unregister(n) + } + } +} + func watchNoRouteHTML(cfg *config.Config) { html := registry.Default.WatchNoRouteHTML() for { diff --git a/metrics4/flat/metrics.go b/metrics4/flat/metrics.go new file mode 100644 index 000000000..5abdaf71e --- /dev/null +++ b/metrics4/flat/metrics.go @@ -0,0 +1,49 @@ +package flat + +import ( + "fmt" + "time" + + "github.com/fabiolb/fabio/metrics4" + "github.com/fabiolb/fabio/metrics4/names" +) + +type Provider struct{} + +func (p *Provider) NewCounter(name string, labels ...string) metrics4.Counter { + return &Counter{Name: names.Flatten(name, labels, names.DotSeparator)} +} + +func (p *Provider) NewGauge(name string, labels ...string) metrics4.Gauge { + return &Gauge{Name: names.Flatten(name, labels, names.DotSeparator)} +} + +func (p *Provider) NewTimer(name string, labels ...string) metrics4.Timer { + return &Timer{Name: names.Flatten(name, labels, names.DotSeparator)} +} + +func (p *Provider) Unregister(interface{}) {} + +type Counter struct { + Name string +} + +func (c *Counter) Count(n int) { + fmt.Printf("%s:%d|c\n", c.Name, n) +} + +type Gauge struct { + Name string +} + +func (g *Gauge) Update(n int) { + fmt.Printf("%s:%d|g\n", g.Name, n) +} + +type Timer struct { + Name string +} + +func (t *Timer) Update(d time.Duration) { + fmt.Printf("%s:%d|ms\n", t.Name, d/time.Millisecond) +} diff --git a/metrics4/label/metrics.go b/metrics4/label/metrics.go new file mode 100644 index 000000000..4b18356e0 --- /dev/null +++ b/metrics4/label/metrics.go @@ -0,0 +1,52 @@ +package label + +import ( + "fmt" + "time" + + "github.com/fabiolb/fabio/metrics4" + "github.com/fabiolb/fabio/metrics4/names" +) + +type Provider struct{} + +func (p *Provider) NewCounter(name string, labels ...string) metrics4.Counter { + return &Counter{Name: name, Labels: labels} +} + +func (p *Provider) NewGauge(name string, labels ...string) metrics4.Gauge { + return &Gauge{Name: name, Labels: labels} +} + +func (p *Provider) NewTimer(name string, labels ...string) metrics4.Timer { + return &Timer{Name: name, Labels: labels} +} + +func (p *Provider) Unregister(interface{}) {} + +type Counter struct { + Name string + Labels []string +} + +func (c *Counter) Count(n int) { + fmt.Printf("%s:%d|c%s\n", c.Name, n, names.Labels(c.Labels, "|#", ":", ",")) +} + +type Gauge struct { + Name string + Labels []string +} + +func (g *Gauge) Update(n int) { + fmt.Printf("%s:%d|g%s\n", g.Name, n, names.Labels(g.Labels, "|#", ":", ",")) +} + +type Timer struct { + Name string + Labels []string +} + +func (t *Timer) Update(d time.Duration) { + fmt.Printf("%s:%d|ns%s\n", t.Name, d.Nanoseconds(), names.Labels(t.Labels, "|#", ":", ",")) +} diff --git a/metrics4/metrics.go b/metrics4/metrics.go new file mode 100644 index 000000000..bfa79caa9 --- /dev/null +++ b/metrics4/metrics.go @@ -0,0 +1,117 @@ +package metrics4 + +import ( + "time" +) + +// Provider is an abstraction of a metrics backend. +type Provider interface { + // NewCounter creates a new counter object. + NewCounter(name string, labels ...string) Counter + + // NewGauge creates a new gauge object. + NewGauge(name string, labels ...string) Gauge + + // NewTimer creates a new timer object. + NewTimer(name string, labels ...string) Timer + + // Unregister removes a previously registered + // name or metric. Required for go-metrics and + // service pruning. This signature is probably not + // correct. + Unregister(v interface{}) +} + +// MultiProvider wraps zero or more providers. +type MultiProvider struct { + p []Provider +} + +func NewMultiProvider(p []Provider) *MultiProvider { + return &MultiProvider{p} +} + +// NewCounter creates a MultiCounter with counter objects for all registered +// providers. +func (mp *MultiProvider) NewCounter(name string, labels ...string) Counter { + var c []Counter + for _, p := range mp.p { + c = append(c, p.NewCounter(name, labels...)) + } + return &MultiCounter{c} +} + +// NewGauge creates a MultiGauge with gauge objects for all registered +// providers. +func (mp *MultiProvider) NewGauge(name string, labels ...string) Gauge { + var v []Gauge + for _, p := range mp.p { + v = append(v, p.NewGauge(name, labels...)) + } + return &MultiGauge{v} +} + +// NewTimer creates a MultiTimer with timer objects for all registered +// providers. +func (mp *MultiProvider) NewTimer(name string, labels ...string) Timer { + var t []Timer + for _, p := range mp.p { + t = append(t, p.NewTimer(name, labels...)) + } + return &MultiTimer{t} +} + +// Unregister removes the metric object from all registered providers. +func (mp *MultiProvider) Unregister(v interface{}) { + for _, p := range mp.p { + p.Unregister(v) + } +} + +// Count measures a number. +type Counter interface { + Count(int) +} + +// MultiCounter wraps zero or more counters. +type MultiCounter struct { + c []Counter +} + +func (mc *MultiCounter) Count(n int) { + for _, c := range mc.c { + c.Count(n) + } +} + +// Gauge measures a value. +type Gauge interface { + Update(int) +} + +// MultiGauge wraps zero or more gauges. +type MultiGauge struct { + v []Gauge +} + +func (m *MultiGauge) Update(n int) { + for _, v := range m.v { + v.Update(n) + } +} + +// Timer measures the time of an event. +type Timer interface { + Update(time.Duration) +} + +// MultTimer wraps zero or more timers. +type MultiTimer struct { + t []Timer +} + +func (mt *MultiTimer) Update(d time.Duration) { + for _, t := range mt.t { + t.Update(d) + } +} diff --git a/metrics4/names/names.go b/metrics4/names/names.go new file mode 100644 index 000000000..8ec1b64d1 --- /dev/null +++ b/metrics4/names/names.go @@ -0,0 +1,43 @@ +package names + +import ( + "net/url" + "strings" +) + +type Service struct { + Service string + Host string + Path string + TargetURL *url.URL +} + +func (s Service) String() string { + return s.Service +} + +const DotSeparator = "." +const PipeSeparator = "|" + +func Flatten(name string, labels []string, separator string) string { + if len(labels) == 0 { + return name + } + return name + separator + strings.Join(labels, separator) +} + +// todo(fs): this function probably allocates like crazy. If on the stack then it might be ok. +// todo(fs): otherwise, give some love. +func Labels(labels []string, prefix, fieldsep, recsep string) string { + if len(labels) == 0 { + return "" + } + if len(labels)%2 != 0 { + labels = append(labels, "???") + } + var fields []string + for i := 0; i < len(labels); i += 2 { + fields = append(fields, labels[i]+fieldsep+labels[i+1]) + } + return prefix + strings.Join(fields, recsep) +} diff --git a/metrics4/statsdraw/statsd.go b/metrics4/statsdraw/statsd.go new file mode 100644 index 000000000..d67906578 --- /dev/null +++ b/metrics4/statsdraw/statsd.go @@ -0,0 +1,73 @@ +package statsdraw + +import ( + "time" + + "github.com/alexcesaro/statsd" + "github.com/fabiolb/fabio/metrics4" + "github.com/fabiolb/fabio/metrics4/names" +) + +type Provider struct { + c *statsd.Client +} + +func NewProvider(prefix, addr string, interval time.Duration) (*Provider, error) { + opts := []statsd.Option{ + statsd.Address(addr), + statsd.FlushPeriod(interval), + } + if prefix != "" { + opts = append(opts, statsd.Prefix(prefix)) + } + + c, err := statsd.New(opts...) + if err != nil { + return nil, err + } + return &Provider{c}, nil +} + +func (p *Provider) NewCounter(name string, labels ...string) metrics4.Counter { + return &Counter{c: p.c, name: name, labels: labels} +} + +func (p *Provider) NewGauge(name string, labels ...string) metrics4.Gauge { + return &Gauge{c: p.c, name: name, labels: labels} +} + +func (p *Provider) NewTimer(name string, labels ...string) metrics4.Timer { + return &Timer{c: p.c, name: name, labels: labels} +} + +func (p *Provider) Unregister(interface{}) {} + +type Counter struct { + c *statsd.Client + name string + labels []string +} + +func (v *Counter) Count(n int) { + v.c.Count(names.Flatten(v.name, v.labels, names.DotSeparator), n) +} + +type Gauge struct { + c *statsd.Client + name string + labels []string +} + +func (v *Gauge) Update(n int) { + v.c.Gauge(names.Flatten(v.name, v.labels, names.DotSeparator), n) +} + +type Timer struct { + c *statsd.Client + name string + labels []string +} + +func (v *Timer) Update(d time.Duration) { + v.c.Timing(names.Flatten(v.name, v.labels, names.DotSeparator), d) +} diff --git a/metrics/circonus.go b/metrics_old/circonus.go similarity index 100% rename from metrics/circonus.go rename to metrics_old/circonus.go diff --git a/metrics/circonus_test.go b/metrics_old/circonus_test.go similarity index 100% rename from metrics/circonus_test.go rename to metrics_old/circonus_test.go diff --git a/metrics/gometrics.go b/metrics_old/gometrics.go similarity index 100% rename from metrics/gometrics.go rename to metrics_old/gometrics.go diff --git a/metrics/metrics.go b/metrics_old/metrics.go similarity index 100% rename from metrics/metrics.go rename to metrics_old/metrics.go diff --git a/metrics/metrics_test.go b/metrics_old/metrics_test.go similarity index 100% rename from metrics/metrics_test.go rename to metrics_old/metrics_test.go diff --git a/metrics/noop.go b/metrics_old/noop.go similarity index 100% rename from metrics/noop.go rename to metrics_old/noop.go diff --git a/metrics/registry.go b/metrics_old/registry.go similarity index 100% rename from metrics/registry.go rename to metrics_old/registry.go diff --git a/proxy/grpc_handler.go b/proxy/grpc_handler.go index 863d834d8..86471d880 100644 --- a/proxy/grpc_handler.go +++ b/proxy/grpc_handler.go @@ -8,12 +8,11 @@ import ( "net" "net/http" "net/url" - "strings" "sync" "time" "github.com/fabiolb/fabio/config" - "github.com/fabiolb/fabio/metrics" + "github.com/fabiolb/fabio/metrics4" "github.com/fabiolb/fabio/route" grpc_proxy "github.com/mwitkow/grpc-proxy/proxy" "google.golang.org/grpc" @@ -99,7 +98,7 @@ func (g GrpcProxyInterceptor) Stream(srv interface{}, stream grpc.ServerStream, } if target == nil { - g.StatsHandler.NoRoute.Inc(1) + g.StatsHandler.NoRoute.Count(1) log.Println("[WARN] grpc: no route found for", info.FullMethod) return status.Error(codes.NotFound, "no route found") } @@ -158,9 +157,10 @@ func (g GrpcProxyInterceptor) lookup(ctx context.Context, fullMethodName string) } type GrpcStatsHandler struct { - Connect metrics.Counter - Request metrics.Timer - NoRoute metrics.Counter + Connect metrics4.Counter + Request metrics4.Timer + NoRoute metrics4.Counter + Metrics metrics4.Provider } type connCtxKey struct{} @@ -175,6 +175,11 @@ func (h *GrpcStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) c } func (h *GrpcStatsHandler) HandleRPC(ctx context.Context, rpc stats.RPCStats) { + metrics := h.Metrics + if metrics == nil { + metrics = &metrics4.MultiProvider{} + } + rpcStats, _ := rpc.(*stats.End) if rpcStats == nil { @@ -186,7 +191,7 @@ func (h *GrpcStatsHandler) HandleRPC(ctx context.Context, rpc stats.RPCStats) { h.Request.Update(dur) s, _ := status.FromError(rpcStats.Error) - metrics.DefaultRegistry.GetTimer(fmt.Sprintf("grpc.status.%s", strings.ToLower(s.Code().String()))) + metrics.NewTimer("grpc.status", "code", s.Code().String()).Update(dur) } // HandleConn processes the Conn stats. @@ -194,7 +199,7 @@ func (h *GrpcStatsHandler) HandleConn(ctx context.Context, conn stats.ConnStats) connBegin, _ := conn.(*stats.ConnBegin) if connBegin != nil { - h.Connect.Inc(1) + h.Connect.Count(1) } } diff --git a/proxy/http_proxy.go b/proxy/http_proxy.go index 889d071fa..b49957e72 100644 --- a/proxy/http_proxy.go +++ b/proxy/http_proxy.go @@ -15,7 +15,7 @@ import ( "github.com/fabiolb/fabio/auth" "github.com/fabiolb/fabio/config" "github.com/fabiolb/fabio/logger" - "github.com/fabiolb/fabio/metrics" + "github.com/fabiolb/fabio/metrics4" "github.com/fabiolb/fabio/noroute" "github.com/fabiolb/fabio/proxy/gzip" "github.com/fabiolb/fabio/route" @@ -46,11 +46,17 @@ type HTTPProxy struct { Lookup func(*http.Request) *route.Target // Requests is a timer metric which is updated for every request. - Requests metrics.Timer + Requests metrics4.Timer // Noroute is a counter metric which is updated for every request // where Lookup() returns nil. - Noroute metrics.Counter + Noroute metrics4.Counter + + // WSConn counts the number of open web socket connections. + WSConn metrics4.Gauge + + // Metrics is the configured metrics backend provider. + Metrics metrics4.Provider // Logger is the access logger for the requests. Logger logger.Logger @@ -71,6 +77,11 @@ func (p *HTTPProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { panic("no lookup function") } + metrics := p.Metrics + if metrics == nil { + metrics = &metrics4.MultiProvider{} + } + if p.Config.RequestID != "" { id := p.UUID if id == nil { @@ -122,7 +133,7 @@ func (p *HTTPProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { if t.Timer != nil { t.Timer.Update(0) } - metrics.DefaultRegistry.GetTimer(key(t.RedirectCode)).Update(0) + metrics.NewCounter("http.status", "code", strconv.Itoa(t.RedirectCode)).Count(1) return } @@ -184,9 +195,9 @@ func (p *HTTPProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { if targetURL.Scheme == "https" || targetURL.Scheme == "wss" { h = newWSHandler(targetURL.Host, func(network, address string) (net.Conn, error) { return tls.Dial(network, address, tr.(*http.Transport).TLSClientConfig) - }) + }, p.WSConn) } else { - h = newWSHandler(targetURL.Host, net.Dial) + h = newWSHandler(targetURL.Host, net.Dial, p.WSConn) } case accept == "text/event-stream": @@ -223,7 +234,7 @@ func (p *HTTPProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - metrics.DefaultRegistry.GetTimer(key(rw.code)).Update(dur) + metrics.NewTimer("http.status", "code", strconv.Itoa(rw.code)).Update(dur) // write access log if p.Logger != nil { diff --git a/proxy/tcp/copy_buffer.go b/proxy/tcp/copy_buffer.go index c06c50fd1..ce7395b14 100644 --- a/proxy/tcp/copy_buffer.go +++ b/proxy/tcp/copy_buffer.go @@ -3,12 +3,12 @@ package tcp import ( "io" - "github.com/fabiolb/fabio/metrics" + "github.com/fabiolb/fabio/metrics4" ) // copyBuffer is an adapted version of io.copyBuffer which updates a // counter instead of returning the total bytes written. -func copyBuffer(dst io.Writer, src io.Reader, c metrics.Counter) (err error) { +func copyBuffer(dst io.Writer, src io.Reader, c metrics4.Counter) (err error) { buf := make([]byte, 32*1024) for { nr, er := src.Read(buf) @@ -16,7 +16,7 @@ func copyBuffer(dst io.Writer, src io.Reader, c metrics.Counter) (err error) { nw, ew := dst.Write(buf[0:nr]) if nw > 0 { if c != nil { - c.Inc(int64(nw)) + c.Count(nw) } } if ew != nil { diff --git a/proxy/tcp/sni_proxy.go b/proxy/tcp/sni_proxy.go index 6a40ee8a1..1c78dccb0 100644 --- a/proxy/tcp/sni_proxy.go +++ b/proxy/tcp/sni_proxy.go @@ -7,7 +7,7 @@ import ( "net" "time" - "github.com/fabiolb/fabio/metrics" + "github.com/fabiolb/fabio/metrics4" "github.com/fabiolb/fabio/route" ) @@ -26,20 +26,28 @@ type SNIProxy struct { Lookup func(host string) *route.Target // Conn counts the number of connections. - Conn metrics.Counter + Conn metrics4.Counter // ConnFail counts the failed upstream connection attempts. - ConnFail metrics.Counter + ConnFail metrics4.Counter // Noroute counts the failed Lookup() calls. - Noroute metrics.Counter + Noroute metrics4.Counter + + // Metrics is the configured metrics backend provider. + Metrics metrics4.Provider } func (p *SNIProxy) ServeTCP(in net.Conn) error { defer in.Close() + metrics := p.Metrics + if metrics == nil { + metrics = &metrics4.MultiProvider{} + } + if p.Conn != nil { - p.Conn.Inc(1) + p.Conn.Count(1) } tlsReader := bufio.NewReader(in) @@ -47,7 +55,7 @@ func (p *SNIProxy) ServeTCP(in net.Conn) error { if err != nil { log.Print("[DEBUG] tcp+sni: TLS handshake failed (failed to peek data)") if p.ConnFail != nil { - p.ConnFail.Inc(1) + p.ConnFail.Count(1) } return err } @@ -56,7 +64,7 @@ func (p *SNIProxy) ServeTCP(in net.Conn) error { if err != nil { log.Printf("[DEBUG] tcp+sni: TLS handshake failed (%s)", err) if p.ConnFail != nil { - p.ConnFail.Inc(1) + p.ConnFail.Count(1) } return err } @@ -66,7 +74,7 @@ func (p *SNIProxy) ServeTCP(in net.Conn) error { if err != nil { log.Printf("[DEBUG] tcp+sni: TLS handshake failed (%s)", err) if p.ConnFail != nil { - p.ConnFail.Inc(1) + p.ConnFail.Count(1) } return err } @@ -77,7 +85,7 @@ func (p *SNIProxy) ServeTCP(in net.Conn) error { if !ok { log.Print("[DEBUG] tcp+sni: TLS handshake failed (unable to parse client hello)") if p.ConnFail != nil { - p.ConnFail.Inc(1) + p.ConnFail.Count(1) } return nil } @@ -85,7 +93,7 @@ func (p *SNIProxy) ServeTCP(in net.Conn) error { if host == "" { log.Print("[DEBUG] tcp+sni: server_name missing") if p.ConnFail != nil { - p.ConnFail.Inc(1) + p.ConnFail.Count(1) } return nil } @@ -93,7 +101,7 @@ func (p *SNIProxy) ServeTCP(in net.Conn) error { t := p.Lookup(host) if t == nil { if p.Noroute != nil { - p.Noroute.Inc(1) + p.Noroute.Count(1) } return nil } @@ -107,7 +115,7 @@ func (p *SNIProxy) ServeTCP(in net.Conn) error { if err != nil { log.Print("[WARN] tcp+sni: cannot connect to upstream ", addr) if p.ConnFail != nil { - p.ConnFail.Inc(1) + p.ConnFail.Count(1) } return err } @@ -130,23 +138,23 @@ func (p *SNIProxy) ServeTCP(in net.Conn) error { if err != nil { log.Print("[WARN] tcp+sni: copy client hello failed. ", err) if p.ConnFail != nil { - p.ConnFail.Inc(1) + p.ConnFail.Count(1) } return err } errc := make(chan error, 2) - cp := func(dst io.Writer, src io.Reader, c metrics.Counter) { + cp := func(dst io.Writer, src io.Reader, c metrics4.Counter) { errc <- copyBuffer(dst, src, c) } // rx measures the traffic to the upstream server (in <- out) // tx measures the traffic from the upstream server (out <- in) - rx := metrics.DefaultRegistry.GetCounter(t.TimerName + ".rx") - tx := metrics.DefaultRegistry.GetCounter(t.TimerName + ".tx") + rx := metrics.NewCounter(t.TimerName.String() + ".rx") + tx := metrics.NewCounter(t.TimerName.String() + ".tx") // we've received the ClientHello already - rx.Inc(int64(n)) + rx.Count(n) go cp(in, out, rx) go cp(out, in, tx) diff --git a/proxy/tcp/tcp_dynamic_proxy.go b/proxy/tcp/tcp_dynamic_proxy.go index 2cd4ad8b3..3c49fe19c 100644 --- a/proxy/tcp/tcp_dynamic_proxy.go +++ b/proxy/tcp/tcp_dynamic_proxy.go @@ -6,7 +6,7 @@ import ( "net" "time" - "github.com/fabiolb/fabio/metrics" + "github.com/fabiolb/fabio/metrics4" "github.com/fabiolb/fabio/route" ) @@ -21,26 +21,34 @@ type DynamicProxy struct { Lookup func(host string) *route.Target // Conn counts the number of connections. - Conn metrics.Counter + Conn metrics4.Counter // ConnFail counts the failed upstream connection attempts. - ConnFail metrics.Counter + ConnFail metrics4.Counter // Noroute counts the failed Lookup() calls. - Noroute metrics.Counter + Noroute metrics4.Counter + + // Metrics is the configured metrics backend provider. + Metrics metrics4.Provider } func (p *DynamicProxy) ServeTCP(in net.Conn) error { defer in.Close() + metrics := p.Metrics + if metrics == nil { + metrics = &metrics4.MultiProvider{} + } + if p.Conn != nil { - p.Conn.Inc(1) + p.Conn.Count(1) } target := in.LocalAddr().String() t := p.Lookup(target) if t == nil { if p.Noroute != nil { - p.Noroute.Inc(1) + p.Noroute.Count(1) } return nil } @@ -55,21 +63,21 @@ func (p *DynamicProxy) ServeTCP(in net.Conn) error { if err != nil { log.Print("[WARN] tcp: cannot connect to upstream ", addr) if p.ConnFail != nil { - p.ConnFail.Inc(1) + p.ConnFail.Count(1) } return err } defer out.Close() errc := make(chan error, 2) - cp := func(dst io.Writer, src io.Reader, c metrics.Counter) { + cp := func(dst io.Writer, src io.Reader, c metrics4.Counter) { errc <- copyBuffer(dst, src, c) } // rx measures the traffic to the upstream server (in <- out) // tx measures the traffic from the upstream server (out <- in) - rx := metrics.DefaultRegistry.GetCounter(t.TimerName + ".rx") - tx := metrics.DefaultRegistry.GetCounter(t.TimerName + ".tx") + rx := metrics.NewCounter(t.TimerName.String() + ".rx") + tx := metrics.NewCounter(t.TimerName.String() + ".tx") go cp(in, out, rx) go cp(out, in, tx) diff --git a/proxy/tcp/tcp_proxy.go b/proxy/tcp/tcp_proxy.go index a6c238937..15953cfe3 100644 --- a/proxy/tcp/tcp_proxy.go +++ b/proxy/tcp/tcp_proxy.go @@ -6,7 +6,7 @@ import ( "net" "time" - "github.com/fabiolb/fabio/metrics" + "github.com/fabiolb/fabio/metrics4" "github.com/fabiolb/fabio/route" ) @@ -21,20 +21,28 @@ type Proxy struct { Lookup func(host string) *route.Target // Conn counts the number of connections. - Conn metrics.Counter + Conn metrics4.Counter // ConnFail counts the failed upstream connection attempts. - ConnFail metrics.Counter + ConnFail metrics4.Counter // Noroute counts the failed Lookup() calls. - Noroute metrics.Counter + Noroute metrics4.Counter + + // Metrics is the configured metrics backend provider. + Metrics metrics4.Provider } func (p *Proxy) ServeTCP(in net.Conn) error { defer in.Close() + metrics := p.Metrics + if metrics == nil { + metrics = &metrics4.MultiProvider{} + } + if p.Conn != nil { - p.Conn.Inc(1) + p.Conn.Count(1) } _, port, _ := net.SplitHostPort(in.LocalAddr().String()) @@ -42,7 +50,7 @@ func (p *Proxy) ServeTCP(in net.Conn) error { t := p.Lookup(port) if t == nil { if p.Noroute != nil { - p.Noroute.Inc(1) + p.Noroute.Count(1) } return nil } @@ -56,7 +64,7 @@ func (p *Proxy) ServeTCP(in net.Conn) error { if err != nil { log.Print("[WARN] tcp: cannot connect to upstream ", addr) if p.ConnFail != nil { - p.ConnFail.Inc(1) + p.ConnFail.Count(1) } return err } @@ -68,21 +76,21 @@ func (p *Proxy) ServeTCP(in net.Conn) error { if err != nil { log.Print("[WARN] tcp: write proxy protocol header failed. ", err) if p.ConnFail != nil { - p.ConnFail.Inc(1) + p.ConnFail.Count(1) } return err } } errc := make(chan error, 2) - cp := func(dst io.Writer, src io.Reader, c metrics.Counter) { + cp := func(dst io.Writer, src io.Reader, c metrics4.Counter) { errc <- copyBuffer(dst, src, c) } // rx measures the traffic to the upstream server (in <- out) // tx measures the traffic from the upstream server (out <- in) - rx := metrics.DefaultRegistry.GetCounter(t.TimerName + ".rx") - tx := metrics.DefaultRegistry.GetCounter(t.TimerName + ".tx") + rx := metrics.NewCounter(t.TimerName.String() + ".rx") + tx := metrics.NewCounter(t.TimerName.String() + ".tx") go cp(in, out, rx) go cp(out, in, tx) diff --git a/proxy/ws_handler.go b/proxy/ws_handler.go index 830f6c15e..85188b364 100644 --- a/proxy/ws_handler.go +++ b/proxy/ws_handler.go @@ -7,13 +7,14 @@ import ( "net" "net/http" "strings" + "sync/atomic" "time" - "github.com/fabiolb/fabio/metrics" + "github.com/fabiolb/fabio/metrics4" ) -// conn measures the number of open web socket connections -var conn = metrics.DefaultRegistry.GetCounter("ws.conn") +// conns keeps track of the number of open ws connections +var conns int64 type dialFunc func(network, address string) (net.Conn, error) @@ -21,10 +22,14 @@ type dialFunc func(network, address string) (net.Conn, error) // an incoming and outgoing websocket connection. It checks whether // the handshake was completed successfully before forwarding data // between the client and server. -func newWSHandler(host string, dial dialFunc) http.Handler { +func newWSHandler(host string, dial dialFunc, conn metrics4.Gauge) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - conn.Inc(1) - defer func() { conn.Inc(-1) }() + if conn != nil { + conn.Update(int(atomic.AddInt64(&conns, 1))) + defer func() { + conn.Update(int(atomic.AddInt64(&conns, -1))) + }() + } hj, ok := w.(http.Hijacker) if !ok { diff --git a/route/route.go b/route/route.go index 69e865e97..3740b4c22 100644 --- a/route/route.go +++ b/route/route.go @@ -9,7 +9,7 @@ import ( "strconv" "strings" - "github.com/fabiolb/fabio/metrics" + "github.com/fabiolb/fabio/metrics4/names" "github.com/gobwas/glob" ) @@ -54,22 +54,21 @@ func (r *Route) addTarget(service string, targetURL *url.URL, fixedWeight float6 } } - name, err := metrics.TargetName(service, r.Host, r.Path, targetURL) - if err != nil { - log.Printf("[ERROR] Invalid metrics name: %s", err) - name = "unknown" - } - t := &Target{ Service: service, Tags: tags, Opts: opts, URL: targetURL, FixedWeight: fixedWeight, - Timer: ServiceRegistry.GetTimer(name), - TimerName: name, + TimerName: names.Service{ + Service: service, + Host: r.Host, + Path: r.Path, + TargetURL: targetURL, + }, } + var err error if opts != nil { t.StripPath = opts["strip"] t.TLSSkipVerify = opts["tlsskipverify"] == "true" diff --git a/route/table.go b/route/table.go index e6e59bc22..f6e0b26af 100644 --- a/route/table.go +++ b/route/table.go @@ -10,10 +10,8 @@ import ( "net/url" "sort" "strings" - "sync" "sync/atomic" - "github.com/fabiolb/fabio/metrics" "github.com/gobwas/glob" ) @@ -24,9 +22,6 @@ var errNoMatch = errors.New("route: no target match") // table stores the active routing table. Must never be nil. var table atomic.Value -// ServiceRegistry stores the metrics for the services. -var ServiceRegistry metrics.Registry = metrics.NoopRegistry{} - // init initializes the routing table. func init() { table.Store(make(Table)) @@ -39,9 +34,6 @@ func GetTable() Table { return table.Load().(Table) } -// mu guards table and registry in SetTable. -var mu sync.Mutex - // SetTable sets the active routing table. A nil value // logs a warning and is ignored. The function is safe // to be called from multiple goroutines. @@ -50,42 +42,7 @@ func SetTable(t Table) { log.Print("[WARN] Ignoring nil routing table") return } - mu.Lock() table.Store(t) - syncRegistry(t) - mu.Unlock() -} - -// syncRegistry unregisters all inactive timers. -// It assumes that all timers of the table have -// already been registered. -func syncRegistry(t Table) { - timers := map[string]bool{} - - // get all registered timers - for _, name := range ServiceRegistry.Names() { - timers[name] = false - } - - // mark the ones from this table as active. - // this can also add new entries but we do not - // really care since we are only interested in the - // inactive ones. - for _, routes := range t { - for _, r := range routes { - for _, tg := range r.Targets { - timers[tg.TimerName] = true - } - } - } - - // unregister inactive timers - for name, active := range timers { - if !active { - ServiceRegistry.Unregister(name) - log.Printf("[INFO] Unregistered timer %s", name) - } - } } // Table contains a set of routes grouped by host. diff --git a/route/table_registry_test.go b/route/table_registry_test.go index 3616dd9c5..5607fe3fe 100644 --- a/route/table_registry_test.go +++ b/route/table_registry_test.go @@ -1,63 +1,55 @@ package route -import ( - "reflect" - "sort" - "testing" - - "github.com/fabiolb/fabio/metrics" -) - -func TestSyncRegistry(t *testing.T) { - oldRegistry := ServiceRegistry - ServiceRegistry = newStubRegistry() - defer func() { ServiceRegistry = oldRegistry }() - - tbl := make(Table) - tbl.addRoute(&RouteDef{Service: "svc-a", Src: "/aaa", Dst: "http://localhost:1234", Weight: 1}) - tbl.addRoute(&RouteDef{Service: "svc-b", Src: "/bbb", Dst: "http://localhost:5678", Weight: 1}) - if got, want := ServiceRegistry.Names(), []string{"svc-a._./aaa.localhost_1234", "svc-b._./bbb.localhost_5678"}; !reflect.DeepEqual(got, want) { - t.Fatalf("got %v want %v", got, want) - } - - tbl.delRoute(&RouteDef{Service: "svc-b", Src: "/bbb", Dst: "http://localhost:5678"}) - syncRegistry(tbl) - if got, want := ServiceRegistry.Names(), []string{"svc-a._./aaa.localhost_1234"}; !reflect.DeepEqual(got, want) { - t.Fatalf("got %v want %v", got, want) - } -} - -func newStubRegistry() metrics.Registry { - return &stubRegistry{names: make(map[string]bool)} -} - -type stubRegistry struct { - names map[string]bool -} - -func (p *stubRegistry) Names() []string { - n := []string{} - for k := range p.names { - n = append(n, k) - } - sort.Strings(n) - return n -} - -func (p *stubRegistry) Unregister(name string) { - delete(p.names, name) -} - -func (p *stubRegistry) UnregisterAll() { - p.names = map[string]bool{} -} - -func (p *stubRegistry) GetCounter(name string) metrics.Counter { - p.names[name] = true - return metrics.NoopCounter{} -} - -func (p *stubRegistry) GetTimer(name string) metrics.Timer { - p.names[name] = true - return metrics.NoopTimer{} -} +// func TestSyncRegistry(t *testing.T) { +// oldRegistry := ServiceRegistry +// ServiceRegistry = newStubRegistry() +// defer func() { ServiceRegistry = oldRegistry }() +// +// tbl := make(Table) +// tbl.addRoute(&RouteDef{Service: "svc-a", Src: "/aaa", Dst: "http://localhost:1234", Weight: 1}) +// tbl.addRoute(&RouteDef{Service: "svc-b", Src: "/bbb", Dst: "http://localhost:5678", Weight: 1}) +// if got, want := ServiceRegistry.Names(), []string{"svc-a._./aaa.localhost_1234", "svc-b._./bbb.localhost_5678"}; !reflect.DeepEqual(got, want) { +// t.Fatalf("got %v want %v", got, want) +// } +// +// tbl.delRoute(&RouteDef{Service: "svc-b", Src: "/bbb", Dst: "http://localhost:5678"}) +// syncRegistry(tbl) +// if got, want := ServiceRegistry.Names(), []string{"svc-a._./aaa.localhost_1234"}; !reflect.DeepEqual(got, want) { +// t.Fatalf("got %v want %v", got, want) +// } +// } +// +// func newStubRegistry() metrics.Registry { +// return &stubRegistry{names: make(map[string]bool)} +// } +// +// type stubRegistry struct { +// names map[string]bool +// } +// +// func (p *stubRegistry) Names() []string { +// n := []string{} +// for k := range p.names { +// n = append(n, k) +// } +// sort.Strings(n) +// return n +// } +// +// func (p *stubRegistry) Unregister(name string) { +// delete(p.names, name) +// } +// +// func (p *stubRegistry) UnregisterAll() { +// p.names = map[string]bool{} +// } +// +// func (p *stubRegistry) GetCounter(name string) metrics.Counter { +// p.names[name] = true +// return metrics.NoopCounter{} +// } +// +// func (p *stubRegistry) GetTimer(name string) metrics.Timer { +// p.names[name] = true +// return metrics.NoopTimer{} +// } diff --git a/route/target.go b/route/target.go index 24341142a..f3064ab1d 100644 --- a/route/target.go +++ b/route/target.go @@ -4,7 +4,8 @@ import ( "net/url" "strings" - "github.com/fabiolb/fabio/metrics" + "github.com/fabiolb/fabio/metrics4" + "github.com/fabiolb/fabio/metrics4/names" ) type Target struct { @@ -50,10 +51,10 @@ type Target struct { Weight float64 // Timer measures throughput and latency of this target - Timer metrics.Timer + Timer metrics4.Timer // TimerName is the name of the timer in the metrics registry - TimerName string + TimerName names.Service // accessRules is map of access information for the target. accessRules map[string][]interface{} diff --git a/vendor/github.com/alexcesaro/statsd/.travis.yml b/vendor/github.com/alexcesaro/statsd/.travis.yml new file mode 100644 index 000000000..48915e737 --- /dev/null +++ b/vendor/github.com/alexcesaro/statsd/.travis.yml @@ -0,0 +1,9 @@ +language: go + +go: + - 1.2 + - 1.3 + - 1.4 + - 1.5 + - 1.6 + - tip diff --git a/vendor/github.com/alexcesaro/statsd/CHANGELOG.md b/vendor/github.com/alexcesaro/statsd/CHANGELOG.md new file mode 100644 index 000000000..04d811b71 --- /dev/null +++ b/vendor/github.com/alexcesaro/statsd/CHANGELOG.md @@ -0,0 +1,64 @@ +# Change Log +All notable changes to this project will be documented in this file. +This project adheres to [Semantic Versioning](http://semver.org/). + +## [2.0.0] - 2016-03-20 + +- `New` signature changed. The default address used is now ":8125". To use + another address use the `Address` option: + + Before: + ``` + statsd.New(":8125") + statsd.New(":9000") + ``` + + After + ``` + statsd.New() + statsd.New(statsd.Address(":9000")) + ``` + +- The `rate` parameter has been removed from the `Count` and `Timing` methods. + Use the new `SampleRate` option instead. + +- `Count`, `Gauge` and `Timing` now accept a `interface{}` instead of an int as + the value parameter. So you can now use any type of integer or float in these + functions. + +- The `WithInfluxDBTags` and `WithDatadogTags` options were replaced by the + `TagsFormat` and `Tags` options: + + Before: + ``` + statsd.New(statsd.WithInfluxDBTags("tag", "value")) + statsd.New(statsd.WithDatadogTags("tag", "value")) + ``` + + After + ``` + statsd.New(statsd.TagsFormat(statsd.InfluxDB), statsd.Tags("tag", "value")) + statsd.New(statsd.TagsFormat(statsd.Datadog), statsd.Tags("tag", "value")) + ``` + +- All options whose named began by `With` had the `With` stripped: + + Before: + ``` + statsd.New(statsd.WithMaxPacketSize(65000)) + ``` + + After + ``` + statsd.New(statsd.MaxPacketSize(65000)) + ``` + +- `ChangeGauge` has been removed as it is a bad practice: UDP packets can be + lost so using relative changes can cause unreliable values in the long term. + Use `Gauge` instead which sends an absolute value. + +- The `Histogram` method has been added. + +- The `Clone` method was added to the `Client`, it allows to create a new + `Client` with different rate / prefix / tags parameters while still using the + same connection. diff --git a/vendor/github.com/alexcesaro/statsd/LICENSE b/vendor/github.com/alexcesaro/statsd/LICENSE new file mode 100644 index 000000000..4ec7268d5 --- /dev/null +++ b/vendor/github.com/alexcesaro/statsd/LICENSE @@ -0,0 +1,20 @@ +The MIT License (MIT) + +Copyright (c) 2015 Alexandre Cesaro + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/vendor/github.com/alexcesaro/statsd/README.md b/vendor/github.com/alexcesaro/statsd/README.md new file mode 100644 index 000000000..774a1c687 --- /dev/null +++ b/vendor/github.com/alexcesaro/statsd/README.md @@ -0,0 +1,50 @@ +# statsd +[![Build Status](https://travis-ci.org/alexcesaro/statsd.svg?branch=v2)](https://travis-ci.org/alexcesaro/statsd) [![Code Coverage](http://gocover.io/_badge/gopkg.in/alexcesaro/statsd.v2)](http://gocover.io/gopkg.in/alexcesaro/statsd.v2) [![Documentation](https://godoc.org/gopkg.in/alexcesaro/statsd.v2?status.svg)](https://godoc.org/gopkg.in/alexcesaro/statsd.v2) + +## Introduction + +statsd is a simple and efficient [Statsd](https://github.com/etsy/statsd) +client. + +See the [benchmark](https://github.com/alexcesaro/statsdbench) for a comparison +with other Go StatsD clients. + +## Features + +- Supports all StatsD metrics: counter, gauge, timing and set +- Supports InfluxDB and Datadog tags +- Fast and GC-friendly: all functions for sending metrics do not allocate +- Efficient: metrics are buffered by default +- Simple and clean API +- 100% test coverage +- Versioned API using gopkg.in + + +## Documentation + +https://godoc.org/gopkg.in/alexcesaro/statsd.v2 + + +## Download + + go get gopkg.in/alexcesaro/statsd.v2 + + +## Example + +See the [examples in the documentation](https://godoc.org/gopkg.in/alexcesaro/statsd.v2#example-package). + + +## License + +[MIT](LICENSE) + + +## Contribute + +Do you have any question the documentation does not answer? Is there a use case +that you feel is common and is not well-addressed by the current API? + +If so you are more than welcome to ask questions in the +[thread on golang-nuts](https://groups.google.com/d/topic/golang-nuts/Tz6t4_iLgnw/discussion) +or open an issue or send a pull-request here on Github. diff --git a/vendor/github.com/alexcesaro/statsd/conn.go b/vendor/github.com/alexcesaro/statsd/conn.go new file mode 100644 index 000000000..4dbda6309 --- /dev/null +++ b/vendor/github.com/alexcesaro/statsd/conn.go @@ -0,0 +1,270 @@ +package statsd + +import ( + "io" + "math/rand" + "net" + "strconv" + "sync" + "time" +) + +type conn struct { + // Fields settable with options at Client's creation. + addr string + errorHandler func(error) + flushPeriod time.Duration + maxPacketSize int + network string + tagFormat TagFormat + + mu sync.Mutex + // Fields guarded by the mutex. + closed bool + w io.WriteCloser + buf []byte + rateCache map[float32]string +} + +func newConn(conf connConfig, muted bool) (*conn, error) { + c := &conn{ + addr: conf.Addr, + errorHandler: conf.ErrorHandler, + flushPeriod: conf.FlushPeriod, + maxPacketSize: conf.MaxPacketSize, + network: conf.Network, + tagFormat: conf.TagFormat, + } + + if muted { + return c, nil + } + + var err error + c.w, err = dialTimeout(c.network, c.addr, 5*time.Second) + if err != nil { + return c, err + } + // When using UDP do a quick check to see if something is listening on the + // given port to return an error as soon as possible. + if c.network[:3] == "udp" { + for i := 0; i < 2; i++ { + _, err = c.w.Write(nil) + if err != nil { + _ = c.w.Close() + c.w = nil + return c, err + } + } + } + + // To prevent a buffer overflow add some capacity to the buffer to allow for + // an additional metric. + c.buf = make([]byte, 0, c.maxPacketSize+200) + + if c.flushPeriod > 0 { + go func() { + ticker := time.NewTicker(c.flushPeriod) + for _ = range ticker.C { + c.mu.Lock() + if c.closed { + ticker.Stop() + c.mu.Unlock() + return + } + c.flush(0) + c.mu.Unlock() + } + }() + } + + return c, nil +} + +func (c *conn) metric(prefix, bucket string, n interface{}, typ string, rate float32, tags string) { + c.mu.Lock() + l := len(c.buf) + c.appendBucket(prefix, bucket, tags) + c.appendNumber(n) + c.appendType(typ) + c.appendRate(rate) + c.closeMetric(tags) + c.flushIfBufferFull(l) + c.mu.Unlock() +} + +func (c *conn) gauge(prefix, bucket string, value interface{}, tags string) { + c.mu.Lock() + l := len(c.buf) + // To set a gauge to a negative value we must first set it to 0. + // https://github.com/etsy/statsd/blob/master/docs/metric_types.md#gauges + if isNegative(value) { + c.appendBucket(prefix, bucket, tags) + c.appendGauge(0, tags) + } + c.appendBucket(prefix, bucket, tags) + c.appendGauge(value, tags) + c.flushIfBufferFull(l) + c.mu.Unlock() +} + +func (c *conn) appendGauge(value interface{}, tags string) { + c.appendNumber(value) + c.appendType("g") + c.closeMetric(tags) +} + +func (c *conn) unique(prefix, bucket string, value string, tags string) { + c.mu.Lock() + l := len(c.buf) + c.appendBucket(prefix, bucket, tags) + c.appendString(value) + c.appendType("s") + c.closeMetric(tags) + c.flushIfBufferFull(l) + c.mu.Unlock() +} + +func (c *conn) appendByte(b byte) { + c.buf = append(c.buf, b) +} + +func (c *conn) appendString(s string) { + c.buf = append(c.buf, s...) +} + +func (c *conn) appendNumber(v interface{}) { + switch n := v.(type) { + case int: + c.buf = strconv.AppendInt(c.buf, int64(n), 10) + case uint: + c.buf = strconv.AppendUint(c.buf, uint64(n), 10) + case int64: + c.buf = strconv.AppendInt(c.buf, n, 10) + case uint64: + c.buf = strconv.AppendUint(c.buf, n, 10) + case int32: + c.buf = strconv.AppendInt(c.buf, int64(n), 10) + case uint32: + c.buf = strconv.AppendUint(c.buf, uint64(n), 10) + case int16: + c.buf = strconv.AppendInt(c.buf, int64(n), 10) + case uint16: + c.buf = strconv.AppendUint(c.buf, uint64(n), 10) + case int8: + c.buf = strconv.AppendInt(c.buf, int64(n), 10) + case uint8: + c.buf = strconv.AppendUint(c.buf, uint64(n), 10) + case float64: + c.buf = strconv.AppendFloat(c.buf, n, 'f', -1, 64) + case float32: + c.buf = strconv.AppendFloat(c.buf, float64(n), 'f', -1, 32) + } +} + +func isNegative(v interface{}) bool { + switch n := v.(type) { + case int: + return n < 0 + case uint: + return n < 0 + case int64: + return n < 0 + case uint64: + return n < 0 + case int32: + return n < 0 + case uint32: + return n < 0 + case int16: + return n < 0 + case uint16: + return n < 0 + case int8: + return n < 0 + case uint8: + return n < 0 + case float64: + return n < 0 + case float32: + return n < 0 + } + return false +} + +func (c *conn) appendBucket(prefix, bucket string, tags string) { + c.appendString(prefix) + c.appendString(bucket) + if c.tagFormat == InfluxDB { + c.appendString(tags) + } + c.appendByte(':') +} + +func (c *conn) appendType(t string) { + c.appendByte('|') + c.appendString(t) +} + +func (c *conn) appendRate(rate float32) { + if rate == 1 { + return + } + if c.rateCache == nil { + c.rateCache = make(map[float32]string) + } + + c.appendString("|@") + if s, ok := c.rateCache[rate]; ok { + c.appendString(s) + } else { + s = strconv.FormatFloat(float64(rate), 'f', -1, 32) + c.rateCache[rate] = s + c.appendString(s) + } +} + +func (c *conn) closeMetric(tags string) { + if c.tagFormat == Datadog { + c.appendString(tags) + } + c.appendByte('\n') +} + +func (c *conn) flushIfBufferFull(lastSafeLen int) { + if len(c.buf) > c.maxPacketSize { + c.flush(lastSafeLen) + } +} + +// flush flushes the first n bytes of the buffer. +// If n is 0, the whole buffer is flushed. +func (c *conn) flush(n int) { + if len(c.buf) == 0 { + return + } + if n == 0 { + n = len(c.buf) + } + + // Trim the last \n, StatsD does not like it. + _, err := c.w.Write(c.buf[:n-1]) + c.handleError(err) + if n < len(c.buf) { + copy(c.buf, c.buf[n:]) + } + c.buf = c.buf[:len(c.buf)-n] +} + +func (c *conn) handleError(err error) { + if err != nil && c.errorHandler != nil { + c.errorHandler(err) + } +} + +// Stubbed out for testing. +var ( + dialTimeout = net.DialTimeout + now = time.Now + randFloat = rand.Float32 +) diff --git a/vendor/github.com/alexcesaro/statsd/doc.go b/vendor/github.com/alexcesaro/statsd/doc.go new file mode 100644 index 000000000..bb7b986e3 --- /dev/null +++ b/vendor/github.com/alexcesaro/statsd/doc.go @@ -0,0 +1,29 @@ +/* +Package statsd is a simple and efficient StatsD client. + + +Options + +Use options to configure the Client: target host/port, sampling rate, tags, etc. + +Whenever you want to use different options (e.g. other tags, different sampling +rate), you should use the Clone() method of the Client. + +Because when cloning a Client, the same connection is reused so this is way +cheaper and more efficient than creating another Client using New(). + + +Internals + +Client's methods buffer metrics. The buffer is flushed when either: + - the background goroutine flushes the buffer (every 100ms by default) + - the buffer is full (1440 bytes by default so that IP packets are not + fragmented) + +The background goroutine can be disabled using the FlushPeriod(0) option. + +Buffering can be disabled using the MaxPacketSize(0) option. + +StatsD homepage: https://github.com/etsy/statsd +*/ +package statsd diff --git a/vendor/github.com/alexcesaro/statsd/options.go b/vendor/github.com/alexcesaro/statsd/options.go new file mode 100644 index 000000000..ef95bb8c3 --- /dev/null +++ b/vendor/github.com/alexcesaro/statsd/options.go @@ -0,0 +1,250 @@ +package statsd + +import ( + "bytes" + "strings" + "time" +) + +type config struct { + Conn connConfig + Client clientConfig +} + +type clientConfig struct { + Muted bool + Rate float32 + Prefix string + Tags []tag +} + +type connConfig struct { + Addr string + ErrorHandler func(error) + FlushPeriod time.Duration + MaxPacketSize int + Network string + TagFormat TagFormat +} + +// An Option represents an option for a Client. It must be used as an +// argument to New() or Client.Clone(). +type Option func(*config) + +// Address sets the address of the StatsD daemon. +// +// By default, ":8125" is used. This option is ignored in Client.Clone(). +func Address(addr string) Option { + return Option(func(c *config) { + c.Conn.Addr = addr + }) +} + +// ErrorHandler sets the function called when an error happens when sending +// metrics (e.g. the StatsD daemon is not listening anymore). +// +// By default, these errors are ignored. This option is ignored in +// Client.Clone(). +func ErrorHandler(h func(error)) Option { + return Option(func(c *config) { + c.Conn.ErrorHandler = h + }) +} + +// FlushPeriod sets how often the Client's buffer is flushed. If p is 0, the +// goroutine that periodically flush the buffer is not lauched and the buffer +// is only flushed when it is full. +// +// By default, the flush period is 100 ms. This option is ignored in +// Client.Clone(). +func FlushPeriod(p time.Duration) Option { + return Option(func(c *config) { + c.Conn.FlushPeriod = p + }) +} + +// MaxPacketSize sets the maximum packet size in bytes sent by the Client. +// +// By default, it is 1440 to avoid IP fragmentation. This option is ignored in +// Client.Clone(). +func MaxPacketSize(n int) Option { + return Option(func(c *config) { + c.Conn.MaxPacketSize = n + }) +} + +// Network sets the network (udp, tcp, etc) used by the client. See the +// net.Dial documentation (https://golang.org/pkg/net/#Dial) for the available +// network options. +// +// By default, network is udp. This option is ignored in Client.Clone(). +func Network(network string) Option { + return Option(func(c *config) { + c.Conn.Network = network + }) +} + +// Mute sets whether the Client is muted. All methods of a muted Client do +// nothing and return immedialtly. +// +// This option can be used in Client.Clone() only if the parent Client is not +// muted. The clones of a muted Client are always muted. +func Mute(b bool) Option { + return Option(func(c *config) { + c.Client.Muted = b + }) +} + +// SampleRate sets the sample rate of the Client. It allows sending the metrics +// less often which can be useful for performance intensive code paths. +func SampleRate(rate float32) Option { + return Option(func(c *config) { + c.Client.Rate = rate + }) +} + +// Prefix appends the prefix that will be used in every bucket name. +// +// Note that when used in cloned, the prefix of the parent Client is not +// replaced but is prepended to the given prefix. +func Prefix(p string) Option { + return Option(func(c *config) { + c.Client.Prefix += strings.TrimSuffix(p, ".") + "." + }) +} + +// TagFormat represents the format of tags sent by a Client. +type TagFormat uint8 + +// TagsFormat sets the format of tags. +func TagsFormat(tf TagFormat) Option { + return Option(func(c *config) { + c.Conn.TagFormat = tf + }) +} + +// Tags appends the given tags to the tags sent with every metrics. If a tag +// already exists, it is replaced. +// +// The tags must be set as key-value pairs. If the number of tags is not even, +// Tags panics. +// +// If the format of tags have not been set using the TagsFormat option, the tags +// will be ignored. +func Tags(tags ...string) Option { + if len(tags)%2 != 0 { + panic("statsd: Tags only accepts an even number of arguments") + } + + return Option(func(c *config) { + if len(tags) == 0 { + return + } + + newTags := make([]tag, len(tags)/2) + for i := 0; i < len(tags)/2; i++ { + newTags[i] = tag{K: tags[2*i], V: tags[2*i+1]} + } + + for _, newTag := range newTags { + exists := false + for _, oldTag := range c.Client.Tags { + if newTag.K == oldTag.K { + exists = true + oldTag.V = newTag.V + } + } + if !exists { + c.Client.Tags = append(c.Client.Tags, tag{ + K: newTag.K, + V: newTag.V, + }) + } + } + }) +} + +type tag struct { + K, V string +} + +func joinTags(tf TagFormat, tags []tag) string { + if len(tags) == 0 || tf == 0 { + return "" + } + join := joinFuncs[tf] + return join(tags) +} + +func splitTags(tf TagFormat, tags string) []tag { + if len(tags) == 0 || tf == 0 { + return nil + } + split := splitFuncs[tf] + return split(tags) +} + +const ( + // InfluxDB tag format. + // See https://influxdb.com/blog/2015/11/03/getting_started_with_influx_statsd.html + InfluxDB TagFormat = iota + 1 + // Datadog tag format. + // See http://docs.datadoghq.com/guides/metrics/#tags + Datadog +) + +var ( + joinFuncs = map[TagFormat]func([]tag) string{ + // InfluxDB tag format: ,tag1=payroll,region=us-west + // https://influxdb.com/blog/2015/11/03/getting_started_with_influx_statsd.html + InfluxDB: func(tags []tag) string { + var buf bytes.Buffer + for _, tag := range tags { + _ = buf.WriteByte(',') + _, _ = buf.WriteString(tag.K) + _ = buf.WriteByte('=') + _, _ = buf.WriteString(tag.V) + } + return buf.String() + }, + // Datadog tag format: |#tag1:value1,tag2:value2 + // http://docs.datadoghq.com/guides/dogstatsd/#datagram-format + Datadog: func(tags []tag) string { + buf := bytes.NewBufferString("|#") + first := true + for _, tag := range tags { + if first { + first = false + } else { + _ = buf.WriteByte(',') + } + _, _ = buf.WriteString(tag.K) + _ = buf.WriteByte(':') + _, _ = buf.WriteString(tag.V) + } + return buf.String() + }, + } + splitFuncs = map[TagFormat]func(string) []tag{ + InfluxDB: func(s string) []tag { + s = s[1:] + pairs := strings.Split(s, ",") + tags := make([]tag, len(pairs)) + for i, pair := range pairs { + kv := strings.Split(pair, "=") + tags[i] = tag{K: kv[0], V: kv[1]} + } + return tags + }, + Datadog: func(s string) []tag { + s = s[2:] + pairs := strings.Split(s, ",") + tags := make([]tag, len(pairs)) + for i, pair := range pairs { + kv := strings.Split(pair, ":") + tags[i] = tag{K: kv[0], V: kv[1]} + } + return tags + }, + } +) diff --git a/vendor/github.com/alexcesaro/statsd/statsd.go b/vendor/github.com/alexcesaro/statsd/statsd.go new file mode 100644 index 000000000..f19204d79 --- /dev/null +++ b/vendor/github.com/alexcesaro/statsd/statsd.go @@ -0,0 +1,169 @@ +package statsd + +import "time" + +// A Client represents a StatsD client. +type Client struct { + conn *conn + muted bool + rate float32 + prefix string + tags string +} + +// New returns a new Client. +func New(opts ...Option) (*Client, error) { + // The default configuration. + conf := &config{ + Client: clientConfig{ + Rate: 1, + }, + Conn: connConfig{ + Addr: ":8125", + FlushPeriod: 100 * time.Millisecond, + // Worst-case scenario: + // Ethernet MTU - IPv6 Header - TCP Header = 1500 - 40 - 20 = 1440 + MaxPacketSize: 1440, + Network: "udp", + }, + } + for _, o := range opts { + o(conf) + } + + conn, err := newConn(conf.Conn, conf.Client.Muted) + c := &Client{ + conn: conn, + muted: conf.Client.Muted, + } + if err != nil { + c.muted = true + return c, err + } + c.rate = conf.Client.Rate + c.prefix = conf.Client.Prefix + c.tags = joinTags(conf.Conn.TagFormat, conf.Client.Tags) + return c, nil +} + +// Clone returns a clone of the Client. The cloned Client inherits its +// configuration from its parent. +// +// All cloned Clients share the same connection, so cloning a Client is a cheap +// operation. +func (c *Client) Clone(opts ...Option) *Client { + tf := c.conn.tagFormat + conf := &config{ + Client: clientConfig{ + Rate: c.rate, + Prefix: c.prefix, + Tags: splitTags(tf, c.tags), + }, + } + for _, o := range opts { + o(conf) + } + + clone := &Client{ + conn: c.conn, + muted: c.muted || conf.Client.Muted, + rate: conf.Client.Rate, + prefix: conf.Client.Prefix, + tags: joinTags(tf, conf.Client.Tags), + } + clone.conn = c.conn + return clone +} + +// Count adds n to bucket. +func (c *Client) Count(bucket string, n interface{}) { + if c.skip() { + return + } + c.conn.metric(c.prefix, bucket, n, "c", c.rate, c.tags) +} + +func (c *Client) skip() bool { + return c.muted || (c.rate != 1 && randFloat() > c.rate) +} + +// Increment increment the given bucket. It is equivalent to Count(bucket, 1). +func (c *Client) Increment(bucket string) { + c.Count(bucket, 1) +} + +// Gauge records an absolute value for the given bucket. +func (c *Client) Gauge(bucket string, value interface{}) { + if c.skip() { + return + } + c.conn.gauge(c.prefix, bucket, value, c.tags) +} + +// Timing sends a timing value to a bucket. +func (c *Client) Timing(bucket string, value interface{}) { + if c.skip() { + return + } + c.conn.metric(c.prefix, bucket, value, "ms", c.rate, c.tags) +} + +// Histogram sends an histogram value to a bucket. +func (c *Client) Histogram(bucket string, value interface{}) { + if c.skip() { + return + } + c.conn.metric(c.prefix, bucket, value, "h", c.rate, c.tags) +} + +// A Timing is an helper object that eases sending timing values. +type Timing struct { + start time.Time + c *Client +} + +// NewTiming creates a new Timing. +func (c *Client) NewTiming() Timing { + return Timing{start: now(), c: c} +} + +// Send sends the time elapsed since the creation of the Timing. +func (t Timing) Send(bucket string) { + t.c.Timing(bucket, int(t.Duration()/time.Millisecond)) +} + +// Duration returns the time elapsed since the creation of the Timing. +func (t Timing) Duration() time.Duration { + return now().Sub(t.start) +} + +// Unique sends the given value to a set bucket. +func (c *Client) Unique(bucket string, value string) { + if c.skip() { + return + } + c.conn.unique(c.prefix, bucket, value, c.tags) +} + +// Flush flushes the Client's buffer. +func (c *Client) Flush() { + if c.muted { + return + } + c.conn.mu.Lock() + c.conn.flush(0) + c.conn.mu.Unlock() +} + +// Close flushes the Client's buffer and releases the associated ressources. The +// Client and all the cloned Clients must not be used afterward. +func (c *Client) Close() { + if c.muted { + return + } + c.conn.mu.Lock() + c.conn.flush(0) + c.conn.handleError(c.conn.w.Close()) + c.conn.closed = true + c.conn.mu.Unlock() +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 31a3f15a3..3a7f39073 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1,5 +1,7 @@ # github.com/Shopify/sarama v1.19.0 github.com/Shopify/sarama +# github.com/alexcesaro/statsd v2.0.0+incompatible +github.com/alexcesaro/statsd # github.com/apache/thrift v0.0.0-20181028152738-da1169d75b15 github.com/apache/thrift/lib/go/thrift # github.com/armon/go-proxyproto v0.0.0-20180202201750-5b7edb60ff5f