From ba83ae2e1f286a49727e1937ce58b420c591c3f4 Mon Sep 17 00:00:00 2001 From: Craig Peterson Date: Wed, 16 Sep 2015 14:11:45 -0600 Subject: [PATCH 1/2] Relaying metadata as well. Also making relay that sets host header properly. --- cmd/bosun/main.go | 5 ++- cmd/bosun/web/web.go | 3 +- cmd/tsdbrelay/main.go | 73 ++++++++++++++++++++++++++++++++++--------- util/proxy.go | 26 +++++++++++++++ 4 files changed, 88 insertions(+), 19 deletions(-) create mode 100644 util/proxy.go diff --git a/cmd/bosun/main.go b/cmd/bosun/main.go index 0025d64d64..81155021e7 100644 --- a/cmd/bosun/main.go +++ b/cmd/bosun/main.go @@ -7,7 +7,6 @@ import ( "fmt" "net/http" "net/http/httptest" - "net/http/httputil" _ "net/http/pprof" "net/url" "os" @@ -106,7 +105,7 @@ func main() { if c.RelayListen != "" { go func() { mux := http.NewServeMux() - mux.Handle("/api/", httputil.NewSingleHostReverseProxy(httpListen)) + mux.Handle("/api/", util.NewSingleHostProxy(httpListen)) s := &http.Server{ Addr: c.RelayListen, Handler: mux, @@ -123,7 +122,7 @@ func main() { Host: c.TSDBHost, } if *flagReadonly { - rp := httputil.NewSingleHostReverseProxy(tsdbHost) + rp := util.NewSingleHostProxy(tsdbHost) ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if r.URL.Path == "/api/put" { w.WriteHeader(204) diff --git a/cmd/bosun/web/web.go b/cmd/bosun/web/web.go index 3730ea60df..53b4746f62 100644 --- a/cmd/bosun/web/web.go +++ b/cmd/bosun/web/web.go @@ -24,6 +24,7 @@ import ( "bosun.org/metadata" "bosun.org/opentsdb" "bosun.org/slog" + "bosun.org/util" "bosun.org/version" ) @@ -163,7 +164,7 @@ func (rp *relayProxy) ServeHTTP(responseWriter http.ResponseWriter, r *http.Requ } func Relay(dest string) http.Handler { - return &relayProxy{ReverseProxy: httputil.NewSingleHostReverseProxy(&url.URL{ + return &relayProxy{ReverseProxy: util.NewSingleHostProxy(&url.URL{ Scheme: "http", Host: dest, })} diff --git a/cmd/tsdbrelay/main.go b/cmd/tsdbrelay/main.go index 970bf599cb..449f467e2e 100644 --- a/cmd/tsdbrelay/main.go +++ b/cmd/tsdbrelay/main.go @@ -17,6 +17,7 @@ import ( "bosun.org/cmd/tsdbrelay/denormalize" "bosun.org/opentsdb" + "bosun.org/util" ) var ( @@ -81,8 +82,6 @@ func main() { Path: "/api/index", } bosunIndexURL = u.String() - tsdbProxy := httputil.NewSingleHostReverseProxy(tsdbURL) - if *secondaryRelays != "" { for _, rUrl := range strings.Split(*secondaryRelays, ",") { u = url.URL{ @@ -94,10 +93,18 @@ func main() { } } - http.Handle("/api/put", &relayProxy{ - ReverseProxy: tsdbProxy, + tsdbProxy := util.NewSingleHostProxy(tsdbURL) + bosunProxy := util.NewSingleHostProxy(bosunURL) + rp := &relayProxy{ + TSDBProxy: tsdbProxy, + BosunProxy: bosunProxy, + } + http.HandleFunc("/api/put", func(w http.ResponseWriter, r *http.Request) { + rp.relayPut(w, r, true) + }) + http.HandleFunc("/api/metadata/put", func(w http.ResponseWriter, r *http.Request) { + rp.relayMetadata(w, r) }) - http.Handle("/api/metadata/put", httputil.NewSingleHostReverseProxy(bosunURL)) http.Handle("/", tsdbProxy) log.Fatal(http.ListenAndServe(*listenAddr, nil)) } @@ -109,7 +116,8 @@ func verbose(format string, a ...interface{}) { } type relayProxy struct { - *httputil.ReverseProxy + TSDBProxy *httputil.ReverseProxy + BosunProxy *httputil.ReverseProxy } type passthru struct { @@ -133,21 +141,17 @@ func (rw *relayWriter) WriteHeader(code int) { rw.ResponseWriter.WriteHeader(code) } -func (rp *relayProxy) ServeHTTP(responseWriter http.ResponseWriter, r *http.Request) { - rp.relayRequest(responseWriter, r, true) -} - var ( relayHeader = "X-Relayed-From" myHost string ) -func (rp *relayProxy) relayRequest(responseWriter http.ResponseWriter, r *http.Request, parse bool) { +func (rp *relayProxy) relayPut(responseWriter http.ResponseWriter, r *http.Request, parse bool) { isRelayed := r.Header.Get(relayHeader) != "" reader := &passthru{ReadCloser: r.Body} r.Body = reader w := &relayWriter{ResponseWriter: responseWriter} - rp.ReverseProxy.ServeHTTP(w, r) + rp.TSDBProxy.ServeHTTP(w, r) if w.code != 204 { verbose("got status", w.code) return @@ -179,12 +183,12 @@ func (rp *relayProxy) relayRequest(responseWriter http.ResponseWriter, r *http.R for _, relayUrl := range relayPutUrls { body := bytes.NewBuffer(reader.buf.Bytes()) req, err := http.NewRequest(r.Method, relayUrl, body) - req.Header.Set("Content-Type", "application/json") - req.Header.Set("Content-Encoding", "gzip") if err != nil { verbose("%v", err) return } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Content-Encoding", "gzip") req.Header.Add(relayHeader, myHost) resp, err := http.DefaultClient.Do(req) if err != nil { @@ -245,7 +249,46 @@ func (rp *relayProxy) denormalize(body io.Reader) { req.Header.Set("Content-Encoding", "gzip") responseWriter := httptest.NewRecorder() - rp.relayRequest(responseWriter, req, false) + rp.relayPut(responseWriter, req, false) verbose("relayed %d denormalized data points. Tsdb response: %d", len(relayDps), responseWriter.Code) } + +func (rp *relayProxy) relayMetadata(responseWriter http.ResponseWriter, r *http.Request) { + + reader := &passthru{ReadCloser: r.Body} + r.Body = reader + w := &relayWriter{ResponseWriter: responseWriter} + rp.BosunProxy.ServeHTTP(w, r) + if w.code != 204 { + verbose("got status", w.code) + return + } + verbose("relayed metadata to bosun") + if r.Header.Get(relayHeader) != "" { + return + } + if len(relayPutUrls) != 0 { + go func() { + for _, relayUrl := range relayPutUrls { + relayUrl = strings.Replace(relayUrl, "/put", "/metadata/put", 1) + body := bytes.NewBuffer(reader.buf.Bytes()) + req, err := http.NewRequest(r.Method, relayUrl, body) + if err != nil { + verbose("%v", err) + return + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Content-Encoding", "gzip") + req.Header.Add(relayHeader, myHost) + resp, err := http.DefaultClient.Do(req) + if err != nil { + verbose("secondary relay error: %v", err) + return + } + resp.Body.Close() + verbose("secondary relay success") + } + }() + } +} diff --git a/util/proxy.go b/util/proxy.go new file mode 100644 index 0000000000..85a28f6835 --- /dev/null +++ b/util/proxy.go @@ -0,0 +1,26 @@ +package util + +import ( + "net/http" + "net/http/httputil" + "net/url" + "path" +) + +// Creates a new http Proxy that forwards requests to the specified url. +// Differs from httputil.NewSingleHostReverseProxy only in that it properly sets the host header. +func NewSingleHostProxy(target *url.URL) *httputil.ReverseProxy { + targetQuery := target.RawQuery + director := func(req *http.Request) { + req.URL.Scheme = target.Scheme + req.URL.Host = target.Host + req.URL.Path = path.Join(target.Path, req.URL.Path) + if targetQuery == "" || req.URL.RawQuery == "" { + req.URL.RawQuery = targetQuery + req.URL.RawQuery + } else { + req.URL.RawQuery = targetQuery + "&" + req.URL.RawQuery + } + req.Host = target.Host + } + return &httputil.ReverseProxy{Director: director} +} From 3b370172296679335eb9316d2031365584a7af4f Mon Sep 17 00:00:00 2001 From: Craig Peterson Date: Wed, 16 Sep 2015 14:14:59 -0600 Subject: [PATCH 2/2] copying a smidgen less code out of httputil --- util/proxy.go | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/util/proxy.go b/util/proxy.go index 85a28f6835..a06617df56 100644 --- a/util/proxy.go +++ b/util/proxy.go @@ -4,22 +4,14 @@ import ( "net/http" "net/http/httputil" "net/url" - "path" ) // Creates a new http Proxy that forwards requests to the specified url. // Differs from httputil.NewSingleHostReverseProxy only in that it properly sets the host header. func NewSingleHostProxy(target *url.URL) *httputil.ReverseProxy { - targetQuery := target.RawQuery + proxy := httputil.NewSingleHostReverseProxy(target) director := func(req *http.Request) { - req.URL.Scheme = target.Scheme - req.URL.Host = target.Host - req.URL.Path = path.Join(target.Path, req.URL.Path) - if targetQuery == "" || req.URL.RawQuery == "" { - req.URL.RawQuery = targetQuery + req.URL.RawQuery - } else { - req.URL.RawQuery = targetQuery + "&" + req.URL.RawQuery - } + proxy.Director(req) req.Host = target.Host } return &httputil.ReverseProxy{Director: director}