diff --git a/main.go b/main.go index e4ea3b252..719e8f132 100644 --- a/main.go +++ b/main.go @@ -177,17 +177,14 @@ func newHTTPProxy(cfg *config.Config) http.Handler { } } -func lookupHostFn(cfg *config.Config) func(string) string { +func lookupHostFn(cfg *config.Config) func(string) *route.Target { pick := route.Picker[cfg.Proxy.Strategy] - notFound := metrics.DefaultRegistry.GetCounter("notfound") - return func(host string) string { + return func(host string) *route.Target { t := route.GetTable().LookupHost(host, pick) if t == nil { - notFound.Inc(1) log.Print("[WARN] No route for ", host) - return "" } - return t.URL.Host + return t } } @@ -252,14 +249,26 @@ func startServers(cfg *config.Config) { }() case "tcp": go func() { - h := &tcp.Proxy{cfg.Proxy.DialTimeout, lookupHostFn(cfg)} + h := &tcp.Proxy{ + DialTimeout: cfg.Proxy.DialTimeout, + Lookup: lookupHostFn(cfg), + Conn: metrics.DefaultRegistry.GetCounter("tcp.conn"), + ConnFail: metrics.DefaultRegistry.GetCounter("tcp.connfail"), + Noroute: metrics.DefaultRegistry.GetCounter("tcp.noroute"), + } if err := proxy.ListenAndServeTCP(l, h, tlscfg); err != nil { exit.Fatal("[FATAL] ", err) } }() case "tcp+sni": go func() { - h := &tcp.SNIProxy{cfg.Proxy.DialTimeout, lookupHostFn(cfg)} + h := &tcp.SNIProxy{ + DialTimeout: cfg.Proxy.DialTimeout, + Lookup: lookupHostFn(cfg), + Conn: metrics.DefaultRegistry.GetCounter("tcp_sni.conn"), + ConnFail: metrics.DefaultRegistry.GetCounter("tcp_sni.connfail"), + Noroute: metrics.DefaultRegistry.GetCounter("tcp_sni.noroute"), + } if err := proxy.ListenAndServeTCP(l, h, tlscfg); err != nil { exit.Fatal("[FATAL] ", err) } diff --git a/proxy/tcp/copy_buffer.go b/proxy/tcp/copy_buffer.go new file mode 100644 index 000000000..c06c50fd1 --- /dev/null +++ b/proxy/tcp/copy_buffer.go @@ -0,0 +1,39 @@ +package tcp + +import ( + "io" + + "github.com/fabiolb/fabio/metrics" +) + +// copyBuffer is an adapted version of io.copyBuffer which updates a +// counter instead of returning the total bytes written. +func copyBuffer(dst io.Writer, src io.Reader, c metrics.Counter) (err error) { + buf := make([]byte, 32*1024) + for { + nr, er := src.Read(buf) + if nr > 0 { + nw, ew := dst.Write(buf[0:nr]) + if nw > 0 { + if c != nil { + c.Inc(int64(nw)) + } + } + if ew != nil { + err = ew + break + } + if nr != nw { + err = io.ErrShortWrite + break + } + } + if er != nil { + if er != io.EOF { + err = er + } + break + } + } + return err +} diff --git a/proxy/tcp/sni_proxy.go b/proxy/tcp/sni_proxy.go index 28c8b57d5..b75d06705 100644 --- a/proxy/tcp/sni_proxy.go +++ b/proxy/tcp/sni_proxy.go @@ -5,6 +5,9 @@ import ( "log" "net" "time" + + "github.com/fabiolb/fabio/metrics" + "github.com/fabiolb/fabio/route" ) // SNIProxy implements an SNI aware transparent TCP proxy which captures the @@ -19,16 +22,32 @@ type SNIProxy struct { // Lookup returns a target host for the given server name. // The proxy will panic if this value is nil. - Lookup func(host string) string + Lookup func(host string) *route.Target + + // Conn counts the number of connections. + Conn metrics.Counter + + // ConnFail counts the failed upstream connection attempts. + ConnFail metrics.Counter + + // Noroute counts the failed Lookup() calls. + Noroute metrics.Counter } func (p *SNIProxy) ServeTCP(in net.Conn) error { defer in.Close() + if p.Conn != nil { + p.Conn.Inc(1) + } + // capture client hello data := make([]byte, 1024) n, err := in.Read(data) if err != nil { + if p.ConnFail != nil { + p.ConnFail.Inc(1) + } return err } data = data[:n] @@ -36,41 +55,64 @@ func (p *SNIProxy) ServeTCP(in net.Conn) error { host, ok := readServerName(data) if !ok { log.Print("[DEBUG] tcp+sni: TLS handshake failed") + if p.ConnFail != nil { + p.ConnFail.Inc(1) + } return nil } if host == "" { log.Print("[DEBUG] tcp+sni: server_name missing") + if p.ConnFail != nil { + p.ConnFail.Inc(1) + } return nil } - addr := p.Lookup(host) - if addr == "" { + t := p.Lookup(host) + if t == nil { + if p.Noroute != nil { + p.Noroute.Inc(1) + } return nil } + addr := t.URL.Host out, err := net.DialTimeout("tcp", addr, p.DialTimeout) if err != nil { log.Print("[WARN] tcp+sni: cannot connect to upstream ", addr) + if p.ConnFail != nil { + p.ConnFail.Inc(1) + } return err } defer out.Close() // copy client hello - _, err = out.Write(data) + n, err = out.Write(data) if err != nil { log.Print("[WARN] tcp+sni: copy client hello failed. ", err) + if p.ConnFail != nil { + p.ConnFail.Inc(1) + } return err } errc := make(chan error, 2) - cp := func(dst io.Writer, src io.Reader) { - _, err := io.Copy(dst, src) - errc <- err + cp := func(dst io.Writer, src io.Reader, c metrics.Counter) { + errc <- copyBuffer(dst, src, c) } - go cp(out, in) - go cp(in, out) + // rx measures the traffic to the upstream server (in <- out) + // tx measures the traffic from the upstream server (out <- in) + rx := metrics.DefaultRegistry.GetCounter(t.TimerName + ".rx") + tx := metrics.DefaultRegistry.GetCounter(t.TimerName + ".tx") + + // we've received the ClientHello already + rx.Inc(int64(n)) + + go cp(in, out, rx) + go cp(out, in, tx) err = <-errc if err != nil && err != io.EOF { log.Print("[WARN]: tcp+sni: ", err) diff --git a/proxy/tcp/tcp_proxy.go b/proxy/tcp/tcp_proxy.go index b97b25873..5b14cc950 100644 --- a/proxy/tcp/tcp_proxy.go +++ b/proxy/tcp/tcp_proxy.go @@ -5,6 +5,9 @@ import ( "log" "net" "time" + + "github.com/fabiolb/fabio/metrics" + "github.com/fabiolb/fabio/route" ) // Proxy implements a generic TCP proxying handler. @@ -13,36 +16,60 @@ type Proxy struct { // connection. DialTimeout time.Duration - // Lookup returns a target host for the given server name. + // Lookup returns a target host for the given request. // The proxy will panic if this value is nil. - Lookup func(host string) string + Lookup func(host string) *route.Target + + // Conn counts the number of connections. + Conn metrics.Counter + + // ConnFail counts the failed upstream connection attempts. + ConnFail metrics.Counter + + // Noroute counts the failed Lookup() calls. + Noroute metrics.Counter } func (p *Proxy) ServeTCP(in net.Conn) error { defer in.Close() + if p.Conn != nil { + p.Conn.Inc(1) + } + _, port, _ := net.SplitHostPort(in.LocalAddr().String()) port = ":" + port - addr := p.Lookup(port) - if addr == "" { + t := p.Lookup(port) + if t == nil { + if p.Noroute != nil { + p.Noroute.Inc(1) + } return nil } + addr := t.URL.Host out, err := net.DialTimeout("tcp", addr, p.DialTimeout) if err != nil { log.Print("[WARN] tcp: cannot connect to upstream ", addr) + if p.ConnFail != nil { + p.ConnFail.Inc(1) + } return err } defer out.Close() errc := make(chan error, 2) - cp := func(dst io.Writer, src io.Reader) { - _, err := io.Copy(dst, src) - errc <- err + cp := func(dst io.Writer, src io.Reader, c metrics.Counter) { + errc <- copyBuffer(dst, src, c) } - go cp(out, in) - go cp(in, out) + // rx measures the traffic to the upstream server (in <- out) + // tx measures the traffic from the upstream server (out <- in) + rx := metrics.DefaultRegistry.GetCounter(t.TimerName + ".rx") + tx := metrics.DefaultRegistry.GetCounter(t.TimerName + ".tx") + + go cp(in, out, rx) + go cp(out, in, tx) err = <-errc if err != nil && err != io.EOF { log.Print("[WARN]: tcp: ", err) diff --git a/route/route.go b/route/route.go index 572423038..ad410197d 100644 --- a/route/route.go +++ b/route/route.go @@ -64,7 +64,7 @@ func (r *Route) addTarget(service string, targetURL *url.URL, fixedWeight float6 URL: targetURL, FixedWeight: fixedWeight, Timer: ServiceRegistry.GetTimer(name), - timerName: name, + TimerName: name, } if r.Opts != nil { t.StripPath = r.Opts["strip"] diff --git a/route/table.go b/route/table.go index 605b8acd5..e2af931a4 100644 --- a/route/table.go +++ b/route/table.go @@ -73,7 +73,7 @@ func syncRegistry(t Table) { for _, routes := range t { for _, r := range routes { for _, tg := range r.Targets { - timers[tg.timerName] = true + timers[tg.TimerName] = true } } } diff --git a/route/target.go b/route/target.go index caa6fed9f..6b4db5fdb 100644 --- a/route/target.go +++ b/route/target.go @@ -40,6 +40,6 @@ type Target struct { // Timer measures throughput and latency of this target Timer metrics.Timer - // timerName is the name of the timer in the metrics registry - timerName string + // TimerName is the name of the timer in the metrics registry + TimerName string }