diff --git a/admin/api/routes.go b/admin/api/routes.go index 10c2ac373..78bc40a63 100644 --- a/admin/api/routes.go +++ b/admin/api/routes.go @@ -59,8 +59,8 @@ func (h *RoutesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { Weight: tg.Weight, Tags: tg.Tags, Cmd: "route add", - Rate1: tg.Timer.Rate1(), - Pct99: tg.Timer.Percentile(0.99), + // Rate1: tg.Timer.Rate1(), + // Pct99: tg.Timer.Percentile(0.99), } routes = append(routes, ar) } diff --git a/go.mod b/go.mod index 97b4cb6b0..7a1c4d660 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/fabiolb/fabio require ( github.com/Shopify/sarama v1.19.0 // indirect github.com/Shopify/toxiproxy v2.1.4+incompatible // indirect + github.com/alexcesaro/statsd v2.0.0+incompatible github.com/apache/thrift v0.13.0 // indirect github.com/armon/go-metrics v0.3.4 // indirect github.com/armon/go-proxyproto v0.0.0-20180202201750-5b7edb60ff5f @@ -54,6 +55,7 @@ require ( golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e // indirect google.golang.org/grpc v1.33.0 google.golang.org/protobuf v1.25.0 // indirect + gopkg.in/alexcesaro/statsd.v2 v2.0.0 // indirect gopkg.in/square/go-jose.v2 v2.5.1 // indirect gopkg.in/yaml.v2 v2.3.0 // indirect ) diff --git a/go.sum b/go.sum index 06941eab9..9ea856bb1 100644 --- a/go.sum +++ b/go.sum @@ -9,6 +9,8 @@ github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuy github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= +github.com/alexcesaro/statsd v2.0.0+incompatible h1:HG17k1Qk8V1F4UOoq6tx+IUoAbOcI5PHzzEUGeDD72w= +github.com/alexcesaro/statsd v2.0.0+incompatible/go.mod h1:vNepIbQAiyLe1j480173M6NYYaAsGwEcvuDTU3OCUGY= github.com/apache/thrift v0.13.0 h1:5hryIiq9gtn+MiLVn0wP37kb/uTeRZgN08WoCsAhIhI= github.com/apache/thrift v0.13.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= @@ -21,21 +23,17 @@ github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj github.com/armon/go-radix v1.0.0/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= -github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= -github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible h1:C29Ae4G5GtYyYMm1aztcyj/J5ckgJm2zwdDajFbx1NY= github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6Dob7S7YxXgwXpfOuvO54S+tGdZdw9fuRZt25Ag= github.com/circonus-labs/circonus-gometrics/v3 v3.2.0 h1:i50XamqTYFi/bFFZGiREXehet8DPsaa/XExTmXTjjMc= github.com/circonus-labs/circonus-gometrics/v3 v3.2.0/go.mod h1:hbHb81YGFfRAgDZHE8J5kws/aDP1D40tkaTnhVfnqSw= -github.com/circonus-labs/circonusllhist v0.1.3 h1:TJH+oke8D16535+jHExHj4nQvzlZrj7ug5D7I/orNUA= github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp5jckzBHf4XRpQvBOLI+I= github.com/circonus-labs/circonusllhist v0.1.4 h1:G5qJPuD16akpIXMUR7KcfBvrQOVm95+qyqUm+SEAZks= github.com/circonus-labs/circonusllhist v0.1.4/go.mod h1:kMXHVDlOchFAehlya5ePtbp5jckzBHf4XRpQvBOLI+I= -github.com/circonus-labs/go-apiclient v0.7.6 h1:Pr69I76ReDKRKe/yb9o0lKphRQ/a6Jr8XLh7M4GZrPY= github.com/circonus-labs/go-apiclient v0.7.6/go.mod h1:RP/BcaTRf8MlHaMGCSuSDPGPQqyMeBxaAdwNv5CM/eQ= github.com/circonus-labs/go-apiclient v0.7.9 h1:OYDi4XeO8RLPW22RDKb0vIfd2mZUj4Hv12kRLtJQtKI= github.com/circonus-labs/go-apiclient v0.7.9/go.mod h1:7PoP39q4+O82aWOsd0/3bMvBon6HCBwrs7g+/DXczNc= @@ -195,7 +193,6 @@ github.com/mattn/go-isatty v0.0.10/go.mod h1:qgIWMr58cqv1PHHyhnkY9lrL7etaEgOFcME github.com/mattn/go-isatty v0.0.11/go.mod h1:PhnuNfih5lzO57/f3n+odYbM4JtupLOxQOAqxQCu2WE= github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY= github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= -github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= github.com/miekg/dns v1.1.26 h1:gPxPSwALAeHJSjarOs00QjVdV9QoBvc1D2ujQUr5BzU= @@ -250,7 +247,6 @@ github.com/prometheus/client_golang v1.4.0/go.mod h1:e9GMxYsXl05ICDXkRhurwBS4Q3O github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= -github.com/prometheus/client_model v0.2.0 h1:uq5h0d+GuxiXLJLNABMgp2qUWDPiLvgCzz2dUR+/W/M= github.com/prometheus/client_model v0.2.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= github.com/prometheus/common v0.9.1/go.mod h1:yhUN8i9wzaXS3w1O07YhxHEBxD+W35wd8bs7vj7HSQ4= @@ -283,7 +279,6 @@ github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJy github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/tg123/go-htpasswd v1.0.0 h1:Ze/pZsz73JiCwXIyJBPvNs75asKBgfodCf8iTEkgkXs= github.com/tg123/go-htpasswd v1.0.0/go.mod h1:eQTgl67UrNKQvEPKrDLGBssjVwYQClFZjALVLhIv8C0= -github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926 h1:G3dpKMzFDjgEh2q1Z7zUUtKa8ViPtH+ocF0bE0g00O8= github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= github.com/tv42/httpunix v0.0.0-20191220191345-2ba4b9c3382c h1:u6SKchux2yDvFQnDHS3lPnIRmfVJ5Sxy3ao2SIdysLQ= github.com/tv42/httpunix v0.0.0-20191220191345-2ba4b9c3382c/go.mod h1:hzIxponao9Kjc7aWznkXaL4U4TWaDSs8zcsY4Ka08nM= @@ -317,7 +312,6 @@ golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e h1:vcxGaoTs7kV8m5Np9uUNQin4BrLOthgV7252N8V+FwY= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201008141435-b3e1573b7520 h1:Bx6FllMpG4NWDOfhMBz1VR2QYNp/SAOHPIAsaVmxfPo= golang.org/x/sync v0.0.0-20201008141435-b3e1573b7520/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -387,6 +381,8 @@ google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpAD google.golang.org/protobuf v1.25.0 h1:Ejskq+SyPohKW+1uil0JJMtmHCgJPJ/qWTxr8qp+R4c= google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= +gopkg.in/alexcesaro/statsd.v2 v2.0.0 h1:FXkZSCZIH17vLCO5sO2UucTHsH9pc+17F6pl3JVCwMc= +gopkg.in/alexcesaro/statsd.v2 v2.0.0/go.mod h1:i0ubccKGzBVNBpdGV5MocxyA/XlLUJzA7SLonnE4drU= gopkg.in/asn1-ber.v1 v1.0.0-20181015200546-f715ec2f112d/go.mod h1:cuepJuh7vyXfUyUwEgHQXw849cJrilpS5NeIjOWESAw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= diff --git a/main.go b/main.go index e37fb9b30..4b160e82b 100644 --- a/main.go +++ b/main.go @@ -24,7 +24,10 @@ import ( "github.com/fabiolb/fabio/config" "github.com/fabiolb/fabio/exit" "github.com/fabiolb/fabio/logger" - "github.com/fabiolb/fabio/metrics" + "github.com/fabiolb/fabio/metrics4" + "github.com/fabiolb/fabio/metrics4/flat" + "github.com/fabiolb/fabio/metrics4/label" + "github.com/fabiolb/fabio/metrics4/statsdraw" "github.com/fabiolb/fabio/noroute" "github.com/fabiolb/fabio/proxy" "github.com/fabiolb/fabio/proxy/tcp" @@ -120,9 +123,7 @@ func main() { registry.Default.DeregisterAll() }) - // init metrics early since that create the global metric registries - // that are used by other parts of the code. - initMetrics(cfg) + metrics := initMetrics(cfg) initRuntime(cfg) initBackend(cfg) @@ -134,12 +135,12 @@ func main() { go watchNoRouteHTML(cfg) first := make(chan bool) - go watchBackend(cfg, first) + go watchBackend(cfg, metrics, first) log.Print("[INFO] Waiting for first routing table") <-first // create proxies after metrics since they use the metrics registry. - startServers(cfg) + startServers(cfg, metrics) // warn again so that it is visible in the terminal WarnIfRunAsRoot(cfg.Insecure) @@ -148,15 +149,16 @@ func main() { log.Print("[INFO] Down") } -func newGrpcProxy(cfg *config.Config, tlscfg *tls.Config) []grpc.ServerOption { +func newGrpcProxy(cfg *config.Config, tlscfg *tls.Config, stats metrics4.Provider) []grpc.ServerOption { //Init Glob Cache globCache := route.NewGlobCache(cfg.GlobCacheSize) statsHandler := &proxy.GrpcStatsHandler{ - Connect: metrics.DefaultRegistry.GetCounter("grpc.conn"), - Request: metrics.DefaultRegistry.GetTimer("grpc.requests"), - NoRoute: metrics.DefaultRegistry.GetCounter("grpc.noroute"), + Connect: stats.NewCounter("grpc.conn"), + Request: stats.NewTimer("grpc.requests"), + NoRoute: stats.NewCounter("grpc.noroute"), + Metrics: stats, } proxyInterceptor := proxy.GrpcProxyInterceptor{ @@ -175,7 +177,7 @@ func newGrpcProxy(cfg *config.Config, tlscfg *tls.Config) []grpc.ServerOption { } } -func newHTTPProxy(cfg *config.Config) http.Handler { +func newHTTPProxy(cfg *config.Config, stats metrics4.Provider) *proxy.HTTPProxy { var w io.Writer //Init Glob Cache @@ -206,7 +208,7 @@ func newHTTPProxy(cfg *config.Config) http.Handler { pick := route.Picker[cfg.Proxy.Strategy] match := route.Matcher[cfg.Proxy.Matcher] - notFound := metrics.DefaultRegistry.GetCounter("notfound") + notFound := stats.NewCounter("notfound") log.Printf("[INFO] Using routing strategy %q", cfg.Proxy.Strategy) log.Printf("[INFO] Using route matching %q", cfg.Proxy.Matcher) @@ -235,26 +237,27 @@ func newHTTPProxy(cfg *config.Config) http.Handler { Lookup: func(r *http.Request) *route.Target { t := route.GetTable().Lookup(r, r.Header.Get("trace"), pick, match, globCache, cfg.GlobMatchingDisabled) if t == nil { - notFound.Inc(1) + notFound.Count(1) log.Print("[WARN] No route for ", r.Host, r.URL) } return t }, - Requests: metrics.DefaultRegistry.GetTimer("requests"), - Noroute: metrics.DefaultRegistry.GetCounter("notfound"), + Requests: stats.NewTimer("requests"), + Noroute: stats.NewCounter("notfound"), Logger: l, TracerCfg: cfg.Tracing, AuthSchemes: authSchemes, + WSConn: stats.NewGauge("ws.conn"), + Metrics: stats, } } -func lookupHostFn(cfg *config.Config) func(string) *route.Target { +func lookupHostFn(cfg *config.Config, notFound metrics4.Counter) func(string) *route.Target { pick := route.Picker[cfg.Proxy.Strategy] - notFound := metrics.DefaultRegistry.GetCounter("notfound") return func(host string) *route.Target { t := route.GetTable().LookupHost(host, pick) if t == nil { - notFound.Inc(1) + notFound.Count(1) log.Print("[WARN] No route for ", host) } return t @@ -321,7 +324,7 @@ func startAdmin(cfg *config.Config) { }() } -func startServers(cfg *config.Config) { +func startServers(cfg *config.Config, stats metrics4.Provider) { for _, l := range cfg.Listen { l := l // capture loop var for go routines below tlscfg, err := makeTLSConfig(l) @@ -334,17 +337,20 @@ func startServers(cfg *config.Config) { log.Printf("[INFO] Client certificate authentication enabled on %s", l.Addr) } + notFound := stats.NewCounter("notfound") switch l.Proto { case "http", "https": go func() { - h := newHTTPProxy(cfg) + h := newHTTPProxy(cfg, stats) + // reset the ws.conn gauge + h.WSConn.Update(0) if err := proxy.ListenAndServeHTTP(l, h, tlscfg); err != nil { exit.Fatal("[FATAL] ", err) } }() case "grpc", "grpcs": go func() { - h := newGrpcProxy(cfg, tlscfg) + h := newGrpcProxy(cfg, tlscfg, stats) if err := proxy.ListenAndServeGRPC(l, h, tlscfg); err != nil { exit.Fatal("[FATAL] ", err) } @@ -353,10 +359,11 @@ func startServers(cfg *config.Config) { go func() { h := &tcp.Proxy{ DialTimeout: cfg.Proxy.DialTimeout, - Lookup: lookupHostFn(cfg), - Conn: metrics.DefaultRegistry.GetCounter("tcp.conn"), - ConnFail: metrics.DefaultRegistry.GetCounter("tcp.connfail"), - Noroute: metrics.DefaultRegistry.GetCounter("tcp.noroute"), + Lookup: lookupHostFn(cfg, notFound), + Conn: stats.NewCounter("tcp.conn"), + ConnFail: stats.NewCounter("tcp.connfail"), + Noroute: stats.NewCounter("tcp.noroute"), + Metrics: stats, } if err := proxy.ListenAndServeTCP(l, h, tlscfg); err != nil { exit.Fatal("[FATAL] ", err) @@ -366,10 +373,11 @@ func startServers(cfg *config.Config) { go func() { h := &tcp.SNIProxy{ DialTimeout: cfg.Proxy.DialTimeout, - Lookup: lookupHostFn(cfg), - Conn: metrics.DefaultRegistry.GetCounter("tcp_sni.conn"), - ConnFail: metrics.DefaultRegistry.GetCounter("tcp_sni.connfail"), - Noroute: metrics.DefaultRegistry.GetCounter("tcp_sni.noroute"), + Lookup: lookupHostFn(cfg, notFound), + Conn: stats.NewCounter("tcp_sni.conn"), + ConnFail: stats.NewCounter("tcp_sni.connfail"), + Noroute: stats.NewCounter("tcp_sni.noroute"), + Metrics: stats, } if err := proxy.ListenAndServeTCP(l, h, tlscfg); err != nil { exit.Fatal("[FATAL] ", err) @@ -408,10 +416,11 @@ func startServers(cfg *config.Config) { go func() { h := &tcp.DynamicProxy{ DialTimeout: cfg.Proxy.DialTimeout, - Lookup: lookupHostFn(cfg), - Conn: metrics.DefaultRegistry.GetCounter("tcp.conn"), - ConnFail: metrics.DefaultRegistry.GetCounter("tcp.connfail"), - Noroute: metrics.DefaultRegistry.GetCounter("tcp.noroute"), + Lookup: lookupHostFn(cfg, notFound), + Conn: stats.NewCounter("tcp.conn"), + ConnFail: stats.NewCounter("tcp.connfail"), + Noroute: stats.NewCounter("tcp.noroute"), + Metrics: stats, } l.Addr = port if err := proxy.ListenAndServeTCP(l, h, tlscfg); err != nil { @@ -423,13 +432,14 @@ func startServers(cfg *config.Config) { }() case "https+tcp+sni": go func() { - hp := newHTTPProxy(cfg) + hp := newHTTPProxy(cfg, stats) tp := &tcp.SNIProxy{ DialTimeout: cfg.Proxy.DialTimeout, - Lookup: lookupHostFn(cfg), - Conn: metrics.DefaultRegistry.GetCounter("tcp_sni.conn"), - ConnFail: metrics.DefaultRegistry.GetCounter("tcp_sni.connfail"), - Noroute: metrics.DefaultRegistry.GetCounter("tcp_sni.noroute"), + Lookup: lookupHostFn(cfg, notFound), + Conn: stats.NewCounter("tcp_sni.conn"), + ConnFail: stats.NewCounter("tcp_sni.connfail"), + Noroute: stats.NewCounter("tcp_sni.noroute"), + Metrics: stats, } if err := proxy.ListenAndServeHTTPSTCPSNI(l, hp, tp, tlscfg, lookupHostMatcher(cfg)); err != nil { exit.Fatal("[FATAL] ", err) @@ -441,31 +451,33 @@ func startServers(cfg *config.Config) { } } -func initMetrics(cfg *config.Config) { - if cfg.Metrics.Target == "" { - log.Printf("[INFO] Metrics disabled") - return - } - - var deadline = time.Now().Add(cfg.Metrics.Timeout) - var err error - for { - metrics.DefaultRegistry, err = metrics.NewRegistry(cfg.Metrics) - if err == nil { - route.ServiceRegistry, err = metrics.NewRegistry(cfg.Metrics) - } - if err == nil { - return - } - if time.Now().After(deadline) { - exit.Fatal("[FATAL] ", err) - } - log.Print("[WARN] Error initializing metrics. ", err) - time.Sleep(cfg.Metrics.Retry) - if atomic.LoadInt32(&shuttingDown) > 0 { - exit.Exit(1) +func initMetrics(cfg *config.Config) metrics4.Provider { + var p []metrics4.Provider + for _, x := range strings.Split(cfg.Metrics.Target, ",") { + x = strings.TrimSpace(x) + switch x { + case "flat": + p = append(p, &flat.Provider{}) + case "label": + p = append(p, &label.Provider{}) + case "statsd_raw": + // prefix := cfg.Metrics.Prefix // prefix is a template and needs to be expanded + prefix := "" + pp, err := statsdraw.NewProvider(prefix, cfg.Metrics.StatsDAddr, cfg.Metrics.Interval) + if err != nil { + exit.Fatalf("[FATAL] Cannot initialize statsd metrics: %s", err) + } + p = append(p, pp) + default: + log.Printf("[WARN] Skipping unknown metrics provider %q", x) + continue } + log.Printf("[INFO] Registering metrics provider %q", x) } + if len(p) == 0 { + log.Printf("[INFO] Metrics disabled") + } + return metrics4.NewMultiProvider(p) } func initRuntime(cfg *config.Config) { @@ -519,7 +531,7 @@ func initBackend(cfg *config.Config) { } } -func watchBackend(cfg *config.Config, first chan bool) { +func watchBackend(cfg *config.Config, p metrics4.Provider, first chan bool) { var ( nextTable string lastTable string @@ -580,6 +592,29 @@ func watchBackend(cfg *config.Config, first chan bool) { } } +func unregisterMetrics(p metrics4.Provider, oldTable, newTable route.Table) { + names := func(t route.Table) map[string]bool { + m := map[string]bool{} + for _, routes := range t { + for _, r := range routes { + for _, t := range r.Targets { + m[t.TimerName.String()] = true + } + } + } + return m + } + + oldNames := names(oldTable) + newNames := names(newTable) + for n := range oldNames { + if !newNames[n] { + log.Printf("[INFO] Unregistering metric %s", n) + p.Unregister(n) + } + } +} + func watchNoRouteHTML(cfg *config.Config) { html := registry.Default.WatchNoRouteHTML() for { diff --git a/metrics4/flat/metrics.go b/metrics4/flat/metrics.go new file mode 100644 index 000000000..5abdaf71e --- /dev/null +++ b/metrics4/flat/metrics.go @@ -0,0 +1,49 @@ +package flat + +import ( + "fmt" + "time" + + "github.com/fabiolb/fabio/metrics4" + "github.com/fabiolb/fabio/metrics4/names" +) + +type Provider struct{} + +func (p *Provider) NewCounter(name string, labels ...string) metrics4.Counter { + return &Counter{Name: names.Flatten(name, labels, names.DotSeparator)} +} + +func (p *Provider) NewGauge(name string, labels ...string) metrics4.Gauge { + return &Gauge{Name: names.Flatten(name, labels, names.DotSeparator)} +} + +func (p *Provider) NewTimer(name string, labels ...string) metrics4.Timer { + return &Timer{Name: names.Flatten(name, labels, names.DotSeparator)} +} + +func (p *Provider) Unregister(interface{}) {} + +type Counter struct { + Name string +} + +func (c *Counter) Count(n int) { + fmt.Printf("%s:%d|c\n", c.Name, n) +} + +type Gauge struct { + Name string +} + +func (g *Gauge) Update(n int) { + fmt.Printf("%s:%d|g\n", g.Name, n) +} + +type Timer struct { + Name string +} + +func (t *Timer) Update(d time.Duration) { + fmt.Printf("%s:%d|ms\n", t.Name, d/time.Millisecond) +} diff --git a/metrics4/label/metrics.go b/metrics4/label/metrics.go new file mode 100644 index 000000000..4b18356e0 --- /dev/null +++ b/metrics4/label/metrics.go @@ -0,0 +1,52 @@ +package label + +import ( + "fmt" + "time" + + "github.com/fabiolb/fabio/metrics4" + "github.com/fabiolb/fabio/metrics4/names" +) + +type Provider struct{} + +func (p *Provider) NewCounter(name string, labels ...string) metrics4.Counter { + return &Counter{Name: name, Labels: labels} +} + +func (p *Provider) NewGauge(name string, labels ...string) metrics4.Gauge { + return &Gauge{Name: name, Labels: labels} +} + +func (p *Provider) NewTimer(name string, labels ...string) metrics4.Timer { + return &Timer{Name: name, Labels: labels} +} + +func (p *Provider) Unregister(interface{}) {} + +type Counter struct { + Name string + Labels []string +} + +func (c *Counter) Count(n int) { + fmt.Printf("%s:%d|c%s\n", c.Name, n, names.Labels(c.Labels, "|#", ":", ",")) +} + +type Gauge struct { + Name string + Labels []string +} + +func (g *Gauge) Update(n int) { + fmt.Printf("%s:%d|g%s\n", g.Name, n, names.Labels(g.Labels, "|#", ":", ",")) +} + +type Timer struct { + Name string + Labels []string +} + +func (t *Timer) Update(d time.Duration) { + fmt.Printf("%s:%d|ns%s\n", t.Name, d.Nanoseconds(), names.Labels(t.Labels, "|#", ":", ",")) +} diff --git a/metrics4/metrics.go b/metrics4/metrics.go new file mode 100644 index 000000000..bfa79caa9 --- /dev/null +++ b/metrics4/metrics.go @@ -0,0 +1,117 @@ +package metrics4 + +import ( + "time" +) + +// Provider is an abstraction of a metrics backend. +type Provider interface { + // NewCounter creates a new counter object. + NewCounter(name string, labels ...string) Counter + + // NewGauge creates a new gauge object. + NewGauge(name string, labels ...string) Gauge + + // NewTimer creates a new timer object. + NewTimer(name string, labels ...string) Timer + + // Unregister removes a previously registered + // name or metric. Required for go-metrics and + // service pruning. This signature is probably not + // correct. + Unregister(v interface{}) +} + +// MultiProvider wraps zero or more providers. +type MultiProvider struct { + p []Provider +} + +func NewMultiProvider(p []Provider) *MultiProvider { + return &MultiProvider{p} +} + +// NewCounter creates a MultiCounter with counter objects for all registered +// providers. +func (mp *MultiProvider) NewCounter(name string, labels ...string) Counter { + var c []Counter + for _, p := range mp.p { + c = append(c, p.NewCounter(name, labels...)) + } + return &MultiCounter{c} +} + +// NewGauge creates a MultiGauge with gauge objects for all registered +// providers. +func (mp *MultiProvider) NewGauge(name string, labels ...string) Gauge { + var v []Gauge + for _, p := range mp.p { + v = append(v, p.NewGauge(name, labels...)) + } + return &MultiGauge{v} +} + +// NewTimer creates a MultiTimer with timer objects for all registered +// providers. +func (mp *MultiProvider) NewTimer(name string, labels ...string) Timer { + var t []Timer + for _, p := range mp.p { + t = append(t, p.NewTimer(name, labels...)) + } + return &MultiTimer{t} +} + +// Unregister removes the metric object from all registered providers. +func (mp *MultiProvider) Unregister(v interface{}) { + for _, p := range mp.p { + p.Unregister(v) + } +} + +// Count measures a number. +type Counter interface { + Count(int) +} + +// MultiCounter wraps zero or more counters. +type MultiCounter struct { + c []Counter +} + +func (mc *MultiCounter) Count(n int) { + for _, c := range mc.c { + c.Count(n) + } +} + +// Gauge measures a value. +type Gauge interface { + Update(int) +} + +// MultiGauge wraps zero or more gauges. +type MultiGauge struct { + v []Gauge +} + +func (m *MultiGauge) Update(n int) { + for _, v := range m.v { + v.Update(n) + } +} + +// Timer measures the time of an event. +type Timer interface { + Update(time.Duration) +} + +// MultTimer wraps zero or more timers. +type MultiTimer struct { + t []Timer +} + +func (mt *MultiTimer) Update(d time.Duration) { + for _, t := range mt.t { + t.Update(d) + } +} diff --git a/metrics4/names/names.go b/metrics4/names/names.go new file mode 100644 index 000000000..8ec1b64d1 --- /dev/null +++ b/metrics4/names/names.go @@ -0,0 +1,43 @@ +package names + +import ( + "net/url" + "strings" +) + +type Service struct { + Service string + Host string + Path string + TargetURL *url.URL +} + +func (s Service) String() string { + return s.Service +} + +const DotSeparator = "." +const PipeSeparator = "|" + +func Flatten(name string, labels []string, separator string) string { + if len(labels) == 0 { + return name + } + return name + separator + strings.Join(labels, separator) +} + +// todo(fs): this function probably allocates like crazy. If on the stack then it might be ok. +// todo(fs): otherwise, give some love. +func Labels(labels []string, prefix, fieldsep, recsep string) string { + if len(labels) == 0 { + return "" + } + if len(labels)%2 != 0 { + labels = append(labels, "???") + } + var fields []string + for i := 0; i < len(labels); i += 2 { + fields = append(fields, labels[i]+fieldsep+labels[i+1]) + } + return prefix + strings.Join(fields, recsep) +} diff --git a/metrics4/statsdraw/statsd.go b/metrics4/statsdraw/statsd.go new file mode 100644 index 000000000..d67906578 --- /dev/null +++ b/metrics4/statsdraw/statsd.go @@ -0,0 +1,73 @@ +package statsdraw + +import ( + "time" + + "github.com/alexcesaro/statsd" + "github.com/fabiolb/fabio/metrics4" + "github.com/fabiolb/fabio/metrics4/names" +) + +type Provider struct { + c *statsd.Client +} + +func NewProvider(prefix, addr string, interval time.Duration) (*Provider, error) { + opts := []statsd.Option{ + statsd.Address(addr), + statsd.FlushPeriod(interval), + } + if prefix != "" { + opts = append(opts, statsd.Prefix(prefix)) + } + + c, err := statsd.New(opts...) + if err != nil { + return nil, err + } + return &Provider{c}, nil +} + +func (p *Provider) NewCounter(name string, labels ...string) metrics4.Counter { + return &Counter{c: p.c, name: name, labels: labels} +} + +func (p *Provider) NewGauge(name string, labels ...string) metrics4.Gauge { + return &Gauge{c: p.c, name: name, labels: labels} +} + +func (p *Provider) NewTimer(name string, labels ...string) metrics4.Timer { + return &Timer{c: p.c, name: name, labels: labels} +} + +func (p *Provider) Unregister(interface{}) {} + +type Counter struct { + c *statsd.Client + name string + labels []string +} + +func (v *Counter) Count(n int) { + v.c.Count(names.Flatten(v.name, v.labels, names.DotSeparator), n) +} + +type Gauge struct { + c *statsd.Client + name string + labels []string +} + +func (v *Gauge) Update(n int) { + v.c.Gauge(names.Flatten(v.name, v.labels, names.DotSeparator), n) +} + +type Timer struct { + c *statsd.Client + name string + labels []string +} + +func (v *Timer) Update(d time.Duration) { + v.c.Timing(names.Flatten(v.name, v.labels, names.DotSeparator), d) +} diff --git a/metrics/circonus.go b/metrics_old/circonus.go similarity index 100% rename from metrics/circonus.go rename to metrics_old/circonus.go diff --git a/metrics/circonus_test.go b/metrics_old/circonus_test.go similarity index 100% rename from metrics/circonus_test.go rename to metrics_old/circonus_test.go diff --git a/metrics/gometrics.go b/metrics_old/gometrics.go similarity index 100% rename from metrics/gometrics.go rename to metrics_old/gometrics.go diff --git a/metrics/metrics.go b/metrics_old/metrics.go similarity index 100% rename from metrics/metrics.go rename to metrics_old/metrics.go diff --git a/metrics/metrics_test.go b/metrics_old/metrics_test.go similarity index 100% rename from metrics/metrics_test.go rename to metrics_old/metrics_test.go diff --git a/metrics/noop.go b/metrics_old/noop.go similarity index 100% rename from metrics/noop.go rename to metrics_old/noop.go diff --git a/metrics/registry.go b/metrics_old/registry.go similarity index 100% rename from metrics/registry.go rename to metrics_old/registry.go diff --git a/proxy/grpc_handler.go b/proxy/grpc_handler.go index 863d834d8..86471d880 100644 --- a/proxy/grpc_handler.go +++ b/proxy/grpc_handler.go @@ -8,12 +8,11 @@ import ( "net" "net/http" "net/url" - "strings" "sync" "time" "github.com/fabiolb/fabio/config" - "github.com/fabiolb/fabio/metrics" + "github.com/fabiolb/fabio/metrics4" "github.com/fabiolb/fabio/route" grpc_proxy "github.com/mwitkow/grpc-proxy/proxy" "google.golang.org/grpc" @@ -99,7 +98,7 @@ func (g GrpcProxyInterceptor) Stream(srv interface{}, stream grpc.ServerStream, } if target == nil { - g.StatsHandler.NoRoute.Inc(1) + g.StatsHandler.NoRoute.Count(1) log.Println("[WARN] grpc: no route found for", info.FullMethod) return status.Error(codes.NotFound, "no route found") } @@ -158,9 +157,10 @@ func (g GrpcProxyInterceptor) lookup(ctx context.Context, fullMethodName string) } type GrpcStatsHandler struct { - Connect metrics.Counter - Request metrics.Timer - NoRoute metrics.Counter + Connect metrics4.Counter + Request metrics4.Timer + NoRoute metrics4.Counter + Metrics metrics4.Provider } type connCtxKey struct{} @@ -175,6 +175,11 @@ func (h *GrpcStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) c } func (h *GrpcStatsHandler) HandleRPC(ctx context.Context, rpc stats.RPCStats) { + metrics := h.Metrics + if metrics == nil { + metrics = &metrics4.MultiProvider{} + } + rpcStats, _ := rpc.(*stats.End) if rpcStats == nil { @@ -186,7 +191,7 @@ func (h *GrpcStatsHandler) HandleRPC(ctx context.Context, rpc stats.RPCStats) { h.Request.Update(dur) s, _ := status.FromError(rpcStats.Error) - metrics.DefaultRegistry.GetTimer(fmt.Sprintf("grpc.status.%s", strings.ToLower(s.Code().String()))) + metrics.NewTimer("grpc.status", "code", s.Code().String()).Update(dur) } // HandleConn processes the Conn stats. @@ -194,7 +199,7 @@ func (h *GrpcStatsHandler) HandleConn(ctx context.Context, conn stats.ConnStats) connBegin, _ := conn.(*stats.ConnBegin) if connBegin != nil { - h.Connect.Inc(1) + h.Connect.Count(1) } } diff --git a/proxy/http_proxy.go b/proxy/http_proxy.go index 889d071fa..b49957e72 100644 --- a/proxy/http_proxy.go +++ b/proxy/http_proxy.go @@ -15,7 +15,7 @@ import ( "github.com/fabiolb/fabio/auth" "github.com/fabiolb/fabio/config" "github.com/fabiolb/fabio/logger" - "github.com/fabiolb/fabio/metrics" + "github.com/fabiolb/fabio/metrics4" "github.com/fabiolb/fabio/noroute" "github.com/fabiolb/fabio/proxy/gzip" "github.com/fabiolb/fabio/route" @@ -46,11 +46,17 @@ type HTTPProxy struct { Lookup func(*http.Request) *route.Target // Requests is a timer metric which is updated for every request. - Requests metrics.Timer + Requests metrics4.Timer // Noroute is a counter metric which is updated for every request // where Lookup() returns nil. - Noroute metrics.Counter + Noroute metrics4.Counter + + // WSConn counts the number of open web socket connections. + WSConn metrics4.Gauge + + // Metrics is the configured metrics backend provider. + Metrics metrics4.Provider // Logger is the access logger for the requests. Logger logger.Logger @@ -71,6 +77,11 @@ func (p *HTTPProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { panic("no lookup function") } + metrics := p.Metrics + if metrics == nil { + metrics = &metrics4.MultiProvider{} + } + if p.Config.RequestID != "" { id := p.UUID if id == nil { @@ -122,7 +133,7 @@ func (p *HTTPProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { if t.Timer != nil { t.Timer.Update(0) } - metrics.DefaultRegistry.GetTimer(key(t.RedirectCode)).Update(0) + metrics.NewCounter("http.status", "code", strconv.Itoa(t.RedirectCode)).Count(1) return } @@ -184,9 +195,9 @@ func (p *HTTPProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { if targetURL.Scheme == "https" || targetURL.Scheme == "wss" { h = newWSHandler(targetURL.Host, func(network, address string) (net.Conn, error) { return tls.Dial(network, address, tr.(*http.Transport).TLSClientConfig) - }) + }, p.WSConn) } else { - h = newWSHandler(targetURL.Host, net.Dial) + h = newWSHandler(targetURL.Host, net.Dial, p.WSConn) } case accept == "text/event-stream": @@ -223,7 +234,7 @@ func (p *HTTPProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - metrics.DefaultRegistry.GetTimer(key(rw.code)).Update(dur) + metrics.NewTimer("http.status", "code", strconv.Itoa(rw.code)).Update(dur) // write access log if p.Logger != nil { diff --git a/proxy/tcp/copy_buffer.go b/proxy/tcp/copy_buffer.go index c06c50fd1..ce7395b14 100644 --- a/proxy/tcp/copy_buffer.go +++ b/proxy/tcp/copy_buffer.go @@ -3,12 +3,12 @@ package tcp import ( "io" - "github.com/fabiolb/fabio/metrics" + "github.com/fabiolb/fabio/metrics4" ) // copyBuffer is an adapted version of io.copyBuffer which updates a // counter instead of returning the total bytes written. -func copyBuffer(dst io.Writer, src io.Reader, c metrics.Counter) (err error) { +func copyBuffer(dst io.Writer, src io.Reader, c metrics4.Counter) (err error) { buf := make([]byte, 32*1024) for { nr, er := src.Read(buf) @@ -16,7 +16,7 @@ func copyBuffer(dst io.Writer, src io.Reader, c metrics.Counter) (err error) { nw, ew := dst.Write(buf[0:nr]) if nw > 0 { if c != nil { - c.Inc(int64(nw)) + c.Count(nw) } } if ew != nil { diff --git a/proxy/tcp/sni_proxy.go b/proxy/tcp/sni_proxy.go index 6a40ee8a1..4cd184b5c 100644 --- a/proxy/tcp/sni_proxy.go +++ b/proxy/tcp/sni_proxy.go @@ -7,7 +7,7 @@ import ( "net" "time" - "github.com/fabiolb/fabio/metrics" + "github.com/fabiolb/fabio/metrics4" "github.com/fabiolb/fabio/route" ) @@ -26,20 +26,28 @@ type SNIProxy struct { Lookup func(host string) *route.Target // Conn counts the number of connections. - Conn metrics.Counter + Conn metrics4.Counter // ConnFail counts the failed upstream connection attempts. - ConnFail metrics.Counter + ConnFail metrics4.Counter // Noroute counts the failed Lookup() calls. - Noroute metrics.Counter + Noroute metrics4.Counter + + // Metrics is the configured metrics backend provider. + Metrics metrics4.Provider } func (p *SNIProxy) ServeTCP(in net.Conn) error { defer in.Close() + metrics := p.Metrics + if metrics == nil { + metrics = &metrics4.MultiProvider{} + } + if p.Conn != nil { - p.Conn.Inc(1) + p.Conn.Count(1) } tlsReader := bufio.NewReader(in) @@ -47,7 +55,7 @@ func (p *SNIProxy) ServeTCP(in net.Conn) error { if err != nil { log.Print("[DEBUG] tcp+sni: TLS handshake failed (failed to peek data)") if p.ConnFail != nil { - p.ConnFail.Inc(1) + p.ConnFail.Count(1) } return err } @@ -56,7 +64,7 @@ func (p *SNIProxy) ServeTCP(in net.Conn) error { if err != nil { log.Printf("[DEBUG] tcp+sni: TLS handshake failed (%s)", err) if p.ConnFail != nil { - p.ConnFail.Inc(1) + p.ConnFail.Count(1) } return err } @@ -66,7 +74,7 @@ func (p *SNIProxy) ServeTCP(in net.Conn) error { if err != nil { log.Printf("[DEBUG] tcp+sni: TLS handshake failed (%s)", err) if p.ConnFail != nil { - p.ConnFail.Inc(1) + p.ConnFail.Count(1) } return err } @@ -77,7 +85,7 @@ func (p *SNIProxy) ServeTCP(in net.Conn) error { if !ok { log.Print("[DEBUG] tcp+sni: TLS handshake failed (unable to parse client hello)") if p.ConnFail != nil { - p.ConnFail.Inc(1) + p.ConnFail.Count(1) } return nil } @@ -85,7 +93,7 @@ func (p *SNIProxy) ServeTCP(in net.Conn) error { if host == "" { log.Print("[DEBUG] tcp+sni: server_name missing") if p.ConnFail != nil { - p.ConnFail.Inc(1) + p.ConnFail.Count(1) } return nil } @@ -93,7 +101,7 @@ func (p *SNIProxy) ServeTCP(in net.Conn) error { t := p.Lookup(host) if t == nil { if p.Noroute != nil { - p.Noroute.Inc(1) + p.Noroute.Count(1) } return nil } @@ -107,7 +115,7 @@ func (p *SNIProxy) ServeTCP(in net.Conn) error { if err != nil { log.Print("[WARN] tcp+sni: cannot connect to upstream ", addr) if p.ConnFail != nil { - p.ConnFail.Inc(1) + p.ConnFail.Count(1) } return err } @@ -119,7 +127,7 @@ func (p *SNIProxy) ServeTCP(in net.Conn) error { if err != nil { log.Print("[WARN] tcp+sni: write proxy protocol header failed. ", err) if p.ConnFail != nil { - p.ConnFail.Inc(1) + p.ConnFail.Count(1) } return err } @@ -130,23 +138,23 @@ func (p *SNIProxy) ServeTCP(in net.Conn) error { if err != nil { log.Print("[WARN] tcp+sni: copy client hello failed. ", err) if p.ConnFail != nil { - p.ConnFail.Inc(1) + p.ConnFail.Count(1) } return err } errc := make(chan error, 2) - cp := func(dst io.Writer, src io.Reader, c metrics.Counter) { + cp := func(dst io.Writer, src io.Reader, c metrics4.Counter) { errc <- copyBuffer(dst, src, c) } // rx measures the traffic to the upstream server (in <- out) // tx measures the traffic from the upstream server (out <- in) - rx := metrics.DefaultRegistry.GetCounter(t.TimerName + ".rx") - tx := metrics.DefaultRegistry.GetCounter(t.TimerName + ".tx") + rx := metrics.NewCounter(t.TimerName.String() + ".rx") + tx := metrics.NewCounter(t.TimerName.String() + ".tx") // we've received the ClientHello already - rx.Inc(int64(n)) + rx.Count(n) go cp(in, out, rx) go cp(out, in, tx) diff --git a/proxy/tcp/tcp_dynamic_proxy.go b/proxy/tcp/tcp_dynamic_proxy.go index 2cd4ad8b3..3c49fe19c 100644 --- a/proxy/tcp/tcp_dynamic_proxy.go +++ b/proxy/tcp/tcp_dynamic_proxy.go @@ -6,7 +6,7 @@ import ( "net" "time" - "github.com/fabiolb/fabio/metrics" + "github.com/fabiolb/fabio/metrics4" "github.com/fabiolb/fabio/route" ) @@ -21,26 +21,34 @@ type DynamicProxy struct { Lookup func(host string) *route.Target // Conn counts the number of connections. - Conn metrics.Counter + Conn metrics4.Counter // ConnFail counts the failed upstream connection attempts. - ConnFail metrics.Counter + ConnFail metrics4.Counter // Noroute counts the failed Lookup() calls. - Noroute metrics.Counter + Noroute metrics4.Counter + + // Metrics is the configured metrics backend provider. + Metrics metrics4.Provider } func (p *DynamicProxy) ServeTCP(in net.Conn) error { defer in.Close() + metrics := p.Metrics + if metrics == nil { + metrics = &metrics4.MultiProvider{} + } + if p.Conn != nil { - p.Conn.Inc(1) + p.Conn.Count(1) } target := in.LocalAddr().String() t := p.Lookup(target) if t == nil { if p.Noroute != nil { - p.Noroute.Inc(1) + p.Noroute.Count(1) } return nil } @@ -55,21 +63,21 @@ func (p *DynamicProxy) ServeTCP(in net.Conn) error { if err != nil { log.Print("[WARN] tcp: cannot connect to upstream ", addr) if p.ConnFail != nil { - p.ConnFail.Inc(1) + p.ConnFail.Count(1) } return err } defer out.Close() errc := make(chan error, 2) - cp := func(dst io.Writer, src io.Reader, c metrics.Counter) { + cp := func(dst io.Writer, src io.Reader, c metrics4.Counter) { errc <- copyBuffer(dst, src, c) } // rx measures the traffic to the upstream server (in <- out) // tx measures the traffic from the upstream server (out <- in) - rx := metrics.DefaultRegistry.GetCounter(t.TimerName + ".rx") - tx := metrics.DefaultRegistry.GetCounter(t.TimerName + ".tx") + rx := metrics.NewCounter(t.TimerName.String() + ".rx") + tx := metrics.NewCounter(t.TimerName.String() + ".tx") go cp(in, out, rx) go cp(out, in, tx) diff --git a/proxy/tcp/tcp_proxy.go b/proxy/tcp/tcp_proxy.go index a6c238937..15953cfe3 100644 --- a/proxy/tcp/tcp_proxy.go +++ b/proxy/tcp/tcp_proxy.go @@ -6,7 +6,7 @@ import ( "net" "time" - "github.com/fabiolb/fabio/metrics" + "github.com/fabiolb/fabio/metrics4" "github.com/fabiolb/fabio/route" ) @@ -21,20 +21,28 @@ type Proxy struct { Lookup func(host string) *route.Target // Conn counts the number of connections. - Conn metrics.Counter + Conn metrics4.Counter // ConnFail counts the failed upstream connection attempts. - ConnFail metrics.Counter + ConnFail metrics4.Counter // Noroute counts the failed Lookup() calls. - Noroute metrics.Counter + Noroute metrics4.Counter + + // Metrics is the configured metrics backend provider. + Metrics metrics4.Provider } func (p *Proxy) ServeTCP(in net.Conn) error { defer in.Close() + metrics := p.Metrics + if metrics == nil { + metrics = &metrics4.MultiProvider{} + } + if p.Conn != nil { - p.Conn.Inc(1) + p.Conn.Count(1) } _, port, _ := net.SplitHostPort(in.LocalAddr().String()) @@ -42,7 +50,7 @@ func (p *Proxy) ServeTCP(in net.Conn) error { t := p.Lookup(port) if t == nil { if p.Noroute != nil { - p.Noroute.Inc(1) + p.Noroute.Count(1) } return nil } @@ -56,7 +64,7 @@ func (p *Proxy) ServeTCP(in net.Conn) error { if err != nil { log.Print("[WARN] tcp: cannot connect to upstream ", addr) if p.ConnFail != nil { - p.ConnFail.Inc(1) + p.ConnFail.Count(1) } return err } @@ -68,21 +76,21 @@ func (p *Proxy) ServeTCP(in net.Conn) error { if err != nil { log.Print("[WARN] tcp: write proxy protocol header failed. ", err) if p.ConnFail != nil { - p.ConnFail.Inc(1) + p.ConnFail.Count(1) } return err } } errc := make(chan error, 2) - cp := func(dst io.Writer, src io.Reader, c metrics.Counter) { + cp := func(dst io.Writer, src io.Reader, c metrics4.Counter) { errc <- copyBuffer(dst, src, c) } // rx measures the traffic to the upstream server (in <- out) // tx measures the traffic from the upstream server (out <- in) - rx := metrics.DefaultRegistry.GetCounter(t.TimerName + ".rx") - tx := metrics.DefaultRegistry.GetCounter(t.TimerName + ".tx") + rx := metrics.NewCounter(t.TimerName.String() + ".rx") + tx := metrics.NewCounter(t.TimerName.String() + ".tx") go cp(in, out, rx) go cp(out, in, tx) diff --git a/proxy/ws_handler.go b/proxy/ws_handler.go index 830f6c15e..85188b364 100644 --- a/proxy/ws_handler.go +++ b/proxy/ws_handler.go @@ -7,13 +7,14 @@ import ( "net" "net/http" "strings" + "sync/atomic" "time" - "github.com/fabiolb/fabio/metrics" + "github.com/fabiolb/fabio/metrics4" ) -// conn measures the number of open web socket connections -var conn = metrics.DefaultRegistry.GetCounter("ws.conn") +// conns keeps track of the number of open ws connections +var conns int64 type dialFunc func(network, address string) (net.Conn, error) @@ -21,10 +22,14 @@ type dialFunc func(network, address string) (net.Conn, error) // an incoming and outgoing websocket connection. It checks whether // the handshake was completed successfully before forwarding data // between the client and server. -func newWSHandler(host string, dial dialFunc) http.Handler { +func newWSHandler(host string, dial dialFunc, conn metrics4.Gauge) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - conn.Inc(1) - defer func() { conn.Inc(-1) }() + if conn != nil { + conn.Update(int(atomic.AddInt64(&conns, 1))) + defer func() { + conn.Update(int(atomic.AddInt64(&conns, -1))) + }() + } hj, ok := w.(http.Hijacker) if !ok { diff --git a/route/route.go b/route/route.go index 69e865e97..3740b4c22 100644 --- a/route/route.go +++ b/route/route.go @@ -9,7 +9,7 @@ import ( "strconv" "strings" - "github.com/fabiolb/fabio/metrics" + "github.com/fabiolb/fabio/metrics4/names" "github.com/gobwas/glob" ) @@ -54,22 +54,21 @@ func (r *Route) addTarget(service string, targetURL *url.URL, fixedWeight float6 } } - name, err := metrics.TargetName(service, r.Host, r.Path, targetURL) - if err != nil { - log.Printf("[ERROR] Invalid metrics name: %s", err) - name = "unknown" - } - t := &Target{ Service: service, Tags: tags, Opts: opts, URL: targetURL, FixedWeight: fixedWeight, - Timer: ServiceRegistry.GetTimer(name), - TimerName: name, + TimerName: names.Service{ + Service: service, + Host: r.Host, + Path: r.Path, + TargetURL: targetURL, + }, } + var err error if opts != nil { t.StripPath = opts["strip"] t.TLSSkipVerify = opts["tlsskipverify"] == "true" diff --git a/route/table.go b/route/table.go index e6e59bc22..f6e0b26af 100644 --- a/route/table.go +++ b/route/table.go @@ -10,10 +10,8 @@ import ( "net/url" "sort" "strings" - "sync" "sync/atomic" - "github.com/fabiolb/fabio/metrics" "github.com/gobwas/glob" ) @@ -24,9 +22,6 @@ var errNoMatch = errors.New("route: no target match") // table stores the active routing table. Must never be nil. var table atomic.Value -// ServiceRegistry stores the metrics for the services. -var ServiceRegistry metrics.Registry = metrics.NoopRegistry{} - // init initializes the routing table. func init() { table.Store(make(Table)) @@ -39,9 +34,6 @@ func GetTable() Table { return table.Load().(Table) } -// mu guards table and registry in SetTable. -var mu sync.Mutex - // SetTable sets the active routing table. A nil value // logs a warning and is ignored. The function is safe // to be called from multiple goroutines. @@ -50,42 +42,7 @@ func SetTable(t Table) { log.Print("[WARN] Ignoring nil routing table") return } - mu.Lock() table.Store(t) - syncRegistry(t) - mu.Unlock() -} - -// syncRegistry unregisters all inactive timers. -// It assumes that all timers of the table have -// already been registered. -func syncRegistry(t Table) { - timers := map[string]bool{} - - // get all registered timers - for _, name := range ServiceRegistry.Names() { - timers[name] = false - } - - // mark the ones from this table as active. - // this can also add new entries but we do not - // really care since we are only interested in the - // inactive ones. - for _, routes := range t { - for _, r := range routes { - for _, tg := range r.Targets { - timers[tg.TimerName] = true - } - } - } - - // unregister inactive timers - for name, active := range timers { - if !active { - ServiceRegistry.Unregister(name) - log.Printf("[INFO] Unregistered timer %s", name) - } - } } // Table contains a set of routes grouped by host. diff --git a/route/table_registry_test.go b/route/table_registry_test.go index 3616dd9c5..5607fe3fe 100644 --- a/route/table_registry_test.go +++ b/route/table_registry_test.go @@ -1,63 +1,55 @@ package route -import ( - "reflect" - "sort" - "testing" - - "github.com/fabiolb/fabio/metrics" -) - -func TestSyncRegistry(t *testing.T) { - oldRegistry := ServiceRegistry - ServiceRegistry = newStubRegistry() - defer func() { ServiceRegistry = oldRegistry }() - - tbl := make(Table) - tbl.addRoute(&RouteDef{Service: "svc-a", Src: "/aaa", Dst: "http://localhost:1234", Weight: 1}) - tbl.addRoute(&RouteDef{Service: "svc-b", Src: "/bbb", Dst: "http://localhost:5678", Weight: 1}) - if got, want := ServiceRegistry.Names(), []string{"svc-a._./aaa.localhost_1234", "svc-b._./bbb.localhost_5678"}; !reflect.DeepEqual(got, want) { - t.Fatalf("got %v want %v", got, want) - } - - tbl.delRoute(&RouteDef{Service: "svc-b", Src: "/bbb", Dst: "http://localhost:5678"}) - syncRegistry(tbl) - if got, want := ServiceRegistry.Names(), []string{"svc-a._./aaa.localhost_1234"}; !reflect.DeepEqual(got, want) { - t.Fatalf("got %v want %v", got, want) - } -} - -func newStubRegistry() metrics.Registry { - return &stubRegistry{names: make(map[string]bool)} -} - -type stubRegistry struct { - names map[string]bool -} - -func (p *stubRegistry) Names() []string { - n := []string{} - for k := range p.names { - n = append(n, k) - } - sort.Strings(n) - return n -} - -func (p *stubRegistry) Unregister(name string) { - delete(p.names, name) -} - -func (p *stubRegistry) UnregisterAll() { - p.names = map[string]bool{} -} - -func (p *stubRegistry) GetCounter(name string) metrics.Counter { - p.names[name] = true - return metrics.NoopCounter{} -} - -func (p *stubRegistry) GetTimer(name string) metrics.Timer { - p.names[name] = true - return metrics.NoopTimer{} -} +// func TestSyncRegistry(t *testing.T) { +// oldRegistry := ServiceRegistry +// ServiceRegistry = newStubRegistry() +// defer func() { ServiceRegistry = oldRegistry }() +// +// tbl := make(Table) +// tbl.addRoute(&RouteDef{Service: "svc-a", Src: "/aaa", Dst: "http://localhost:1234", Weight: 1}) +// tbl.addRoute(&RouteDef{Service: "svc-b", Src: "/bbb", Dst: "http://localhost:5678", Weight: 1}) +// if got, want := ServiceRegistry.Names(), []string{"svc-a._./aaa.localhost_1234", "svc-b._./bbb.localhost_5678"}; !reflect.DeepEqual(got, want) { +// t.Fatalf("got %v want %v", got, want) +// } +// +// tbl.delRoute(&RouteDef{Service: "svc-b", Src: "/bbb", Dst: "http://localhost:5678"}) +// syncRegistry(tbl) +// if got, want := ServiceRegistry.Names(), []string{"svc-a._./aaa.localhost_1234"}; !reflect.DeepEqual(got, want) { +// t.Fatalf("got %v want %v", got, want) +// } +// } +// +// func newStubRegistry() metrics.Registry { +// return &stubRegistry{names: make(map[string]bool)} +// } +// +// type stubRegistry struct { +// names map[string]bool +// } +// +// func (p *stubRegistry) Names() []string { +// n := []string{} +// for k := range p.names { +// n = append(n, k) +// } +// sort.Strings(n) +// return n +// } +// +// func (p *stubRegistry) Unregister(name string) { +// delete(p.names, name) +// } +// +// func (p *stubRegistry) UnregisterAll() { +// p.names = map[string]bool{} +// } +// +// func (p *stubRegistry) GetCounter(name string) metrics.Counter { +// p.names[name] = true +// return metrics.NoopCounter{} +// } +// +// func (p *stubRegistry) GetTimer(name string) metrics.Timer { +// p.names[name] = true +// return metrics.NoopTimer{} +// } diff --git a/route/target.go b/route/target.go index 24341142a..f3064ab1d 100644 --- a/route/target.go +++ b/route/target.go @@ -4,7 +4,8 @@ import ( "net/url" "strings" - "github.com/fabiolb/fabio/metrics" + "github.com/fabiolb/fabio/metrics4" + "github.com/fabiolb/fabio/metrics4/names" ) type Target struct { @@ -50,10 +51,10 @@ type Target struct { Weight float64 // Timer measures throughput and latency of this target - Timer metrics.Timer + Timer metrics4.Timer // TimerName is the name of the timer in the metrics registry - TimerName string + TimerName names.Service // accessRules is map of access information for the target. accessRules map[string][]interface{} diff --git a/vendor/github.com/alexcesaro/statsd/.travis.yml b/vendor/github.com/alexcesaro/statsd/.travis.yml new file mode 100644 index 000000000..48915e737 --- /dev/null +++ b/vendor/github.com/alexcesaro/statsd/.travis.yml @@ -0,0 +1,9 @@ +language: go + +go: + - 1.2 + - 1.3 + - 1.4 + - 1.5 + - 1.6 + - tip diff --git a/vendor/github.com/alexcesaro/statsd/CHANGELOG.md b/vendor/github.com/alexcesaro/statsd/CHANGELOG.md new file mode 100644 index 000000000..04d811b71 --- /dev/null +++ b/vendor/github.com/alexcesaro/statsd/CHANGELOG.md @@ -0,0 +1,64 @@ +# Change Log +All notable changes to this project will be documented in this file. +This project adheres to [Semantic Versioning](http://semver.org/). + +## [2.0.0] - 2016-03-20 + +- `New` signature changed. The default address used is now ":8125". To use + another address use the `Address` option: + + Before: + ``` + statsd.New(":8125") + statsd.New(":9000") + ``` + + After + ``` + statsd.New() + statsd.New(statsd.Address(":9000")) + ``` + +- The `rate` parameter has been removed from the `Count` and `Timing` methods. + Use the new `SampleRate` option instead. + +- `Count`, `Gauge` and `Timing` now accept a `interface{}` instead of an int as + the value parameter. So you can now use any type of integer or float in these + functions. + +- The `WithInfluxDBTags` and `WithDatadogTags` options were replaced by the + `TagsFormat` and `Tags` options: + + Before: + ``` + statsd.New(statsd.WithInfluxDBTags("tag", "value")) + statsd.New(statsd.WithDatadogTags("tag", "value")) + ``` + + After + ``` + statsd.New(statsd.TagsFormat(statsd.InfluxDB), statsd.Tags("tag", "value")) + statsd.New(statsd.TagsFormat(statsd.Datadog), statsd.Tags("tag", "value")) + ``` + +- All options whose named began by `With` had the `With` stripped: + + Before: + ``` + statsd.New(statsd.WithMaxPacketSize(65000)) + ``` + + After + ``` + statsd.New(statsd.MaxPacketSize(65000)) + ``` + +- `ChangeGauge` has been removed as it is a bad practice: UDP packets can be + lost so using relative changes can cause unreliable values in the long term. + Use `Gauge` instead which sends an absolute value. + +- The `Histogram` method has been added. + +- The `Clone` method was added to the `Client`, it allows to create a new + `Client` with different rate / prefix / tags parameters while still using the + same connection. diff --git a/vendor/github.com/alexcesaro/statsd/LICENSE b/vendor/github.com/alexcesaro/statsd/LICENSE new file mode 100644 index 000000000..4ec7268d5 --- /dev/null +++ b/vendor/github.com/alexcesaro/statsd/LICENSE @@ -0,0 +1,20 @@ +The MIT License (MIT) + +Copyright (c) 2015 Alexandre Cesaro + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/vendor/github.com/alexcesaro/statsd/README.md b/vendor/github.com/alexcesaro/statsd/README.md new file mode 100644 index 000000000..774a1c687 --- /dev/null +++ b/vendor/github.com/alexcesaro/statsd/README.md @@ -0,0 +1,50 @@ +# statsd +[![Build Status](https://travis-ci.org/alexcesaro/statsd.svg?branch=v2)](https://travis-ci.org/alexcesaro/statsd) [![Code Coverage](http://gocover.io/_badge/gopkg.in/alexcesaro/statsd.v2)](http://gocover.io/gopkg.in/alexcesaro/statsd.v2) [![Documentation](https://godoc.org/gopkg.in/alexcesaro/statsd.v2?status.svg)](https://godoc.org/gopkg.in/alexcesaro/statsd.v2) + +## Introduction + +statsd is a simple and efficient [Statsd](https://github.com/etsy/statsd) +client. + +See the [benchmark](https://github.com/alexcesaro/statsdbench) for a comparison +with other Go StatsD clients. + +## Features + +- Supports all StatsD metrics: counter, gauge, timing and set +- Supports InfluxDB and Datadog tags +- Fast and GC-friendly: all functions for sending metrics do not allocate +- Efficient: metrics are buffered by default +- Simple and clean API +- 100% test coverage +- Versioned API using gopkg.in + + +## Documentation + +https://godoc.org/gopkg.in/alexcesaro/statsd.v2 + + +## Download + + go get gopkg.in/alexcesaro/statsd.v2 + + +## Example + +See the [examples in the documentation](https://godoc.org/gopkg.in/alexcesaro/statsd.v2#example-package). + + +## License + +[MIT](LICENSE) + + +## Contribute + +Do you have any question the documentation does not answer? Is there a use case +that you feel is common and is not well-addressed by the current API? + +If so you are more than welcome to ask questions in the +[thread on golang-nuts](https://groups.google.com/d/topic/golang-nuts/Tz6t4_iLgnw/discussion) +or open an issue or send a pull-request here on Github. diff --git a/vendor/github.com/alexcesaro/statsd/conn.go b/vendor/github.com/alexcesaro/statsd/conn.go new file mode 100644 index 000000000..4dbda6309 --- /dev/null +++ b/vendor/github.com/alexcesaro/statsd/conn.go @@ -0,0 +1,270 @@ +package statsd + +import ( + "io" + "math/rand" + "net" + "strconv" + "sync" + "time" +) + +type conn struct { + // Fields settable with options at Client's creation. + addr string + errorHandler func(error) + flushPeriod time.Duration + maxPacketSize int + network string + tagFormat TagFormat + + mu sync.Mutex + // Fields guarded by the mutex. + closed bool + w io.WriteCloser + buf []byte + rateCache map[float32]string +} + +func newConn(conf connConfig, muted bool) (*conn, error) { + c := &conn{ + addr: conf.Addr, + errorHandler: conf.ErrorHandler, + flushPeriod: conf.FlushPeriod, + maxPacketSize: conf.MaxPacketSize, + network: conf.Network, + tagFormat: conf.TagFormat, + } + + if muted { + return c, nil + } + + var err error + c.w, err = dialTimeout(c.network, c.addr, 5*time.Second) + if err != nil { + return c, err + } + // When using UDP do a quick check to see if something is listening on the + // given port to return an error as soon as possible. + if c.network[:3] == "udp" { + for i := 0; i < 2; i++ { + _, err = c.w.Write(nil) + if err != nil { + _ = c.w.Close() + c.w = nil + return c, err + } + } + } + + // To prevent a buffer overflow add some capacity to the buffer to allow for + // an additional metric. + c.buf = make([]byte, 0, c.maxPacketSize+200) + + if c.flushPeriod > 0 { + go func() { + ticker := time.NewTicker(c.flushPeriod) + for _ = range ticker.C { + c.mu.Lock() + if c.closed { + ticker.Stop() + c.mu.Unlock() + return + } + c.flush(0) + c.mu.Unlock() + } + }() + } + + return c, nil +} + +func (c *conn) metric(prefix, bucket string, n interface{}, typ string, rate float32, tags string) { + c.mu.Lock() + l := len(c.buf) + c.appendBucket(prefix, bucket, tags) + c.appendNumber(n) + c.appendType(typ) + c.appendRate(rate) + c.closeMetric(tags) + c.flushIfBufferFull(l) + c.mu.Unlock() +} + +func (c *conn) gauge(prefix, bucket string, value interface{}, tags string) { + c.mu.Lock() + l := len(c.buf) + // To set a gauge to a negative value we must first set it to 0. + // https://github.com/etsy/statsd/blob/master/docs/metric_types.md#gauges + if isNegative(value) { + c.appendBucket(prefix, bucket, tags) + c.appendGauge(0, tags) + } + c.appendBucket(prefix, bucket, tags) + c.appendGauge(value, tags) + c.flushIfBufferFull(l) + c.mu.Unlock() +} + +func (c *conn) appendGauge(value interface{}, tags string) { + c.appendNumber(value) + c.appendType("g") + c.closeMetric(tags) +} + +func (c *conn) unique(prefix, bucket string, value string, tags string) { + c.mu.Lock() + l := len(c.buf) + c.appendBucket(prefix, bucket, tags) + c.appendString(value) + c.appendType("s") + c.closeMetric(tags) + c.flushIfBufferFull(l) + c.mu.Unlock() +} + +func (c *conn) appendByte(b byte) { + c.buf = append(c.buf, b) +} + +func (c *conn) appendString(s string) { + c.buf = append(c.buf, s...) +} + +func (c *conn) appendNumber(v interface{}) { + switch n := v.(type) { + case int: + c.buf = strconv.AppendInt(c.buf, int64(n), 10) + case uint: + c.buf = strconv.AppendUint(c.buf, uint64(n), 10) + case int64: + c.buf = strconv.AppendInt(c.buf, n, 10) + case uint64: + c.buf = strconv.AppendUint(c.buf, n, 10) + case int32: + c.buf = strconv.AppendInt(c.buf, int64(n), 10) + case uint32: + c.buf = strconv.AppendUint(c.buf, uint64(n), 10) + case int16: + c.buf = strconv.AppendInt(c.buf, int64(n), 10) + case uint16: + c.buf = strconv.AppendUint(c.buf, uint64(n), 10) + case int8: + c.buf = strconv.AppendInt(c.buf, int64(n), 10) + case uint8: + c.buf = strconv.AppendUint(c.buf, uint64(n), 10) + case float64: + c.buf = strconv.AppendFloat(c.buf, n, 'f', -1, 64) + case float32: + c.buf = strconv.AppendFloat(c.buf, float64(n), 'f', -1, 32) + } +} + +func isNegative(v interface{}) bool { + switch n := v.(type) { + case int: + return n < 0 + case uint: + return n < 0 + case int64: + return n < 0 + case uint64: + return n < 0 + case int32: + return n < 0 + case uint32: + return n < 0 + case int16: + return n < 0 + case uint16: + return n < 0 + case int8: + return n < 0 + case uint8: + return n < 0 + case float64: + return n < 0 + case float32: + return n < 0 + } + return false +} + +func (c *conn) appendBucket(prefix, bucket string, tags string) { + c.appendString(prefix) + c.appendString(bucket) + if c.tagFormat == InfluxDB { + c.appendString(tags) + } + c.appendByte(':') +} + +func (c *conn) appendType(t string) { + c.appendByte('|') + c.appendString(t) +} + +func (c *conn) appendRate(rate float32) { + if rate == 1 { + return + } + if c.rateCache == nil { + c.rateCache = make(map[float32]string) + } + + c.appendString("|@") + if s, ok := c.rateCache[rate]; ok { + c.appendString(s) + } else { + s = strconv.FormatFloat(float64(rate), 'f', -1, 32) + c.rateCache[rate] = s + c.appendString(s) + } +} + +func (c *conn) closeMetric(tags string) { + if c.tagFormat == Datadog { + c.appendString(tags) + } + c.appendByte('\n') +} + +func (c *conn) flushIfBufferFull(lastSafeLen int) { + if len(c.buf) > c.maxPacketSize { + c.flush(lastSafeLen) + } +} + +// flush flushes the first n bytes of the buffer. +// If n is 0, the whole buffer is flushed. +func (c *conn) flush(n int) { + if len(c.buf) == 0 { + return + } + if n == 0 { + n = len(c.buf) + } + + // Trim the last \n, StatsD does not like it. + _, err := c.w.Write(c.buf[:n-1]) + c.handleError(err) + if n < len(c.buf) { + copy(c.buf, c.buf[n:]) + } + c.buf = c.buf[:len(c.buf)-n] +} + +func (c *conn) handleError(err error) { + if err != nil && c.errorHandler != nil { + c.errorHandler(err) + } +} + +// Stubbed out for testing. +var ( + dialTimeout = net.DialTimeout + now = time.Now + randFloat = rand.Float32 +) diff --git a/vendor/github.com/alexcesaro/statsd/doc.go b/vendor/github.com/alexcesaro/statsd/doc.go new file mode 100644 index 000000000..bb7b986e3 --- /dev/null +++ b/vendor/github.com/alexcesaro/statsd/doc.go @@ -0,0 +1,29 @@ +/* +Package statsd is a simple and efficient StatsD client. + + +Options + +Use options to configure the Client: target host/port, sampling rate, tags, etc. + +Whenever you want to use different options (e.g. other tags, different sampling +rate), you should use the Clone() method of the Client. + +Because when cloning a Client, the same connection is reused so this is way +cheaper and more efficient than creating another Client using New(). + + +Internals + +Client's methods buffer metrics. The buffer is flushed when either: + - the background goroutine flushes the buffer (every 100ms by default) + - the buffer is full (1440 bytes by default so that IP packets are not + fragmented) + +The background goroutine can be disabled using the FlushPeriod(0) option. + +Buffering can be disabled using the MaxPacketSize(0) option. + +StatsD homepage: https://github.com/etsy/statsd +*/ +package statsd diff --git a/vendor/github.com/alexcesaro/statsd/options.go b/vendor/github.com/alexcesaro/statsd/options.go new file mode 100644 index 000000000..ef95bb8c3 --- /dev/null +++ b/vendor/github.com/alexcesaro/statsd/options.go @@ -0,0 +1,250 @@ +package statsd + +import ( + "bytes" + "strings" + "time" +) + +type config struct { + Conn connConfig + Client clientConfig +} + +type clientConfig struct { + Muted bool + Rate float32 + Prefix string + Tags []tag +} + +type connConfig struct { + Addr string + ErrorHandler func(error) + FlushPeriod time.Duration + MaxPacketSize int + Network string + TagFormat TagFormat +} + +// An Option represents an option for a Client. It must be used as an +// argument to New() or Client.Clone(). +type Option func(*config) + +// Address sets the address of the StatsD daemon. +// +// By default, ":8125" is used. This option is ignored in Client.Clone(). +func Address(addr string) Option { + return Option(func(c *config) { + c.Conn.Addr = addr + }) +} + +// ErrorHandler sets the function called when an error happens when sending +// metrics (e.g. the StatsD daemon is not listening anymore). +// +// By default, these errors are ignored. This option is ignored in +// Client.Clone(). +func ErrorHandler(h func(error)) Option { + return Option(func(c *config) { + c.Conn.ErrorHandler = h + }) +} + +// FlushPeriod sets how often the Client's buffer is flushed. If p is 0, the +// goroutine that periodically flush the buffer is not lauched and the buffer +// is only flushed when it is full. +// +// By default, the flush period is 100 ms. This option is ignored in +// Client.Clone(). +func FlushPeriod(p time.Duration) Option { + return Option(func(c *config) { + c.Conn.FlushPeriod = p + }) +} + +// MaxPacketSize sets the maximum packet size in bytes sent by the Client. +// +// By default, it is 1440 to avoid IP fragmentation. This option is ignored in +// Client.Clone(). +func MaxPacketSize(n int) Option { + return Option(func(c *config) { + c.Conn.MaxPacketSize = n + }) +} + +// Network sets the network (udp, tcp, etc) used by the client. See the +// net.Dial documentation (https://golang.org/pkg/net/#Dial) for the available +// network options. +// +// By default, network is udp. This option is ignored in Client.Clone(). +func Network(network string) Option { + return Option(func(c *config) { + c.Conn.Network = network + }) +} + +// Mute sets whether the Client is muted. All methods of a muted Client do +// nothing and return immedialtly. +// +// This option can be used in Client.Clone() only if the parent Client is not +// muted. The clones of a muted Client are always muted. +func Mute(b bool) Option { + return Option(func(c *config) { + c.Client.Muted = b + }) +} + +// SampleRate sets the sample rate of the Client. It allows sending the metrics +// less often which can be useful for performance intensive code paths. +func SampleRate(rate float32) Option { + return Option(func(c *config) { + c.Client.Rate = rate + }) +} + +// Prefix appends the prefix that will be used in every bucket name. +// +// Note that when used in cloned, the prefix of the parent Client is not +// replaced but is prepended to the given prefix. +func Prefix(p string) Option { + return Option(func(c *config) { + c.Client.Prefix += strings.TrimSuffix(p, ".") + "." + }) +} + +// TagFormat represents the format of tags sent by a Client. +type TagFormat uint8 + +// TagsFormat sets the format of tags. +func TagsFormat(tf TagFormat) Option { + return Option(func(c *config) { + c.Conn.TagFormat = tf + }) +} + +// Tags appends the given tags to the tags sent with every metrics. If a tag +// already exists, it is replaced. +// +// The tags must be set as key-value pairs. If the number of tags is not even, +// Tags panics. +// +// If the format of tags have not been set using the TagsFormat option, the tags +// will be ignored. +func Tags(tags ...string) Option { + if len(tags)%2 != 0 { + panic("statsd: Tags only accepts an even number of arguments") + } + + return Option(func(c *config) { + if len(tags) == 0 { + return + } + + newTags := make([]tag, len(tags)/2) + for i := 0; i < len(tags)/2; i++ { + newTags[i] = tag{K: tags[2*i], V: tags[2*i+1]} + } + + for _, newTag := range newTags { + exists := false + for _, oldTag := range c.Client.Tags { + if newTag.K == oldTag.K { + exists = true + oldTag.V = newTag.V + } + } + if !exists { + c.Client.Tags = append(c.Client.Tags, tag{ + K: newTag.K, + V: newTag.V, + }) + } + } + }) +} + +type tag struct { + K, V string +} + +func joinTags(tf TagFormat, tags []tag) string { + if len(tags) == 0 || tf == 0 { + return "" + } + join := joinFuncs[tf] + return join(tags) +} + +func splitTags(tf TagFormat, tags string) []tag { + if len(tags) == 0 || tf == 0 { + return nil + } + split := splitFuncs[tf] + return split(tags) +} + +const ( + // InfluxDB tag format. + // See https://influxdb.com/blog/2015/11/03/getting_started_with_influx_statsd.html + InfluxDB TagFormat = iota + 1 + // Datadog tag format. + // See http://docs.datadoghq.com/guides/metrics/#tags + Datadog +) + +var ( + joinFuncs = map[TagFormat]func([]tag) string{ + // InfluxDB tag format: ,tag1=payroll,region=us-west + // https://influxdb.com/blog/2015/11/03/getting_started_with_influx_statsd.html + InfluxDB: func(tags []tag) string { + var buf bytes.Buffer + for _, tag := range tags { + _ = buf.WriteByte(',') + _, _ = buf.WriteString(tag.K) + _ = buf.WriteByte('=') + _, _ = buf.WriteString(tag.V) + } + return buf.String() + }, + // Datadog tag format: |#tag1:value1,tag2:value2 + // http://docs.datadoghq.com/guides/dogstatsd/#datagram-format + Datadog: func(tags []tag) string { + buf := bytes.NewBufferString("|#") + first := true + for _, tag := range tags { + if first { + first = false + } else { + _ = buf.WriteByte(',') + } + _, _ = buf.WriteString(tag.K) + _ = buf.WriteByte(':') + _, _ = buf.WriteString(tag.V) + } + return buf.String() + }, + } + splitFuncs = map[TagFormat]func(string) []tag{ + InfluxDB: func(s string) []tag { + s = s[1:] + pairs := strings.Split(s, ",") + tags := make([]tag, len(pairs)) + for i, pair := range pairs { + kv := strings.Split(pair, "=") + tags[i] = tag{K: kv[0], V: kv[1]} + } + return tags + }, + Datadog: func(s string) []tag { + s = s[2:] + pairs := strings.Split(s, ",") + tags := make([]tag, len(pairs)) + for i, pair := range pairs { + kv := strings.Split(pair, ":") + tags[i] = tag{K: kv[0], V: kv[1]} + } + return tags + }, + } +) diff --git a/vendor/github.com/alexcesaro/statsd/statsd.go b/vendor/github.com/alexcesaro/statsd/statsd.go new file mode 100644 index 000000000..f19204d79 --- /dev/null +++ b/vendor/github.com/alexcesaro/statsd/statsd.go @@ -0,0 +1,169 @@ +package statsd + +import "time" + +// A Client represents a StatsD client. +type Client struct { + conn *conn + muted bool + rate float32 + prefix string + tags string +} + +// New returns a new Client. +func New(opts ...Option) (*Client, error) { + // The default configuration. + conf := &config{ + Client: clientConfig{ + Rate: 1, + }, + Conn: connConfig{ + Addr: ":8125", + FlushPeriod: 100 * time.Millisecond, + // Worst-case scenario: + // Ethernet MTU - IPv6 Header - TCP Header = 1500 - 40 - 20 = 1440 + MaxPacketSize: 1440, + Network: "udp", + }, + } + for _, o := range opts { + o(conf) + } + + conn, err := newConn(conf.Conn, conf.Client.Muted) + c := &Client{ + conn: conn, + muted: conf.Client.Muted, + } + if err != nil { + c.muted = true + return c, err + } + c.rate = conf.Client.Rate + c.prefix = conf.Client.Prefix + c.tags = joinTags(conf.Conn.TagFormat, conf.Client.Tags) + return c, nil +} + +// Clone returns a clone of the Client. The cloned Client inherits its +// configuration from its parent. +// +// All cloned Clients share the same connection, so cloning a Client is a cheap +// operation. +func (c *Client) Clone(opts ...Option) *Client { + tf := c.conn.tagFormat + conf := &config{ + Client: clientConfig{ + Rate: c.rate, + Prefix: c.prefix, + Tags: splitTags(tf, c.tags), + }, + } + for _, o := range opts { + o(conf) + } + + clone := &Client{ + conn: c.conn, + muted: c.muted || conf.Client.Muted, + rate: conf.Client.Rate, + prefix: conf.Client.Prefix, + tags: joinTags(tf, conf.Client.Tags), + } + clone.conn = c.conn + return clone +} + +// Count adds n to bucket. +func (c *Client) Count(bucket string, n interface{}) { + if c.skip() { + return + } + c.conn.metric(c.prefix, bucket, n, "c", c.rate, c.tags) +} + +func (c *Client) skip() bool { + return c.muted || (c.rate != 1 && randFloat() > c.rate) +} + +// Increment increment the given bucket. It is equivalent to Count(bucket, 1). +func (c *Client) Increment(bucket string) { + c.Count(bucket, 1) +} + +// Gauge records an absolute value for the given bucket. +func (c *Client) Gauge(bucket string, value interface{}) { + if c.skip() { + return + } + c.conn.gauge(c.prefix, bucket, value, c.tags) +} + +// Timing sends a timing value to a bucket. +func (c *Client) Timing(bucket string, value interface{}) { + if c.skip() { + return + } + c.conn.metric(c.prefix, bucket, value, "ms", c.rate, c.tags) +} + +// Histogram sends an histogram value to a bucket. +func (c *Client) Histogram(bucket string, value interface{}) { + if c.skip() { + return + } + c.conn.metric(c.prefix, bucket, value, "h", c.rate, c.tags) +} + +// A Timing is an helper object that eases sending timing values. +type Timing struct { + start time.Time + c *Client +} + +// NewTiming creates a new Timing. +func (c *Client) NewTiming() Timing { + return Timing{start: now(), c: c} +} + +// Send sends the time elapsed since the creation of the Timing. +func (t Timing) Send(bucket string) { + t.c.Timing(bucket, int(t.Duration()/time.Millisecond)) +} + +// Duration returns the time elapsed since the creation of the Timing. +func (t Timing) Duration() time.Duration { + return now().Sub(t.start) +} + +// Unique sends the given value to a set bucket. +func (c *Client) Unique(bucket string, value string) { + if c.skip() { + return + } + c.conn.unique(c.prefix, bucket, value, c.tags) +} + +// Flush flushes the Client's buffer. +func (c *Client) Flush() { + if c.muted { + return + } + c.conn.mu.Lock() + c.conn.flush(0) + c.conn.mu.Unlock() +} + +// Close flushes the Client's buffer and releases the associated ressources. The +// Client and all the cloned Clients must not be used afterward. +func (c *Client) Close() { + if c.muted { + return + } + c.conn.mu.Lock() + c.conn.flush(0) + c.conn.handleError(c.conn.w.Close()) + c.conn.closed = true + c.conn.mu.Unlock() +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 173423a37..20009cb8c 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1,5 +1,7 @@ # github.com/Shopify/sarama v1.19.0 github.com/Shopify/sarama +# github.com/alexcesaro/statsd v2.0.0+incompatible +github.com/alexcesaro/statsd # github.com/apache/thrift v0.13.0 github.com/apache/thrift/lib/go/thrift # github.com/armon/go-metrics v0.3.4