From 7e1ff4882e1b24ec5a7823a565354da548809bf4 Mon Sep 17 00:00:00 2001 From: Matt Reiferson Date: Thu, 16 Jul 2015 17:58:12 -0700 Subject: [PATCH] nsq*: cleanup HTTP routing (+httprouter) --- Godeps | 1 + internal/http_api/api_response.go | 51 +++--- internal/http_api/http_server.go | 2 +- nsqadmin/http.go | 226 ++++++++------------------- nsqadmin/nsqadmin.go | 2 +- nsqd/http.go | 249 +++++++++++------------------- nsqd/nsqd.go | 16 +- nsqlookupd/http.go | 183 ++++++++-------------- nsqlookupd/nsqlookupd.go | 4 +- 9 files changed, 248 insertions(+), 486 deletions(-) diff --git a/Godeps b/Godeps index 9064cb966..f1e24771c 100644 --- a/Godeps +++ b/Godeps @@ -7,3 +7,4 @@ github.com/mreiferson/go-options 2cf7eb1fdd83e2bb3375fef6fdadb04c3ad564da github.com/mreiferson/go-snappystream 028eae7ab5c4c9e2d1cb4c4ca1e53259bbe7e504 # v0.2.3 github.com/bitly/timer_metrics afad1794bb13e2a094720aeb27c088aa64564895 github.com/blang/semver 9bf7bff48b0388cb75991e58c6df7d13e982f1f2 +github.com/julienschmidt/httprouter 6aacfd5ab513e34f7e64ea9627ab9670371b34e7 diff --git a/internal/http_api/api_response.go b/internal/http_api/api_response.go index 73af8b859..1eb6b04bc 100644 --- a/internal/http_api/api_response.go +++ b/internal/http_api/api_response.go @@ -5,8 +5,12 @@ import ( "fmt" "net/http" "strconv" + + "github.com/julienschmidt/httprouter" ) +type APIHandler func(http.ResponseWriter, *http.Request, httprouter.Params) (interface{}, error) + type Err struct { Code int Text string @@ -24,40 +28,35 @@ func acceptVersion(req *http.Request) int { return 0 } -func RequirePOST(req *http.Request, f func() (interface{}, error)) func() (interface{}, error) { - if req.Method != "POST" { - return func() (interface{}, error) { - return nil, Err{405, "INVALID_REQUEST"} +func NegotiateVersion(f APIHandler) httprouter.Handle { + return func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + data, err := f(w, req, ps) + if err != nil { + if acceptVersion(req) == 1 { + RespondV1(w, err.(Err).Code, err) + } else { + // this handler always returns 500 for backwards compatibility + Respond(w, 500, err.Error(), nil) + } + return } - } - return f -} - -func NegotiateVersionWrapper(w http.ResponseWriter, req *http.Request, f func() (interface{}, error)) { - data, err := f() - if err != nil { if acceptVersion(req) == 1 { - RespondV1(w, err.(Err).Code, err) + RespondV1(w, 200, data) } else { - // this handler always returns 500 for backwards compatibility - Respond(w, 500, err.Error(), nil) + Respond(w, 200, "OK", data) } - return - } - if acceptVersion(req) == 1 { - RespondV1(w, 200, data) - } else { - Respond(w, 200, "OK", data) } } -func V1Wrapper(w http.ResponseWriter, req *http.Request, f func() (interface{}, error)) { - data, err := f() - if err != nil { - RespondV1(w, err.(Err).Code, err) - return +func V1(f APIHandler) httprouter.Handle { + return func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + data, err := f(w, req, ps) + if err != nil { + RespondV1(w, err.(Err).Code, err) + return + } + RespondV1(w, 200, data) } - RespondV1(w, 200, data) } func Respond(w http.ResponseWriter, statusCode int, statusTxt string, data interface{}) { diff --git a/internal/http_api/http_server.go b/internal/http_api/http_server.go index 3cbac8852..726f27c23 100644 --- a/internal/http_api/http_server.go +++ b/internal/http_api/http_server.go @@ -9,7 +9,7 @@ import ( "github.com/bitly/nsq/internal/app" ) -func Serve(listener net.Listener, handler http.Handler, l app.Logger, proto string) { +func Serve(listener net.Listener, handler http.Handler, proto string, l app.Logger) { l.Output(2, fmt.Sprintf("%s: listening on %s", proto, listener.Addr())) server := &http.Server{ diff --git a/nsqadmin/http.go b/nsqadmin/http.go index d59c1d7da..e8fc378c3 100644 --- a/nsqadmin/http.go +++ b/nsqadmin/http.go @@ -9,7 +9,6 @@ import ( "net/http" "net/http/httputil" "net/url" - "regexp" "strconv" "strings" "time" @@ -21,6 +20,7 @@ import ( "github.com/bitly/nsq/internal/version" "github.com/bitly/nsq/nsqadmin/templates" "github.com/blang/semver" + "github.com/julienschmidt/httprouter" ) var v1EndpointVersion semver.Version @@ -49,6 +49,7 @@ type httpServer struct { ctx *Context counters map[string]map[string]int64 proxy *httputil.ReverseProxy + router http.Handler } func NewHTTPServer(ctx *Context) *httpServer { @@ -73,92 +74,57 @@ func NewHTTPServer(ctx *Context) *httpServer { proxy = NewSingleHostReverseProxy(ctx.nsqadmin.graphiteURL, 20*time.Second) } - return &httpServer{ + router := httprouter.New() + router.HandleMethodNotAllowed = true + s := &httpServer{ ctx: ctx, counters: make(map[string]map[string]int64), proxy: proxy, - } + router: router, + } + + router.Handle("GET", "/ping", s.pingHandler) + + router.Handle("GET", "/", s.indexHandler) + router.Handle("GET", "/nodes", s.nodesHandler) + router.Handle("GET", "/node/:node", s.nodeHandler) + router.Handle("GET", "/topic/:topic", s.topicHandler) + router.Handle("GET", "/topic/:topic/:channel", s.channelHandler) + router.Handle("GET", "/static/:asset", s.embeddedAssetHandler) + router.Handle("GET", "/counter", s.counterHandler) + router.Handle("GET", "/counter/data", s.counterDataHandler) + router.Handle("GET", "/lookup", s.lookupHandler) + router.Handle("GET", "/graphite_data", s.graphiteDataHandler) + + router.Handle("POST", "/tombstone_topic_producer", s.tombstoneTopicProducerHandler) + router.Handle("POST", "/empty_topic", s.emptyTopicHandler) + router.Handle("POST", "/delete_topic", s.deleteTopicHandler) + router.Handle("POST", "/pause_topic", s.pauseTopicHandler) + router.Handle("POST", "/unpause_topic", s.pauseTopicHandler) + router.Handle("POST", "/empty_channel", s.emptyChannelHandler) + router.Handle("POST", "/delete_channel", s.deleteChannelHandler) + router.Handle("POST", "/pause_channel", s.pauseChannelHandler) + router.Handle("POST", "/unpause_channel", s.pauseChannelHandler) + router.Handle("POST", "/create_topic_channel", s.createTopicChannelHandler) + + if s.ctx.nsqadmin.opts.ProxyGraphite { + router.Handler("GET", "/render", s.proxy) + } + + return s } func (s *httpServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { - - if strings.HasPrefix(req.URL.Path, "/node/") { - s.nodeHandler(w, req) - return - } else if strings.HasPrefix(req.URL.Path, "/topic/") { - s.topicHandler(w, req) - return - } else if strings.HasPrefix(req.URL.Path, "/static/") { - if req.Method != "GET" { - s.ctx.nsqadmin.logf("ERROR: invalid %s to GET only method", req.Method) - http.Error(w, "INVALID_REQUEST", 500) - } else { - s.embeddedAssetHandler(w, req) - } - return - } - - switch req.URL.Path { - case "/": - s.indexHandler(w, req) - case "/ping": - s.pingHandler(w, req) - case "/nodes": - s.nodesHandler(w, req) - case "/tombstone_topic_producer": - s.tombstoneTopicProducerHandler(w, req) - case "/empty_topic": - s.emptyTopicHandler(w, req) - case "/delete_topic": - s.deleteTopicHandler(w, req) - case "/pause_topic": - s.pauseTopicHandler(w, req) - case "/unpause_topic": - s.pauseTopicHandler(w, req) - case "/delete_channel": - s.deleteChannelHandler(w, req) - case "/empty_channel": - s.emptyChannelHandler(w, req) - case "/pause_channel": - s.pauseChannelHandler(w, req) - case "/unpause_channel": - s.pauseChannelHandler(w, req) - case "/counter/data": - s.counterDataHandler(w, req) - case "/counter": - s.counterHandler(w, req) - case "/lookup": - s.lookupHandler(w, req) - case "/create_topic_channel": - s.createTopicChannelHandler(w, req) - case "/graphite_data": - s.graphiteDataHandler(w, req) - case "/render": - if !s.ctx.nsqadmin.opts.ProxyGraphite { - http.NotFound(w, req) - return - } - s.proxy.ServeHTTP(w, req) - default: - s.ctx.nsqadmin.logf("ERROR: 404 %s", req.URL.Path) - http.NotFound(w, req) - } + s.router.ServeHTTP(w, req) } -func (s *httpServer) pingHandler(w http.ResponseWriter, req *http.Request) { +func (s *httpServer) pingHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { w.Header().Set("Content-Length", "2") io.WriteString(w, "OK") } -func (s *httpServer) embeddedAssetHandler(w http.ResponseWriter, req *http.Request) { - var urlRegex = regexp.MustCompile(`^/static/(.+)$`) - matches := urlRegex.FindStringSubmatch(req.URL.Path) - if len(matches) == 0 { - s.ctx.nsqadmin.logf("ERROR: No embedded asset name for url - %s", req.URL.Path) - http.NotFound(w, req) - return - } - assetName := matches[1] +func (s *httpServer) embeddedAssetHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + assetName := ps.ByName("asset") s.ctx.nsqadmin.logf("INFO: Requesting embedded asset - %s", assetName) asset, error := templates.Asset(assetName) @@ -178,7 +144,7 @@ func (s *httpServer) embeddedAssetHandler(w http.ResponseWriter, req *http.Reque w.Write(asset) } -func (s *httpServer) indexHandler(w http.ResponseWriter, req *http.Request) { +func (s *httpServer) indexHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { reqParams, err := http_api.NewReqParams(req) if err != nil { s.ctx.nsqadmin.logf("ERROR: failed to parse request params - %s", err) @@ -211,28 +177,8 @@ func (s *httpServer) indexHandler(w http.ResponseWriter, req *http.Request) { } } -func (s *httpServer) topicHandler(w http.ResponseWriter, req *http.Request) { - var urlRegex = regexp.MustCompile(`^/topic/(.*)$`) - matches := urlRegex.FindStringSubmatch(req.URL.Path) - if len(matches) == 0 { - http.Error(w, "INVALID_TOPIC", 500) - return - } - parts := strings.Split(matches[1], "/") - topicName := parts[0] - if !protocol.IsValidTopicName(topicName) { - http.Error(w, "INVALID_TOPIC", 500) - return - } - if len(parts) == 2 { - channelName := parts[1] - if !protocol.IsValidChannelName(channelName) { - http.Error(w, "INVALID_CHANNEL", 500) - } else { - s.channelHandler(w, req, topicName, channelName) - } - return - } +func (s *httpServer) topicHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + topicName := ps.ByName("topic") reqParams, err := http_api.NewReqParams(req) if err != nil { @@ -287,7 +233,10 @@ func (s *httpServer) topicHandler(w http.ResponseWriter, req *http.Request) { } } -func (s *httpServer) channelHandler(w http.ResponseWriter, req *http.Request, topicName string, channelName string) { +func (s *httpServer) channelHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + topicName := ps.ByName("topic") + channelName := ps.ByName("channel") + reqParams, err := http_api.NewReqParams(req) if err != nil { s.ctx.nsqadmin.logf("ERROR: failed to parse request params - %s", err) @@ -342,7 +291,7 @@ func (s *httpServer) channelHandler(w http.ResponseWriter, req *http.Request, to } } -func (s *httpServer) lookupHandler(w http.ResponseWriter, req *http.Request) { +func (s *httpServer) lookupHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { reqParams, err := http_api.NewReqParams(req) if err != nil { s.ctx.nsqadmin.logf("ERROR: failed to parse request params - %s", err) @@ -381,12 +330,7 @@ func (s *httpServer) lookupHandler(w http.ResponseWriter, req *http.Request) { } } -func (s *httpServer) createTopicChannelHandler(w http.ResponseWriter, req *http.Request) { - if req.Method != "POST" { - s.ctx.nsqadmin.logf("ERROR: invalid %s to POST only method", req.Method) - http.Error(w, "INVALID_REQUEST", 500) - return - } +func (s *httpServer) createTopicChannelHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { reqParams := &http_api.PostParams{req} topicName, err := reqParams.Get("topic") @@ -460,12 +404,7 @@ func (s *httpServer) createTopicChannelHandler(w http.ResponseWriter, req *http. http.Redirect(w, req, "/lookup", 302) } -func (s *httpServer) tombstoneTopicProducerHandler(w http.ResponseWriter, req *http.Request) { - if req.Method != "POST" { - s.ctx.nsqadmin.logf("ERROR: invalid %s to POST only method", req.Method) - http.Error(w, "INVALID_REQUEST", 500) - return - } +func (s *httpServer) tombstoneTopicProducerHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { reqParams := &http_api.PostParams{req} topicName, err := reqParams.Get("topic") @@ -531,12 +470,7 @@ func (s *httpServer) tombstoneTopicProducerHandler(w http.ResponseWriter, req *h http.Redirect(w, req, rd, 302) } -func (s *httpServer) deleteTopicHandler(w http.ResponseWriter, req *http.Request) { - if req.Method != "POST" { - s.ctx.nsqadmin.logf("ERROR: invalid %s to POST only method", req.Method) - http.Error(w, "INVALID_REQUEST", 500) - return - } +func (s *httpServer) deleteTopicHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { reqParams := &http_api.PostParams{req} topicName, err := reqParams.Get("topic") @@ -586,12 +520,7 @@ func (s *httpServer) deleteTopicHandler(w http.ResponseWriter, req *http.Request http.Redirect(w, req, rd, 302) } -func (s *httpServer) deleteChannelHandler(w http.ResponseWriter, req *http.Request) { - if req.Method != "POST" { - s.ctx.nsqadmin.logf("ERROR: invalid %s to POST only method", req.Method) - http.Error(w, "INVALID_REQUEST", 500) - return - } +func (s *httpServer) deleteChannelHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { reqParams := &http_api.PostParams{req} topicName, channelName, err := http_api.GetTopicChannelArgs(reqParams) @@ -642,12 +571,7 @@ func (s *httpServer) deleteChannelHandler(w http.ResponseWriter, req *http.Reque http.Redirect(w, req, rd, 302) } -func (s *httpServer) emptyTopicHandler(w http.ResponseWriter, req *http.Request) { - if req.Method != "POST" { - s.ctx.nsqadmin.logf("ERROR: invalid %s to POST only method", req.Method) - http.Error(w, "INVALID_REQUEST", 500) - return - } +func (s *httpServer) emptyTopicHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { reqParams := &http_api.PostParams{req} topicName, err := reqParams.Get("topic") @@ -670,12 +594,7 @@ func (s *httpServer) emptyTopicHandler(w http.ResponseWriter, req *http.Request) http.Redirect(w, req, fmt.Sprintf("/topic/%s", url.QueryEscape(topicName)), 302) } -func (s *httpServer) pauseTopicHandler(w http.ResponseWriter, req *http.Request) { - if req.Method != "POST" { - s.ctx.nsqadmin.logf("ERROR: invalid %s to POST only method", req.Method) - http.Error(w, "INVALID_REQUEST", 500) - return - } +func (s *httpServer) pauseTopicHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { reqParams := &http_api.PostParams{req} topicName, err := reqParams.Get("topic") @@ -703,12 +622,7 @@ func (s *httpServer) pauseTopicHandler(w http.ResponseWriter, req *http.Request) http.Redirect(w, req, fmt.Sprintf("/topic/%s", url.QueryEscape(topicName)), 302) } -func (s *httpServer) emptyChannelHandler(w http.ResponseWriter, req *http.Request) { - if req.Method != "POST" { - s.ctx.nsqadmin.logf("ERROR: invalid %s to POST only method", req.Method) - http.Error(w, "INVALID_REQUEST", 500) - return - } +func (s *httpServer) emptyChannelHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { reqParams := &http_api.PostParams{req} topicName, channelName, err := http_api.GetTopicChannelArgs(reqParams) @@ -732,12 +646,7 @@ func (s *httpServer) emptyChannelHandler(w http.ResponseWriter, req *http.Reques url.QueryEscape(topicName), url.QueryEscape(channelName)), 302) } -func (s *httpServer) pauseChannelHandler(w http.ResponseWriter, req *http.Request) { - if req.Method != "POST" { - s.ctx.nsqadmin.logf("ERROR: invalid %s to POST only method", req.Method) - http.Error(w, "INVALID_REQUEST", 500) - return - } +func (s *httpServer) pauseChannelHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { reqParams := &http_api.PostParams{req} topicName, channelName, err := http_api.GetTopicChannelArgs(reqParams) @@ -765,7 +674,9 @@ func (s *httpServer) pauseChannelHandler(w http.ResponseWriter, req *http.Reques http.Redirect(w, req, fmt.Sprintf("/topic/%s/%s", url.QueryEscape(topicName), url.QueryEscape(channelName)), 302) } -func (s *httpServer) nodeHandler(w http.ResponseWriter, req *http.Request) { +func (s *httpServer) nodeHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + node := ps.ByName("node") + reqParams, err := http_api.NewReqParams(req) if err != nil { s.ctx.nsqadmin.logf("ERROR: failed to parse request params - %s", err) @@ -773,15 +684,6 @@ func (s *httpServer) nodeHandler(w http.ResponseWriter, req *http.Request) { return } - var urlRegex = regexp.MustCompile(`^/node/(.*)$`) - matches := urlRegex.FindStringSubmatch(req.URL.Path) - if len(matches) == 0 { - http.Error(w, "INVALID_NODE", 500) - return - } - parts := strings.Split(matches[1], "/") - node := parts[0] - found := false for _, n := range s.ctx.nsqadmin.opts.NSQDHTTPAddresses { if node == n { @@ -838,7 +740,7 @@ func (s *httpServer) nodeHandler(w http.ResponseWriter, req *http.Request) { } } -func (s *httpServer) nodesHandler(w http.ResponseWriter, req *http.Request) { +func (s *httpServer) nodesHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { reqParams, err := http_api.NewReqParams(req) if err != nil { s.ctx.nsqadmin.logf("ERROR: failed to parse request params - %s", err) @@ -877,7 +779,7 @@ func (c counterTarget) Host() string { return "*" } -func (s *httpServer) counterHandler(w http.ResponseWriter, req *http.Request) { +func (s *httpServer) counterHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { reqParams, err := http_api.NewReqParams(req) if err != nil { s.ctx.nsqadmin.logf("ERROR: failed to parse request params - %s", err) @@ -906,7 +808,7 @@ func (s *httpServer) counterHandler(w http.ResponseWriter, req *http.Request) { // The initial request is the number of messages processed since each nsqd started up. // Subsequent requsts pass that ID and get an updated delta based on each individual channel/nsqd message count // That ID must be re-requested or it will be expired. -func (s *httpServer) counterDataHandler(w http.ResponseWriter, req *http.Request) { +func (s *httpServer) counterDataHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { reqParams, err := http_api.NewReqParams(req) if err != nil { s.ctx.nsqadmin.logf("ERROR: failed to parse request params - %s", err) @@ -957,7 +859,7 @@ func (s *httpServer) counterDataHandler(w http.ResponseWriter, req *http.Request http_api.Respond(w, 200, "OK", data) } -func (s *httpServer) graphiteDataHandler(w http.ResponseWriter, req *http.Request) { +func (s *httpServer) graphiteDataHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { reqParams, err := http_api.NewReqParams(req) if err != nil { s.ctx.nsqadmin.logf("ERROR: failed to parse request params - %s", err) diff --git a/nsqadmin/nsqadmin.go b/nsqadmin/nsqadmin.go index 6cec943fa..c06e2a366 100644 --- a/nsqadmin/nsqadmin.go +++ b/nsqadmin/nsqadmin.go @@ -115,7 +115,7 @@ func (n *NSQAdmin) Main() { n.Unlock() httpServer := NewHTTPServer(&Context{n}) n.waitGroup.Wrap(func() { - http_api.Serve(n.httpListener, httpServer, n.opts.Logger, "HTTP") + http_api.Serve(n.httpListener, httpServer, "HTTP", n.opts.Logger) }) n.waitGroup.Wrap(func() { n.handleAdminActions() }) } diff --git a/nsqd/http.go b/nsqd/http.go index 8efaba01d..9a92dbaa3 100644 --- a/nsqd/http.go +++ b/nsqd/http.go @@ -9,10 +9,9 @@ import ( "io/ioutil" "net" "net/http" - httpprof "net/http/pprof" + "net/http/pprof" "net/url" "reflect" - "regexp" "strconv" "strings" "time" @@ -20,12 +19,82 @@ import ( "github.com/bitly/nsq/internal/http_api" "github.com/bitly/nsq/internal/protocol" "github.com/bitly/nsq/internal/version" + "github.com/julienschmidt/httprouter" ) type httpServer struct { ctx *context tlsEnabled bool tlsRequired bool + router http.Handler +} + +func newHTTPServer(ctx *context, tlsEnabled bool, tlsRequired bool) *httpServer { + router := httprouter.New() + router.HandleMethodNotAllowed = true + s := &httpServer{ + ctx: ctx, + tlsEnabled: tlsEnabled, + tlsRequired: tlsRequired, + router: router, + } + + // v1 negotiate + router.Handle("POST", "/pub", http_api.NegotiateVersion(s.doPUB)) + router.Handle("POST", "/mpub", http_api.NegotiateVersion(s.doMPUB)) + router.Handle("GET", "/stats", http_api.NegotiateVersion(s.doStats)) + router.Handle("GET", "/ping", s.pingHandler) + + // only v1 + router.Handle("POST", "/topic/create", http_api.V1(s.doCreateTopic)) + router.Handle("POST", "/topic/delete", http_api.V1(s.doDeleteTopic)) + router.Handle("POST", "/topic/empty", http_api.V1(s.doEmptyTopic)) + router.Handle("POST", "/topic/pause", http_api.V1(s.doPauseTopic)) + router.Handle("POST", "/topic/unpause", http_api.V1(s.doPauseTopic)) + router.Handle("POST", "/channel/create", http_api.V1(s.doCreateChannel)) + router.Handle("POST", "/channel/delete", http_api.V1(s.doDeleteChannel)) + router.Handle("POST", "/channel/empty", http_api.V1(s.doEmptyChannel)) + router.Handle("POST", "/channel/pause", http_api.V1(s.doPauseChannel)) + router.Handle("POST", "/channel/unpause", http_api.V1(s.doPauseChannel)) + router.Handle("GET", "/config/:opt", http_api.V1(s.doConfig)) + router.Handle("PUT", "/config/:opt", http_api.V1(s.doConfig)) + + // deprecated, v1 negotiate + router.Handle("POST", "/put", http_api.NegotiateVersion(s.doPUB)) + router.Handle("POST", "/mput", http_api.NegotiateVersion(s.doMPUB)) + router.Handle("GET", "/info", http_api.NegotiateVersion(s.doInfo)) + router.Handle("POST", "/create_topic", http_api.NegotiateVersion(s.doCreateTopic)) + router.Handle("POST", "/delete_topic", http_api.NegotiateVersion(s.doDeleteTopic)) + router.Handle("POST", "/empty_topic", http_api.NegotiateVersion(s.doEmptyTopic)) + router.Handle("POST", "/pause_topic", http_api.NegotiateVersion(s.doPauseTopic)) + router.Handle("POST", "/unpause_topic", http_api.NegotiateVersion(s.doPauseTopic)) + router.Handle("POST", "/create_channel", http_api.NegotiateVersion(s.doCreateChannel)) + router.Handle("POST", "/delete_channel", http_api.NegotiateVersion(s.doDeleteChannel)) + router.Handle("POST", "/empty_channel", http_api.NegotiateVersion(s.doEmptyChannel)) + router.Handle("POST", "/pause_channel", http_api.NegotiateVersion(s.doPauseChannel)) + router.Handle("POST", "/unpause_channel", http_api.NegotiateVersion(s.doPauseChannel)) + router.Handle("GET", "/create_topic", http_api.NegotiateVersion(s.doCreateTopic)) + router.Handle("GET", "/delete_topic", http_api.NegotiateVersion(s.doDeleteTopic)) + router.Handle("GET", "/empty_topic", http_api.NegotiateVersion(s.doEmptyTopic)) + router.Handle("GET", "/pause_topic", http_api.NegotiateVersion(s.doPauseTopic)) + router.Handle("GET", "/unpause_topic", http_api.NegotiateVersion(s.doPauseTopic)) + router.Handle("GET", "/create_channel", http_api.NegotiateVersion(s.doCreateChannel)) + router.Handle("GET", "/delete_channel", http_api.NegotiateVersion(s.doDeleteChannel)) + router.Handle("GET", "/empty_channel", http_api.NegotiateVersion(s.doEmptyChannel)) + router.Handle("GET", "/pause_channel", http_api.NegotiateVersion(s.doPauseChannel)) + router.Handle("GET", "/unpause_channel", http_api.NegotiateVersion(s.doPauseChannel)) + + // debug + router.HandlerFunc("GET", "/debug/pprof", pprof.Index) + router.HandlerFunc("GET", "/debug/pprof/cmdline", pprof.Cmdline) + router.HandlerFunc("GET", "/debug/pprof/symbol", pprof.Symbol) + router.HandlerFunc("GET", "/debug/pprof/profile", pprof.Profile) + router.Handler("GET", "/debug/pprof/heap", pprof.Handler("heap")) + router.Handler("GET", "/debug/pprof/goroutine", pprof.Handler("goroutine")) + router.Handler("GET", "/debug/pprof/block", pprof.Handler("block")) + router.Handler("GET", "/debug/pprof/threadcreate", pprof.Handler("threadcreate")) + + return s } func (s *httpServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { @@ -33,149 +102,10 @@ func (s *httpServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { http_api.Respond(w, 403, "TLS_REQUIRED", nil) return } - - err := s.v1Router(w, req) - if err == nil { - return - } - - err = s.deprecatedRouter(w, req) - if err == nil { - return - } - - err = s.debugRouter(w, req) - if err != nil { - s.ctx.nsqd.logf("ERROR: %s", err) - http_api.Respond(w, 404, "NOT_FOUND", nil) - } + s.router.ServeHTTP(w, req) } -func (s *httpServer) debugRouter(w http.ResponseWriter, req *http.Request) error { - switch req.URL.Path { - case "/debug/pprof": - httpprof.Index(w, req) - case "/debug/pprof/cmdline": - httpprof.Cmdline(w, req) - case "/debug/pprof/symbol": - httpprof.Symbol(w, req) - case "/debug/pprof/heap": - httpprof.Handler("heap").ServeHTTP(w, req) - case "/debug/pprof/goroutine": - httpprof.Handler("goroutine").ServeHTTP(w, req) - case "/debug/pprof/profile": - httpprof.Profile(w, req) - case "/debug/pprof/block": - httpprof.Handler("block").ServeHTTP(w, req) - case "/debug/pprof/threadcreate": - httpprof.Handler("threadcreate").ServeHTTP(w, req) - default: - return fmt.Errorf("404 %s", req.URL.Path) - } - return nil -} - -func (s *httpServer) v1Router(w http.ResponseWriter, req *http.Request) error { - switch req.URL.Path { - case "/pub": - http_api.NegotiateVersionWrapper(w, req, http_api.RequirePOST(req, - func() (interface{}, error) { return s.doPUB(req) })) - case "/mpub": - http_api.NegotiateVersionWrapper(w, req, http_api.RequirePOST(req, - func() (interface{}, error) { return s.doMPUB(req) })) - - case "/stats": - http_api.NegotiateVersionWrapper(w, req, - func() (interface{}, error) { return s.doStats(req) }) - case "/ping": - s.pingHandler(w, req) - - case "/topic/create": - http_api.V1Wrapper(w, req, http_api.RequirePOST(req, - func() (interface{}, error) { return s.doCreateTopic(req) })) - case "/topic/delete": - http_api.V1Wrapper(w, req, http_api.RequirePOST(req, - func() (interface{}, error) { return s.doDeleteTopic(req) })) - case "/topic/empty": - http_api.V1Wrapper(w, req, http_api.RequirePOST(req, - func() (interface{}, error) { return s.doEmptyTopic(req) })) - case "/topic/pause": - fallthrough - case "/topic/unpause": - http_api.V1Wrapper(w, req, http_api.RequirePOST(req, - func() (interface{}, error) { return s.doPauseTopic(req) })) - - case "/channel/create": - http_api.V1Wrapper(w, req, http_api.RequirePOST(req, - func() (interface{}, error) { return s.doCreateChannel(req) })) - case "/channel/delete": - http_api.V1Wrapper(w, req, http_api.RequirePOST(req, - func() (interface{}, error) { return s.doDeleteChannel(req) })) - case "/channel/empty": - http_api.V1Wrapper(w, req, http_api.RequirePOST(req, - func() (interface{}, error) { return s.doEmptyChannel(req) })) - case "/channel/pause": - fallthrough - case "/channel/unpause": - http_api.V1Wrapper(w, req, http_api.RequirePOST(req, - func() (interface{}, error) { return s.doPauseChannel(req) })) - - default: - if strings.HasPrefix(req.URL.Path, "/config/") { - http_api.V1Wrapper(w, req, func() (interface{}, error) { return s.doConfig(req) }) - return nil - } - return fmt.Errorf("404 %s", req.URL.Path) - } - return nil -} - -func (s *httpServer) deprecatedRouter(w http.ResponseWriter, req *http.Request) error { - switch req.URL.Path { - case "/put": - http_api.NegotiateVersionWrapper(w, req, http_api.RequirePOST(req, - func() (interface{}, error) { return s.doPUB(req) })) - case "/mput": - http_api.NegotiateVersionWrapper(w, req, http_api.RequirePOST(req, - func() (interface{}, error) { return s.doMPUB(req) })) - case "/info": - http_api.NegotiateVersionWrapper(w, req, - func() (interface{}, error) { return s.doInfo(req) }) - case "/empty_topic": - http_api.NegotiateVersionWrapper(w, req, - func() (interface{}, error) { return s.doEmptyTopic(req) }) - case "/delete_topic": - http_api.NegotiateVersionWrapper(w, req, - func() (interface{}, error) { return s.doDeleteTopic(req) }) - case "/pause_topic": - fallthrough - case "/unpause_topic": - http_api.NegotiateVersionWrapper(w, req, - func() (interface{}, error) { return s.doPauseTopic(req) }) - case "/empty_channel": - http_api.NegotiateVersionWrapper(w, req, - func() (interface{}, error) { return s.doEmptyChannel(req) }) - case "/delete_channel": - http_api.NegotiateVersionWrapper(w, req, - func() (interface{}, error) { return s.doDeleteChannel(req) }) - case "/pause_channel": - fallthrough - case "/unpause_channel": - http_api.NegotiateVersionWrapper(w, req, - func() (interface{}, error) { return s.doPauseChannel(req) }) - case "/create_topic": - http_api.NegotiateVersionWrapper(w, req, - func() (interface{}, error) { return s.doCreateTopic(req) }) - case "/create_channel": - http_api.NegotiateVersionWrapper(w, req, - func() (interface{}, error) { return s.doCreateChannel(req) }) - default: - return fmt.Errorf("404 %s", req.URL.Path) - } - return nil -} - -func (s *httpServer) pingHandler(w http.ResponseWriter, req *http.Request) { +func (s *httpServer) pingHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { health := s.ctx.nsqd.GetHealth() code := 200 if !s.ctx.nsqd.IsHealthy() { @@ -186,7 +116,7 @@ func (s *httpServer) pingHandler(w http.ResponseWriter, req *http.Request) { io.WriteString(w, health) } -func (s *httpServer) doInfo(req *http.Request) (interface{}, error) { +func (s *httpServer) doInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { return struct { Version string `json:"version"` }{ @@ -234,7 +164,7 @@ func (s *httpServer) getTopicFromQuery(req *http.Request) (url.Values, *Topic, e return reqParams, s.ctx.nsqd.GetTopic(topicName), nil } -func (s *httpServer) doPUB(req *http.Request) (interface{}, error) { +func (s *httpServer) doPUB(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { // TODO: one day I'd really like to just error on chunked requests // to be able to fail "too big" requests before we even read @@ -283,7 +213,7 @@ func (s *httpServer) doPUB(req *http.Request) (interface{}, error) { return "OK", nil } -func (s *httpServer) doMPUB(req *http.Request) (interface{}, error) { +func (s *httpServer) doMPUB(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { var msgs []*Message var exit bool @@ -353,12 +283,12 @@ func (s *httpServer) doMPUB(req *http.Request) (interface{}, error) { return "OK", nil } -func (s *httpServer) doCreateTopic(req *http.Request) (interface{}, error) { +func (s *httpServer) doCreateTopic(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { _, _, err := s.getTopicFromQuery(req) return nil, err } -func (s *httpServer) doEmptyTopic(req *http.Request) (interface{}, error) { +func (s *httpServer) doEmptyTopic(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { reqParams, err := http_api.NewReqParams(req) if err != nil { s.ctx.nsqd.logf("ERROR: failed to parse request params - %s", err) @@ -387,7 +317,7 @@ func (s *httpServer) doEmptyTopic(req *http.Request) (interface{}, error) { return nil, nil } -func (s *httpServer) doDeleteTopic(req *http.Request) (interface{}, error) { +func (s *httpServer) doDeleteTopic(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { reqParams, err := http_api.NewReqParams(req) if err != nil { s.ctx.nsqd.logf("ERROR: failed to parse request params - %s", err) @@ -407,7 +337,7 @@ func (s *httpServer) doDeleteTopic(req *http.Request) (interface{}, error) { return nil, nil } -func (s *httpServer) doPauseTopic(req *http.Request) (interface{}, error) { +func (s *httpServer) doPauseTopic(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { reqParams, err := http_api.NewReqParams(req) if err != nil { s.ctx.nsqd.logf("ERROR: failed to parse request params - %s", err) @@ -442,7 +372,7 @@ func (s *httpServer) doPauseTopic(req *http.Request) (interface{}, error) { return nil, nil } -func (s *httpServer) doCreateChannel(req *http.Request) (interface{}, error) { +func (s *httpServer) doCreateChannel(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { _, topic, channelName, err := s.getExistingTopicFromQuery(req) if err != nil { return nil, err @@ -451,7 +381,7 @@ func (s *httpServer) doCreateChannel(req *http.Request) (interface{}, error) { return nil, nil } -func (s *httpServer) doEmptyChannel(req *http.Request) (interface{}, error) { +func (s *httpServer) doEmptyChannel(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { _, topic, channelName, err := s.getExistingTopicFromQuery(req) if err != nil { return nil, err @@ -470,7 +400,7 @@ func (s *httpServer) doEmptyChannel(req *http.Request) (interface{}, error) { return nil, nil } -func (s *httpServer) doDeleteChannel(req *http.Request) (interface{}, error) { +func (s *httpServer) doDeleteChannel(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { _, topic, channelName, err := s.getExistingTopicFromQuery(req) if err != nil { return nil, err @@ -484,7 +414,7 @@ func (s *httpServer) doDeleteChannel(req *http.Request) (interface{}, error) { return nil, nil } -func (s *httpServer) doPauseChannel(req *http.Request) (interface{}, error) { +func (s *httpServer) doPauseChannel(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { _, topic, channelName, err := s.getExistingTopicFromQuery(req) if err != nil { return nil, err @@ -513,7 +443,7 @@ func (s *httpServer) doPauseChannel(req *http.Request) (interface{}, error) { return nil, nil } -func (s *httpServer) doStats(req *http.Request) (interface{}, error) { +func (s *httpServer) doStats(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { reqParams, err := http_api.NewReqParams(req) if err != nil { s.ctx.nsqd.logf("ERROR: failed to parse request params - %s", err) @@ -604,13 +534,8 @@ func (s *httpServer) printStats(stats []TopicStats, health string, startTime tim return buf.Bytes() } -func (s *httpServer) doConfig(req *http.Request) (interface{}, error) { - var urlRegex = regexp.MustCompile(`^/config/([a-z0-9_]+)$`) - matches := urlRegex.FindStringSubmatch(req.URL.Path) - if len(matches) == 0 { - return nil, http_api.Err{400, "INVALID_OPTION"} - } - opt := matches[1] +func (s *httpServer) doConfig(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { + opt := ps.ByName("opt") if req.Method == "PUT" { // add 1 so that it's greater than our max when we test for it diff --git a/nsqd/nsqd.go b/nsqd/nsqd.go index ddcd1a1c8..9133e66b9 100644 --- a/nsqd/nsqd.go +++ b/nsqd/nsqd.go @@ -239,13 +239,9 @@ func (n *NSQD) Main() { n.Lock() n.httpsListener = httpsListener n.Unlock() - httpsServer := &httpServer{ - ctx: ctx, - tlsEnabled: true, - tlsRequired: true, - } + httpsServer := newHTTPServer(ctx, true, true) n.waitGroup.Wrap(func() { - http_api.Serve(n.httpsListener, httpsServer, n.getOpts().Logger, "HTTPS") + http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.getOpts().Logger) }) } httpListener, err = net.Listen("tcp", n.getOpts().HTTPAddress) @@ -256,13 +252,9 @@ func (n *NSQD) Main() { n.Lock() n.httpListener = httpListener n.Unlock() - httpServer := &httpServer{ - ctx: ctx, - tlsEnabled: false, - tlsRequired: n.getOpts().TLSRequired == TLSRequired, - } + httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired) n.waitGroup.Wrap(func() { - http_api.Serve(n.httpListener, httpServer, n.getOpts().Logger, "HTTP") + http_api.Serve(n.httpListener, httpServer, "HTTP", n.getOpts().Logger) }) n.waitGroup.Wrap(func() { n.queueScanLoop() }) diff --git a/nsqlookupd/http.go b/nsqlookupd/http.go index a1259111a..f3f5e9d2b 100644 --- a/nsqlookupd/http.go +++ b/nsqlookupd/http.go @@ -4,136 +4,79 @@ import ( "fmt" "io" "net/http" - httpprof "net/http/pprof" + "net/http/pprof" "sync/atomic" "github.com/bitly/nsq/internal/http_api" "github.com/bitly/nsq/internal/protocol" "github.com/bitly/nsq/internal/version" + "github.com/julienschmidt/httprouter" ) type httpServer struct { - ctx *Context + ctx *Context + router http.Handler } -func (s *httpServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { - err := s.v1Router(w, req) - if err == nil { - return - } - - err = s.deprecatedRouter(w, req) - if err == nil { - return +func newHTTPServer(ctx *Context) *httpServer { + router := httprouter.New() + router.HandleMethodNotAllowed = true + s := &httpServer{ + ctx: ctx, + router: router, } - err = s.debugRouter(w, req) - if err != nil { - s.ctx.nsqlookupd.logf("ERROR: %s", err) - http_api.Respond(w, 404, "NOT_FOUND", nil) - } + // v1 negotiate + router.Handle("GET", "/ping", s.pingHandler) + router.Handle("GET", "/debug", http_api.NegotiateVersion(s.doDebug)) + router.Handle("GET", "/lookup", http_api.NegotiateVersion(s.doLookup)) + router.Handle("GET", "/topics", http_api.NegotiateVersion(s.doTopics)) + router.Handle("GET", "/channels", http_api.NegotiateVersion(s.doChannels)) + router.Handle("GET", "/nodes", http_api.NegotiateVersion(s.doNodes)) + + // only v1 + router.Handle("POST", "/topic/create", http_api.V1(s.doCreateTopic)) + router.Handle("POST", "/topic/delete", http_api.V1(s.doDeleteTopic)) + router.Handle("POST", "/channel/create", http_api.V1(s.doCreateChannel)) + router.Handle("POST", "/channel/delete", http_api.V1(s.doDeleteChannel)) + router.Handle("POST", "/topic/tombstone", http_api.V1(s.doTombstoneTopicProducer)) + + // deprecated, v1 negotiate + router.Handle("GET", "/info", http_api.NegotiateVersion(s.doInfo)) + router.Handle("POST", "/create_topic", http_api.NegotiateVersion(s.doCreateTopic)) + router.Handle("POST", "/delete_topic", http_api.NegotiateVersion(s.doDeleteTopic)) + router.Handle("POST", "/create_channel", http_api.NegotiateVersion(s.doCreateChannel)) + router.Handle("POST", "/delete_channel", http_api.NegotiateVersion(s.doDeleteChannel)) + router.Handle("POST", "/tombstone_topic_producer", http_api.NegotiateVersion(s.doTombstoneTopicProducer)) + router.Handle("GET", "/create_topic", http_api.NegotiateVersion(s.doCreateTopic)) + router.Handle("GET", "/delete_topic", http_api.NegotiateVersion(s.doDeleteTopic)) + router.Handle("GET", "/create_channel", http_api.NegotiateVersion(s.doCreateChannel)) + router.Handle("GET", "/delete_channel", http_api.NegotiateVersion(s.doDeleteChannel)) + router.Handle("GET", "/tombstone_topic_producer", http_api.NegotiateVersion(s.doTombstoneTopicProducer)) + + // debug + router.HandlerFunc("GET", "/debug/pprof", pprof.Index) + router.HandlerFunc("GET", "/debug/pprof/cmdline", pprof.Cmdline) + router.HandlerFunc("GET", "/debug/pprof/symbol", pprof.Symbol) + router.HandlerFunc("GET", "/debug/pprof/profile", pprof.Profile) + router.Handler("GET", "/debug/pprof/heap", pprof.Handler("heap")) + router.Handler("GET", "/debug/pprof/goroutine", pprof.Handler("goroutine")) + router.Handler("GET", "/debug/pprof/block", pprof.Handler("block")) + router.Handler("GET", "/debug/pprof/threadcreate", pprof.Handler("threadcreate")) + + return s } -func (s *httpServer) debugRouter(w http.ResponseWriter, req *http.Request) error { - switch req.URL.Path { - case "/debug": - http_api.NegotiateVersionWrapper(w, req, - func() (interface{}, error) { return s.doDebug(req) }) - case "/debug/pprof": - httpprof.Index(w, req) - case "/debug/pprof/cmdline": - httpprof.Cmdline(w, req) - case "/debug/pprof/symbol": - httpprof.Symbol(w, req) - case "/debug/pprof/heap": - httpprof.Handler("heap").ServeHTTP(w, req) - case "/debug/pprof/goroutine": - httpprof.Handler("goroutine").ServeHTTP(w, req) - case "/debug/pprof/profile": - httpprof.Profile(w, req) - case "/debug/pprof/block": - httpprof.Handler("block").ServeHTTP(w, req) - case "/debug/pprof/threadcreate": - httpprof.Handler("threadcreate").ServeHTTP(w, req) - default: - return fmt.Errorf("404 %s", req.URL.Path) - } - return nil -} - -func (s *httpServer) v1Router(w http.ResponseWriter, req *http.Request) error { - switch req.URL.Path { - case "/ping": - s.pingHandler(w, req) - - case "/lookup": - http_api.NegotiateVersionWrapper(w, req, - func() (interface{}, error) { return s.doLookup(req) }) - case "/topics": - http_api.NegotiateVersionWrapper(w, req, - func() (interface{}, error) { return s.doTopics(req) }) - case "/channels": - http_api.NegotiateVersionWrapper(w, req, - func() (interface{}, error) { return s.doChannels(req) }) - case "/nodes": - http_api.NegotiateVersionWrapper(w, req, - func() (interface{}, error) { return s.doNodes(req) }) - - case "/topic/create": - http_api.V1Wrapper(w, req, http_api.RequirePOST(req, - func() (interface{}, error) { return s.doCreateTopic(req) })) - case "/topic/delete": - http_api.V1Wrapper(w, req, http_api.RequirePOST(req, - func() (interface{}, error) { return s.doDeleteTopic(req) })) - case "/topic/tombstone": - http_api.V1Wrapper(w, req, http_api.RequirePOST(req, - func() (interface{}, error) { return s.doTombstoneTopicProducer(req) })) - - case "/channel/create": - http_api.V1Wrapper(w, req, http_api.RequirePOST(req, - func() (interface{}, error) { return s.doCreateChannel(req) })) - case "/channel/delete": - http_api.V1Wrapper(w, req, http_api.RequirePOST(req, - func() (interface{}, error) { return s.doDeleteChannel(req) })) - - default: - return fmt.Errorf("404 %s", req.URL.Path) - } - return nil -} - -func (s *httpServer) deprecatedRouter(w http.ResponseWriter, req *http.Request) error { - switch req.URL.Path { - case "/info": - http_api.NegotiateVersionWrapper(w, req, - func() (interface{}, error) { return s.doInfo(req) }) - case "/delete_topic": - http_api.NegotiateVersionWrapper(w, req, - func() (interface{}, error) { return s.doDeleteTopic(req) }) - case "/delete_channel": - http_api.NegotiateVersionWrapper(w, req, - func() (interface{}, error) { return s.doDeleteChannel(req) }) - case "/tombstone_topic_producer": - http_api.NegotiateVersionWrapper(w, req, - func() (interface{}, error) { return s.doTombstoneTopicProducer(req) }) - case "/create_topic": - http_api.NegotiateVersionWrapper(w, req, - func() (interface{}, error) { return s.doCreateTopic(req) }) - case "/create_channel": - http_api.NegotiateVersionWrapper(w, req, - func() (interface{}, error) { return s.doCreateChannel(req) }) - default: - return fmt.Errorf("404 %s", req.URL.Path) - } - return nil +func (s *httpServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { + s.router.ServeHTTP(w, req) } -func (s *httpServer) pingHandler(w http.ResponseWriter, req *http.Request) { +func (s *httpServer) pingHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { w.Header().Set("Content-Length", "2") io.WriteString(w, "OK") } -func (s *httpServer) doInfo(req *http.Request) (interface{}, error) { +func (s *httpServer) doInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { return struct { Version string `json:"version"` }{ @@ -141,14 +84,14 @@ func (s *httpServer) doInfo(req *http.Request) (interface{}, error) { }, nil } -func (s *httpServer) doTopics(req *http.Request) (interface{}, error) { +func (s *httpServer) doTopics(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { topics := s.ctx.nsqlookupd.DB.FindRegistrations("topic", "*", "").Keys() return map[string]interface{}{ "topics": topics, }, nil } -func (s *httpServer) doChannels(req *http.Request) (interface{}, error) { +func (s *httpServer) doChannels(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { reqParams, err := http_api.NewReqParams(req) if err != nil { return nil, http_api.Err{400, "INVALID_REQUEST"} @@ -165,7 +108,7 @@ func (s *httpServer) doChannels(req *http.Request) (interface{}, error) { }, nil } -func (s *httpServer) doLookup(req *http.Request) (interface{}, error) { +func (s *httpServer) doLookup(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { reqParams, err := http_api.NewReqParams(req) if err != nil { return nil, http_api.Err{400, "INVALID_REQUEST"} @@ -191,7 +134,7 @@ func (s *httpServer) doLookup(req *http.Request) (interface{}, error) { }, nil } -func (s *httpServer) doCreateTopic(req *http.Request) (interface{}, error) { +func (s *httpServer) doCreateTopic(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { reqParams, err := http_api.NewReqParams(req) if err != nil { return nil, http_api.Err{400, "INVALID_REQUEST"} @@ -213,7 +156,7 @@ func (s *httpServer) doCreateTopic(req *http.Request) (interface{}, error) { return nil, nil } -func (s *httpServer) doDeleteTopic(req *http.Request) (interface{}, error) { +func (s *httpServer) doDeleteTopic(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { reqParams, err := http_api.NewReqParams(req) if err != nil { return nil, http_api.Err{400, "INVALID_REQUEST"} @@ -239,7 +182,7 @@ func (s *httpServer) doDeleteTopic(req *http.Request) (interface{}, error) { return nil, nil } -func (s *httpServer) doTombstoneTopicProducer(req *http.Request) (interface{}, error) { +func (s *httpServer) doTombstoneTopicProducer(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { reqParams, err := http_api.NewReqParams(req) if err != nil { return nil, http_api.Err{400, "INVALID_REQUEST"} @@ -267,7 +210,7 @@ func (s *httpServer) doTombstoneTopicProducer(req *http.Request) (interface{}, e return nil, nil } -func (s *httpServer) doCreateChannel(req *http.Request) (interface{}, error) { +func (s *httpServer) doCreateChannel(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { reqParams, err := http_api.NewReqParams(req) if err != nil { return nil, http_api.Err{400, "INVALID_REQUEST"} @@ -289,7 +232,7 @@ func (s *httpServer) doCreateChannel(req *http.Request) (interface{}, error) { return nil, nil } -func (s *httpServer) doDeleteChannel(req *http.Request) (interface{}, error) { +func (s *httpServer) doDeleteChannel(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { reqParams, err := http_api.NewReqParams(req) if err != nil { return nil, http_api.Err{400, "INVALID_REQUEST"} @@ -324,7 +267,7 @@ type node struct { Topics []string `json:"topics"` } -func (s *httpServer) doNodes(req *http.Request) (interface{}, error) { +func (s *httpServer) doNodes(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { // dont filter out tombstoned nodes producers := s.ctx.nsqlookupd.DB.FindProducers("client", "", "").FilterByActive( s.ctx.nsqlookupd.opts.InactiveProducerTimeout, 0) @@ -361,7 +304,7 @@ func (s *httpServer) doNodes(req *http.Request) (interface{}, error) { }, nil } -func (s *httpServer) doDebug(req *http.Request) (interface{}, error) { +func (s *httpServer) doDebug(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { s.ctx.nsqlookupd.DB.RLock() defer s.ctx.nsqlookupd.DB.RUnlock() diff --git a/nsqlookupd/nsqlookupd.go b/nsqlookupd/nsqlookupd.go index 57c743e78..3cfcf8be6 100644 --- a/nsqlookupd/nsqlookupd.go +++ b/nsqlookupd/nsqlookupd.go @@ -61,9 +61,9 @@ func (l *NSQLookupd) Main() { l.Lock() l.httpListener = httpListener l.Unlock() - httpServer := &httpServer{ctx: ctx} + httpServer := newHTTPServer(ctx) l.waitGroup.Wrap(func() { - http_api.Serve(httpListener, httpServer, l.opts.Logger, "HTTP") + http_api.Serve(httpListener, httpServer, "HTTP", l.opts.Logger) }) }