diff --git a/internal/http_api/api_response.go b/internal/http_api/api_response.go index 1eb6b04bc..664db786a 100644 --- a/internal/http_api/api_response.go +++ b/internal/http_api/api_response.go @@ -3,12 +3,17 @@ package http_api import ( "encoding/json" "fmt" + "io" "net/http" "strconv" + "time" + "github.com/bitly/nsq/internal/app" "github.com/julienschmidt/httprouter" ) +type Decorator func(APIHandler) APIHandler + type APIHandler func(http.ResponseWriter, *http.Request, httprouter.Params) (interface{}, error) type Err struct { @@ -28,8 +33,24 @@ func acceptVersion(req *http.Request) int { return 0 } -func NegotiateVersion(f APIHandler) httprouter.Handle { - return func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { +func PlainText(f APIHandler) APIHandler { + return func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { + code := 200 + data, err := f(w, req, ps) + if err != nil { + code = err.(Err).Code + data = err.Error() + } + response := data.(string) + w.Header().Set("Content-Length", strconv.Itoa(len(response))) + w.WriteHeader(code) + io.WriteString(w, response) + return nil, nil + } +} + +func NegotiateVersion(f APIHandler) APIHandler { + return func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { data, err := f(w, req, ps) if err != nil { if acceptVersion(req) == 1 { @@ -38,24 +59,26 @@ func NegotiateVersion(f APIHandler) httprouter.Handle { // this handler always returns 500 for backwards compatibility Respond(w, 500, err.Error(), nil) } - return + return nil, nil } if acceptVersion(req) == 1 { RespondV1(w, 200, data) } else { Respond(w, 200, "OK", data) } + return nil, nil } } -func V1(f APIHandler) httprouter.Handle { - return func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { +func V1(f APIHandler) APIHandler { + return func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { data, err := f(w, req, ps) if err != nil { RespondV1(w, err.(Err).Code, err) - return + return nil, nil } RespondV1(w, 200, data) + return nil, nil } } @@ -125,3 +148,55 @@ func RespondV1(w http.ResponseWriter, code int, data interface{}) { w.WriteHeader(code) w.Write(response) } + +func Decorate(f APIHandler, ds ...Decorator) httprouter.Handle { + decorated := f + for _, decorate := range ds { + decorated = decorate(decorated) + } + return func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + decorated(w, req, ps) + } +} + +func Log(l app.Logger) Decorator { + return func(f APIHandler) APIHandler { + return func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { + start := time.Now() + response, err := f(w, req, ps) + elapsed := time.Since(start) + status := 200 + if e, ok := err.(Err); ok { + status = e.Code + } + l.Output(2, fmt.Sprintf("%d %s %s (%s) %s", + status, req.Method, req.URL.RequestURI(), req.RemoteAddr, elapsed)) + return response, err + } + } +} + +func LogPanicHandler(l app.Logger) func(w http.ResponseWriter, req *http.Request, p interface{}) { + return func(w http.ResponseWriter, req *http.Request, p interface{}) { + l.Output(2, fmt.Sprintf("ERROR: panic in HTTP handler - %s", p)) + Decorate(func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { + return nil, Err{500, "INTERNAL_ERROR"} + }, Log(l), V1)(w, req, nil) + } +} + +func LogNotFoundHandler(l app.Logger) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + Decorate(func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { + return nil, Err{404, "NOT_FOUND"} + }, Log(l), V1)(w, req, nil) + }) +} + +func LogMethodNotAllowedHandler(l app.Logger) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + Decorate(func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { + return nil, Err{405, "METHOD_NOT_ALLOWED"} + }, Log(l), V1)(w, req, nil) + }) +} diff --git a/nsqadmin/http.go b/nsqadmin/http.go index e8fc378c3..d2f365d5f 100644 --- a/nsqadmin/http.go +++ b/nsqadmin/http.go @@ -3,7 +3,6 @@ package nsqadmin import ( "fmt" "html/template" - "io" "io/ioutil" "net" "net/http" @@ -74,8 +73,13 @@ func NewHTTPServer(ctx *Context) *httpServer { proxy = NewSingleHostReverseProxy(ctx.nsqadmin.graphiteURL, 20*time.Second) } + log := http_api.Log(ctx.nsqadmin.opts.Logger) + router := httprouter.New() router.HandleMethodNotAllowed = true + router.PanicHandler = http_api.LogPanicHandler(ctx.nsqadmin.opts.Logger) + router.NotFound = http_api.LogNotFoundHandler(ctx.nsqadmin.opts.Logger) + router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(ctx.nsqadmin.opts.Logger) s := &httpServer{ ctx: ctx, counters: make(map[string]map[string]int64), @@ -83,29 +87,29 @@ func NewHTTPServer(ctx *Context) *httpServer { router: router, } - router.Handle("GET", "/ping", s.pingHandler) - - router.Handle("GET", "/", s.indexHandler) - router.Handle("GET", "/nodes", s.nodesHandler) - router.Handle("GET", "/node/:node", s.nodeHandler) - router.Handle("GET", "/topic/:topic", s.topicHandler) - router.Handle("GET", "/topic/:topic/:channel", s.channelHandler) - router.Handle("GET", "/static/:asset", s.embeddedAssetHandler) - router.Handle("GET", "/counter", s.counterHandler) - router.Handle("GET", "/counter/data", s.counterDataHandler) - router.Handle("GET", "/lookup", s.lookupHandler) - router.Handle("GET", "/graphite_data", s.graphiteDataHandler) - - router.Handle("POST", "/tombstone_topic_producer", s.tombstoneTopicProducerHandler) - router.Handle("POST", "/empty_topic", s.emptyTopicHandler) - router.Handle("POST", "/delete_topic", s.deleteTopicHandler) - router.Handle("POST", "/pause_topic", s.pauseTopicHandler) - router.Handle("POST", "/unpause_topic", s.pauseTopicHandler) - router.Handle("POST", "/empty_channel", s.emptyChannelHandler) - router.Handle("POST", "/delete_channel", s.deleteChannelHandler) - router.Handle("POST", "/pause_channel", s.pauseChannelHandler) - router.Handle("POST", "/unpause_channel", s.pauseChannelHandler) - router.Handle("POST", "/create_topic_channel", s.createTopicChannelHandler) + router.Handle("GET", "/ping", http_api.Decorate(s.pingHandler, log, http_api.PlainText)) + + router.Handle("GET", "/", http_api.Decorate(s.indexHandler, log)) + router.Handle("GET", "/nodes", http_api.Decorate(s.nodesHandler, log)) + router.Handle("GET", "/node/:node", http_api.Decorate(s.nodeHandler, log)) + router.Handle("GET", "/topic/:topic", http_api.Decorate(s.topicHandler, log)) + router.Handle("GET", "/topic/:topic/:channel", http_api.Decorate(s.channelHandler, log)) + router.Handle("GET", "/static/:asset", http_api.Decorate(s.embeddedAssetHandler, log)) + router.Handle("GET", "/counter", http_api.Decorate(s.counterHandler, log)) + router.Handle("GET", "/counter/data", http_api.Decorate(s.counterDataHandler, log)) + router.Handle("GET", "/lookup", http_api.Decorate(s.lookupHandler, log)) + router.Handle("GET", "/graphite_data", http_api.Decorate(s.graphiteDataHandler, log)) + + router.Handle("POST", "/tombstone_topic_producer", http_api.Decorate(s.tombstoneTopicProducerHandler, log)) + router.Handle("POST", "/empty_topic", http_api.Decorate(s.emptyTopicHandler, log)) + router.Handle("POST", "/delete_topic", http_api.Decorate(s.deleteTopicHandler, log)) + router.Handle("POST", "/pause_topic", http_api.Decorate(s.pauseTopicHandler, log)) + router.Handle("POST", "/unpause_topic", http_api.Decorate(s.pauseTopicHandler, log)) + router.Handle("POST", "/empty_channel", http_api.Decorate(s.emptyChannelHandler, log)) + router.Handle("POST", "/delete_channel", http_api.Decorate(s.deleteChannelHandler, log)) + router.Handle("POST", "/pause_channel", http_api.Decorate(s.pauseChannelHandler, log)) + router.Handle("POST", "/unpause_channel", http_api.Decorate(s.pauseChannelHandler, log)) + router.Handle("POST", "/create_topic_channel", http_api.Decorate(s.createTopicChannelHandler, log)) if s.ctx.nsqadmin.opts.ProxyGraphite { router.Handler("GET", "/render", s.proxy) @@ -118,38 +122,31 @@ func (s *httpServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { s.router.ServeHTTP(w, req) } -func (s *httpServer) pingHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - w.Header().Set("Content-Length", "2") - io.WriteString(w, "OK") +func (s *httpServer) pingHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { + return "OK", nil } -func (s *httpServer) embeddedAssetHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { +func (s *httpServer) embeddedAssetHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { assetName := ps.ByName("asset") - s.ctx.nsqadmin.logf("INFO: Requesting embedded asset - %s", assetName) asset, error := templates.Asset(assetName) if error != nil { - s.ctx.nsqadmin.logf("ERROR: embedded asset access - %s : %s", assetName, error) - http.NotFound(w, req) - return + return nil, http_api.Err{404, "NOT_FOUND"} } - assetLen := len(asset) if strings.HasSuffix(assetName, ".js") { w.Header().Set("Content-Type", "text/javascript") } else if strings.HasSuffix(assetName, ".css") { w.Header().Set("Content-Type", "text/css") } - w.Header().Set("Content-Length", fmt.Sprintf("%d", assetLen)) - w.Write(asset) + + return asset, nil } -func (s *httpServer) indexHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { +func (s *httpServer) indexHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { reqParams, err := http_api.NewReqParams(req) if err != nil { - s.ctx.nsqadmin.logf("ERROR: failed to parse request params - %s", err) - http.Error(w, "INVALID_REQUEST", 500) - return + return nil, http_api.Err{400, "INVALID_REQUEST"} } var topics []string @@ -172,19 +169,18 @@ func (s *httpServer) indexHandler(w http.ResponseWriter, req *http.Request, ps h } err = templates.T.ExecuteTemplate(w, "index.html", p) if err != nil { - s.ctx.nsqadmin.logf("Template Error %s", err) - http.Error(w, "Template Error", 500) + s.ctx.nsqadmin.logf("ERROR: executing template - %s", err) + return nil, http_api.Err{500, "INTERNAL_ERROR"} } + return nil, nil } -func (s *httpServer) topicHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { +func (s *httpServer) topicHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { topicName := ps.ByName("topic") reqParams, err := http_api.NewReqParams(req) if err != nil { - s.ctx.nsqadmin.logf("ERROR: failed to parse request params - %s", err) - http.Error(w, "INVALID_REQUEST", 500) - return + return nil, http_api.Err{400, "INVALID_REQUEST"} } producers := s.getProducers(topicName) @@ -228,30 +224,26 @@ func (s *httpServer) topicHandler(w http.ResponseWriter, req *http.Request, ps h } err = templates.T.ExecuteTemplate(w, "topic.html", p) if err != nil { - s.ctx.nsqadmin.logf("Template Error %s", err) - http.Error(w, "Template Error", 500) + s.ctx.nsqadmin.logf("ERROR: executing template - %s", err) + return nil, http_api.Err{500, "INTERNAL_ERROR"} } + return nil, nil } -func (s *httpServer) channelHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { +func (s *httpServer) channelHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { topicName := ps.ByName("topic") channelName := ps.ByName("channel") reqParams, err := http_api.NewReqParams(req) if err != nil { - s.ctx.nsqadmin.logf("ERROR: failed to parse request params - %s", err) - http.Error(w, "INVALID_REQUEST", 500) - return + return nil, http_api.Err{400, "INVALID_REQUEST"} } producers := s.getProducers(topicName) _, allChannelStats, _ := lookupd.GetNSQDStats(producers, topicName) channelStats, ok := allChannelStats[channelName] - if !ok { - s.ctx.nsqadmin.logf("ERROR: channel stats do not exist") - http.Error(w, "INVALID_REQUEST", 500) - return + return nil, http_api.Err{404, "NOT_FOUND"} } hasE2eLatency := channelStats.E2eProcessingLatency != nil && @@ -286,17 +278,16 @@ func (s *httpServer) channelHandler(w http.ResponseWriter, req *http.Request, ps err = templates.T.ExecuteTemplate(w, "channel.html", p) if err != nil { - s.ctx.nsqadmin.logf("Template Error %s", err) - http.Error(w, "Template Error", 500) + s.ctx.nsqadmin.logf("ERROR: executing template - %s", err) + return nil, http_api.Err{500, "INTERNAL_ERROR"} } + return nil, nil } -func (s *httpServer) lookupHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { +func (s *httpServer) lookupHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { reqParams, err := http_api.NewReqParams(req) if err != nil { - s.ctx.nsqadmin.logf("ERROR: failed to parse request params - %s", err) - http.Error(w, "INVALID_REQUEST", 500) - return + return nil, http_api.Err{400, "INVALID_REQUEST"} } channels := make(map[string][]string) @@ -325,24 +316,23 @@ func (s *httpServer) lookupHandler(w http.ResponseWriter, req *http.Request, ps } err = templates.T.ExecuteTemplate(w, "lookup.html", p) if err != nil { - s.ctx.nsqadmin.logf("Template Error %s", err) - http.Error(w, "Template Error", 500) + s.ctx.nsqadmin.logf("ERROR: executing template - %s", err) + return nil, http_api.Err{500, "INTERNAL_ERROR"} } + return nil, nil } -func (s *httpServer) createTopicChannelHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { +func (s *httpServer) createTopicChannelHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { reqParams := &http_api.PostParams{req} topicName, err := reqParams.Get("topic") if err != nil || !protocol.IsValidTopicName(topicName) { - http.Error(w, "INVALID_TOPIC", 500) - return + return nil, http_api.Err{400, "INVALID_ARG_TOPIC"} } channelName, err := reqParams.Get("channel") if err != nil || (len(channelName) > 0 && !protocol.IsValidChannelName(channelName)) { - http.Error(w, "INVALID_CHANNEL", 500) - return + return nil, http_api.Err{400, "INVALID_ARG_CHANNEL"} } for _, addr := range s.ctx.nsqadmin.opts.NSQLookupdHTTPAddresses { @@ -402,21 +392,20 @@ func (s *httpServer) createTopicChannelHandler(w http.ResponseWriter, req *http. } http.Redirect(w, req, "/lookup", 302) + return nil, nil } -func (s *httpServer) tombstoneTopicProducerHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { +func (s *httpServer) tombstoneTopicProducerHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { reqParams := &http_api.PostParams{req} topicName, err := reqParams.Get("topic") if err != nil { - http.Error(w, "MISSING_ARG_TOPIC", 500) - return + return nil, http_api.Err{400, "INVALID_ARG_TOPIC"} } node, err := reqParams.Get("node") if err != nil { - http.Error(w, "MISSING_ARG_NODE", 500) - return + return nil, http_api.Err{400, "INVALID_ARG_NODE"} } rd, _ := reqParams.Get("rd") @@ -468,15 +457,15 @@ func (s *httpServer) tombstoneTopicProducerHandler(w http.ResponseWriter, req *h s.notifyAdminAction("tombstone_topic_producer", topicName, "", node, req) http.Redirect(w, req, rd, 302) + return nil, nil } -func (s *httpServer) deleteTopicHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { +func (s *httpServer) deleteTopicHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { reqParams := &http_api.PostParams{req} topicName, err := reqParams.Get("topic") if err != nil { - http.Error(w, "MISSING_ARG_TOPIC", 500) - return + return nil, http_api.Err{400, "INVALID_ARG_TOPIC"} } rd, _ := reqParams.Get("rd") @@ -518,15 +507,15 @@ func (s *httpServer) deleteTopicHandler(w http.ResponseWriter, req *http.Request s.notifyAdminAction("delete_topic", topicName, "", "", req) http.Redirect(w, req, rd, 302) + return nil, nil } -func (s *httpServer) deleteChannelHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { +func (s *httpServer) deleteChannelHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { reqParams := &http_api.PostParams{req} topicName, channelName, err := http_api.GetTopicChannelArgs(reqParams) if err != nil { - http.Error(w, err.Error(), 500) - return + return nil, http_api.Err{400, err.Error()} } rd, _ := reqParams.Get("rd") @@ -569,15 +558,15 @@ func (s *httpServer) deleteChannelHandler(w http.ResponseWriter, req *http.Reque s.notifyAdminAction("delete_channel", topicName, channelName, "", req) http.Redirect(w, req, rd, 302) + return nil, nil } -func (s *httpServer) emptyTopicHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { +func (s *httpServer) emptyTopicHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { reqParams := &http_api.PostParams{req} topicName, err := reqParams.Get("topic") if err != nil { - http.Error(w, "MISSING_ARG_TOPIC", 500) - return + return nil, http_api.Err{400, "INVALID_ARG_TOPIC"} } producerAddrs := s.getProducers(topicName) @@ -592,15 +581,15 @@ func (s *httpServer) emptyTopicHandler(w http.ResponseWriter, req *http.Request, s.notifyAdminAction("empty_topic", topicName, "", "", req) http.Redirect(w, req, fmt.Sprintf("/topic/%s", url.QueryEscape(topicName)), 302) + return nil, nil } -func (s *httpServer) pauseTopicHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { +func (s *httpServer) pauseTopicHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { reqParams := &http_api.PostParams{req} topicName, err := reqParams.Get("topic") if err != nil { - http.Error(w, "MISSING_ARG_TOPIC", 500) - return + return nil, http_api.Err{400, "INVALID_ARG_TOPIC"} } verb := "pause" @@ -620,15 +609,15 @@ func (s *httpServer) pauseTopicHandler(w http.ResponseWriter, req *http.Request, s.notifyAdminAction(verb+"_topic", topicName, "", "", req) http.Redirect(w, req, fmt.Sprintf("/topic/%s", url.QueryEscape(topicName)), 302) + return nil, nil } -func (s *httpServer) emptyChannelHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { +func (s *httpServer) emptyChannelHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { reqParams := &http_api.PostParams{req} topicName, channelName, err := http_api.GetTopicChannelArgs(reqParams) if err != nil { - http.Error(w, err.Error(), 500) - return + return nil, http_api.Err{400, err.Error()} } producerAddrs := s.getProducers(topicName) @@ -644,15 +633,15 @@ func (s *httpServer) emptyChannelHandler(w http.ResponseWriter, req *http.Reques http.Redirect(w, req, fmt.Sprintf("/topic/%s/%s", url.QueryEscape(topicName), url.QueryEscape(channelName)), 302) + return nil, nil } -func (s *httpServer) pauseChannelHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { +func (s *httpServer) pauseChannelHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { reqParams := &http_api.PostParams{req} topicName, channelName, err := http_api.GetTopicChannelArgs(reqParams) if err != nil { - http.Error(w, err.Error(), 500) - return + return nil, http_api.Err{400, err.Error()} } verb := "pause" @@ -672,16 +661,15 @@ func (s *httpServer) pauseChannelHandler(w http.ResponseWriter, req *http.Reques s.notifyAdminAction(verb+"_channel", topicName, channelName, "", req) http.Redirect(w, req, fmt.Sprintf("/topic/%s/%s", url.QueryEscape(topicName), url.QueryEscape(channelName)), 302) + return nil, nil } -func (s *httpServer) nodeHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { +func (s *httpServer) nodeHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { node := ps.ByName("node") reqParams, err := http_api.NewReqParams(req) if err != nil { - s.ctx.nsqadmin.logf("ERROR: failed to parse request params - %s", err) - http.Error(w, "INVALID_REQUEST", 500) - return + return nil, http_api.Err{400, "INVALID_REQUEST"} } found := false @@ -699,8 +687,7 @@ func (s *httpServer) nodeHandler(w http.ResponseWriter, req *http.Request, ps ht } } if !found { - http.Error(w, "INVALID_NODE", 500) - return + return nil, http_api.Err{404, "NOT_FOUND"} } topicStats, channelStats, _ := lookupd.GetNSQDStats([]string{node}, "") @@ -735,17 +722,16 @@ func (s *httpServer) nodeHandler(w http.ResponseWriter, req *http.Request, ps ht } err = templates.T.ExecuteTemplate(w, "node.html", p) if err != nil { - s.ctx.nsqadmin.logf("Template Error %s", err) - http.Error(w, "Template Error", 500) + s.ctx.nsqadmin.logf("ERROR: executing template - %s", err) + return nil, http_api.Err{500, "INTERNAL_ERROR"} } + return nil, nil } -func (s *httpServer) nodesHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { +func (s *httpServer) nodesHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { reqParams, err := http_api.NewReqParams(req) if err != nil { - s.ctx.nsqadmin.logf("ERROR: failed to parse request params - %s", err) - http.Error(w, "INVALID_REQUEST", 500) - return + return nil, http_api.Err{400, "INVALID_REQUEST"} } producers, _ := lookupd.GetLookupdProducers(s.ctx.nsqadmin.opts.NSQLookupdHTTPAddresses) @@ -764,9 +750,10 @@ func (s *httpServer) nodesHandler(w http.ResponseWriter, req *http.Request, ps h } err = templates.T.ExecuteTemplate(w, "nodes.html", p) if err != nil { - s.ctx.nsqadmin.logf("Template Error %s", err) - http.Error(w, "Template Error", 500) + s.ctx.nsqadmin.logf("ERROR: executing template - %s", err) + return nil, http_api.Err{500, "INTERNAL_ERROR"} } + return nil, nil } type counterTarget struct{} @@ -779,12 +766,10 @@ func (c counterTarget) Host() string { return "*" } -func (s *httpServer) counterHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { +func (s *httpServer) counterHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { reqParams, err := http_api.NewReqParams(req) if err != nil { - s.ctx.nsqadmin.logf("ERROR: failed to parse request params - %s", err) - http.Error(w, "INVALID_REQUEST", 500) - return + return nil, http_api.Err{400, "INVALID_REQUEST"} } p := struct { Title string @@ -799,21 +784,20 @@ func (s *httpServer) counterHandler(w http.ResponseWriter, req *http.Request, ps } err = templates.T.ExecuteTemplate(w, "counter.html", p) if err != nil { - s.ctx.nsqadmin.logf("Template Error %s", err) - http.Error(w, "Template Error", 500) + s.ctx.nsqadmin.logf("ERROR: executing template - %s", err) + return nil, http_api.Err{500, "INTERNAL_ERROR"} } + return nil, nil } // this endpoint works by giving out an ID that maps to a stats dictionary // The initial request is the number of messages processed since each nsqd started up. // Subsequent requsts pass that ID and get an updated delta based on each individual channel/nsqd message count // That ID must be re-requested or it will be expired. -func (s *httpServer) counterDataHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { +func (s *httpServer) counterDataHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { reqParams, err := http_api.NewReqParams(req) if err != nil { - s.ctx.nsqadmin.logf("ERROR: failed to parse request params - %s", err) - http_api.Respond(w, 500, "INVALID_REQUEST", nil) - return + return nil, http_api.Err{400, "INVALID_REQUEST"} } statsID, _ := reqParams.Get("id") @@ -852,39 +836,27 @@ func (s *httpServer) counterDataHandler(w http.ResponseWriter, req *http.Request } s.counters[statsID] = newStats - data := make(map[string]interface{}) - data["new_messages"] = newMessages - data["total_messages"] = totalMessages - data["id"] = statsID - http_api.Respond(w, 200, "OK", data) + return struct { + NewMessages int64 `json:"new_messages"` + TotalMessages int64 `json:"total_messages"` + ID string `json:"id"` + }{newMessages, totalMessages, statsID}, nil } -func (s *httpServer) graphiteDataHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { +func (s *httpServer) graphiteDataHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { reqParams, err := http_api.NewReqParams(req) if err != nil { - s.ctx.nsqadmin.logf("ERROR: failed to parse request params - %s", err) - http.Error(w, "INVALID_REQUEST", 500) - return + return nil, http_api.Err{400, "INVALID_REQUEST"} } metric, err := reqParams.Get("metric") - if err != nil { - s.ctx.nsqadmin.logf("ERROR: missing metric param - %s", err) - http.Error(w, "MISSING_METRIC_PARAM", 500) - return + if err != nil || metric != "rate" { + return nil, http_api.Err{404, "INVALID_ARG_METRIC"} } target, err := reqParams.Get("target") if err != nil { - s.ctx.nsqadmin.logf("ERROR: missing target param - %s", err) - http.Error(w, "MISSING_TARGET_PARAM", 500) - return - } - - if metric != "rate" { - s.ctx.nsqadmin.logf("ERROR: unknown metric value %s", metric) - http.Error(w, "INVALID_METRIC_PARAM", 500) - return + return nil, http_api.Err{404, "INVALID_ARG_TARGET"} } query := rateQuery(target, s.ctx.nsqadmin.opts.StatsdInterval) @@ -892,21 +864,18 @@ func (s *httpServer) graphiteDataHandler(w http.ResponseWriter, req *http.Reques s.ctx.nsqadmin.logf("GRAPHITE: %s", url) response, err := graphiteGet(url) if err != nil { - s.ctx.nsqadmin.logf("ERROR: graphite request failed %s", err) - http.Error(w, "GRAPHITE_FAILED", 500) - return + s.ctx.nsqadmin.logf("ERROR: graphite request failed - %s", err) + return nil, http_api.Err{500, "INTERNAL_ERROR"} } resp, err := parseRateResponse(response, s.ctx.nsqadmin.opts.StatsdInterval) if err != nil { - s.ctx.nsqadmin.logf("ERROR: response formating failed - %s", err) - http.Error(w, "INVALID_GRAPHITE_RESPONSE", 500) - return + s.ctx.nsqadmin.logf("ERROR: response formatting failed - %s", err) + return nil, http_api.Err{500, "INTERNAL_ERROR"} } w.Header().Set("Content-Type", "application/json") - w.Write(resp) - return + return resp, nil } func graphiteGet(url string) ([]byte, error) { diff --git a/nsqd/http.go b/nsqd/http.go index 9a92dbaa3..df2e7bdb0 100644 --- a/nsqd/http.go +++ b/nsqd/http.go @@ -30,8 +30,13 @@ type httpServer struct { } func newHTTPServer(ctx *context, tlsEnabled bool, tlsRequired bool) *httpServer { + log := http_api.Log(ctx.nsqd.getOpts().Logger) + router := httprouter.New() router.HandleMethodNotAllowed = true + router.PanicHandler = http_api.LogPanicHandler(ctx.nsqd.getOpts().Logger) + router.NotFound = http_api.LogNotFoundHandler(ctx.nsqd.getOpts().Logger) + router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(ctx.nsqd.getOpts().Logger) s := &httpServer{ ctx: ctx, tlsEnabled: tlsEnabled, @@ -39,50 +44,51 @@ func newHTTPServer(ctx *context, tlsEnabled bool, tlsRequired bool) *httpServer router: router, } + router.Handle("GET", "/ping", http_api.Decorate(s.pingHandler, log, http_api.PlainText)) + // v1 negotiate - router.Handle("POST", "/pub", http_api.NegotiateVersion(s.doPUB)) - router.Handle("POST", "/mpub", http_api.NegotiateVersion(s.doMPUB)) - router.Handle("GET", "/stats", http_api.NegotiateVersion(s.doStats)) - router.Handle("GET", "/ping", s.pingHandler) + router.Handle("POST", "/pub", http_api.Decorate(s.doPUB, http_api.NegotiateVersion)) + router.Handle("POST", "/mpub", http_api.Decorate(s.doMPUB, http_api.NegotiateVersion)) + router.Handle("GET", "/stats", http_api.Decorate(s.doStats, log, http_api.NegotiateVersion)) // only v1 - router.Handle("POST", "/topic/create", http_api.V1(s.doCreateTopic)) - router.Handle("POST", "/topic/delete", http_api.V1(s.doDeleteTopic)) - router.Handle("POST", "/topic/empty", http_api.V1(s.doEmptyTopic)) - router.Handle("POST", "/topic/pause", http_api.V1(s.doPauseTopic)) - router.Handle("POST", "/topic/unpause", http_api.V1(s.doPauseTopic)) - router.Handle("POST", "/channel/create", http_api.V1(s.doCreateChannel)) - router.Handle("POST", "/channel/delete", http_api.V1(s.doDeleteChannel)) - router.Handle("POST", "/channel/empty", http_api.V1(s.doEmptyChannel)) - router.Handle("POST", "/channel/pause", http_api.V1(s.doPauseChannel)) - router.Handle("POST", "/channel/unpause", http_api.V1(s.doPauseChannel)) - router.Handle("GET", "/config/:opt", http_api.V1(s.doConfig)) - router.Handle("PUT", "/config/:opt", http_api.V1(s.doConfig)) + router.Handle("POST", "/topic/create", http_api.Decorate(s.doCreateTopic, log, http_api.V1)) + router.Handle("POST", "/topic/delete", http_api.Decorate(s.doDeleteTopic, log, http_api.V1)) + router.Handle("POST", "/topic/empty", http_api.Decorate(s.doEmptyTopic, log, http_api.V1)) + router.Handle("POST", "/topic/pause", http_api.Decorate(s.doPauseTopic, log, http_api.V1)) + router.Handle("POST", "/topic/unpause", http_api.Decorate(s.doPauseTopic, log, http_api.V1)) + router.Handle("POST", "/channel/create", http_api.Decorate(s.doCreateChannel, log, http_api.V1)) + router.Handle("POST", "/channel/delete", http_api.Decorate(s.doDeleteChannel, log, http_api.V1)) + router.Handle("POST", "/channel/empty", http_api.Decorate(s.doEmptyChannel, log, http_api.V1)) + router.Handle("POST", "/channel/pause", http_api.Decorate(s.doPauseChannel, log, http_api.V1)) + router.Handle("POST", "/channel/unpause", http_api.Decorate(s.doPauseChannel, log, http_api.V1)) + router.Handle("GET", "/config/:opt", http_api.Decorate(s.doConfig, log, http_api.V1)) + router.Handle("PUT", "/config/:opt", http_api.Decorate(s.doConfig, log, http_api.V1)) // deprecated, v1 negotiate - router.Handle("POST", "/put", http_api.NegotiateVersion(s.doPUB)) - router.Handle("POST", "/mput", http_api.NegotiateVersion(s.doMPUB)) - router.Handle("GET", "/info", http_api.NegotiateVersion(s.doInfo)) - router.Handle("POST", "/create_topic", http_api.NegotiateVersion(s.doCreateTopic)) - router.Handle("POST", "/delete_topic", http_api.NegotiateVersion(s.doDeleteTopic)) - router.Handle("POST", "/empty_topic", http_api.NegotiateVersion(s.doEmptyTopic)) - router.Handle("POST", "/pause_topic", http_api.NegotiateVersion(s.doPauseTopic)) - router.Handle("POST", "/unpause_topic", http_api.NegotiateVersion(s.doPauseTopic)) - router.Handle("POST", "/create_channel", http_api.NegotiateVersion(s.doCreateChannel)) - router.Handle("POST", "/delete_channel", http_api.NegotiateVersion(s.doDeleteChannel)) - router.Handle("POST", "/empty_channel", http_api.NegotiateVersion(s.doEmptyChannel)) - router.Handle("POST", "/pause_channel", http_api.NegotiateVersion(s.doPauseChannel)) - router.Handle("POST", "/unpause_channel", http_api.NegotiateVersion(s.doPauseChannel)) - router.Handle("GET", "/create_topic", http_api.NegotiateVersion(s.doCreateTopic)) - router.Handle("GET", "/delete_topic", http_api.NegotiateVersion(s.doDeleteTopic)) - router.Handle("GET", "/empty_topic", http_api.NegotiateVersion(s.doEmptyTopic)) - router.Handle("GET", "/pause_topic", http_api.NegotiateVersion(s.doPauseTopic)) - router.Handle("GET", "/unpause_topic", http_api.NegotiateVersion(s.doPauseTopic)) - router.Handle("GET", "/create_channel", http_api.NegotiateVersion(s.doCreateChannel)) - router.Handle("GET", "/delete_channel", http_api.NegotiateVersion(s.doDeleteChannel)) - router.Handle("GET", "/empty_channel", http_api.NegotiateVersion(s.doEmptyChannel)) - router.Handle("GET", "/pause_channel", http_api.NegotiateVersion(s.doPauseChannel)) - router.Handle("GET", "/unpause_channel", http_api.NegotiateVersion(s.doPauseChannel)) + router.Handle("POST", "/put", http_api.Decorate(s.doPUB, http_api.NegotiateVersion)) + router.Handle("POST", "/mput", http_api.Decorate(s.doMPUB, http_api.NegotiateVersion)) + router.Handle("GET", "/info", http_api.Decorate(s.doInfo, log, http_api.NegotiateVersion)) + router.Handle("POST", "/create_topic", http_api.Decorate(s.doCreateTopic, log, http_api.NegotiateVersion)) + router.Handle("POST", "/delete_topic", http_api.Decorate(s.doDeleteTopic, log, http_api.NegotiateVersion)) + router.Handle("POST", "/empty_topic", http_api.Decorate(s.doEmptyTopic, log, http_api.NegotiateVersion)) + router.Handle("POST", "/pause_topic", http_api.Decorate(s.doPauseTopic, log, http_api.NegotiateVersion)) + router.Handle("POST", "/unpause_topic", http_api.Decorate(s.doPauseTopic, log, http_api.NegotiateVersion)) + router.Handle("POST", "/create_channel", http_api.Decorate(s.doCreateChannel, log, http_api.NegotiateVersion)) + router.Handle("POST", "/delete_channel", http_api.Decorate(s.doDeleteChannel, log, http_api.NegotiateVersion)) + router.Handle("POST", "/empty_channel", http_api.Decorate(s.doEmptyChannel, log, http_api.NegotiateVersion)) + router.Handle("POST", "/pause_channel", http_api.Decorate(s.doPauseChannel, log, http_api.NegotiateVersion)) + router.Handle("POST", "/unpause_channel", http_api.Decorate(s.doPauseChannel, log, http_api.NegotiateVersion)) + router.Handle("GET", "/create_topic", http_api.Decorate(s.doCreateTopic, log, http_api.NegotiateVersion)) + router.Handle("GET", "/delete_topic", http_api.Decorate(s.doDeleteTopic, log, http_api.NegotiateVersion)) + router.Handle("GET", "/empty_topic", http_api.Decorate(s.doEmptyTopic, log, http_api.NegotiateVersion)) + router.Handle("GET", "/pause_topic", http_api.Decorate(s.doPauseTopic, log, http_api.NegotiateVersion)) + router.Handle("GET", "/unpause_topic", http_api.Decorate(s.doPauseTopic, log, http_api.NegotiateVersion)) + router.Handle("GET", "/create_channel", http_api.Decorate(s.doCreateChannel, log, http_api.NegotiateVersion)) + router.Handle("GET", "/delete_channel", http_api.Decorate(s.doDeleteChannel, log, http_api.NegotiateVersion)) + router.Handle("GET", "/empty_channel", http_api.Decorate(s.doEmptyChannel, log, http_api.NegotiateVersion)) + router.Handle("GET", "/pause_channel", http_api.Decorate(s.doPauseChannel, log, http_api.NegotiateVersion)) + router.Handle("GET", "/unpause_channel", http_api.Decorate(s.doPauseChannel, log, http_api.NegotiateVersion)) // debug router.HandlerFunc("GET", "/debug/pprof", pprof.Index) @@ -105,15 +111,12 @@ func (s *httpServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { s.router.ServeHTTP(w, req) } -func (s *httpServer) pingHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { +func (s *httpServer) pingHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { health := s.ctx.nsqd.GetHealth() - code := 200 if !s.ctx.nsqd.IsHealthy() { - code = 500 + return nil, http_api.Err{500, health} } - w.Header().Set("Content-Length", strconv.Itoa(len(health))) - w.WriteHeader(code) - io.WriteString(w, health) + return health, nil } func (s *httpServer) doInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { diff --git a/nsqd/http_test.go b/nsqd/http_test.go index 67d125e3a..7bbd83eee 100644 --- a/nsqd/http_test.go +++ b/nsqd/http_test.go @@ -760,3 +760,27 @@ func TestHTTPconfig(t *testing.T) { equal(t, resp.StatusCode, 200) equal(t, string(body), addrs) } + +func TestHTTPerrors(t *testing.T) { + opts := NewOptions() + opts.Logger = newTestLogger(t) + _, httpAddr, nsqd := mustStartNSQD(opts) + defer os.RemoveAll(opts.DataPath) + defer nsqd.Exit() + + url := fmt.Sprintf("http://%s/stats", httpAddr) + resp, err := http.Post(url, "text/plain", nil) + equal(t, err, nil) + defer resp.Body.Close() + body, _ := ioutil.ReadAll(resp.Body) + equal(t, resp.StatusCode, 405) + equal(t, string(body), `{"message":"METHOD_NOT_ALLOWED"}`) + + url = fmt.Sprintf("http://%s/not_found", httpAddr) + resp, err = http.Get(url) + equal(t, err, nil) + defer resp.Body.Close() + body, _ = ioutil.ReadAll(resp.Body) + equal(t, resp.StatusCode, 404) + equal(t, string(body), `{"message":"NOT_FOUND"}`) +} diff --git a/nsqlookupd/http.go b/nsqlookupd/http.go index f3f5e9d2b..5e73117cf 100644 --- a/nsqlookupd/http.go +++ b/nsqlookupd/http.go @@ -2,7 +2,6 @@ package nsqlookupd import ( "fmt" - "io" "net/http" "net/http/pprof" "sync/atomic" @@ -19,40 +18,46 @@ type httpServer struct { } func newHTTPServer(ctx *Context) *httpServer { + log := http_api.Log(ctx.nsqlookupd.opts.Logger) + router := httprouter.New() router.HandleMethodNotAllowed = true + router.PanicHandler = http_api.LogPanicHandler(ctx.nsqlookupd.opts.Logger) + router.NotFound = http_api.LogNotFoundHandler(ctx.nsqlookupd.opts.Logger) + router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(ctx.nsqlookupd.opts.Logger) s := &httpServer{ ctx: ctx, router: router, } + router.Handle("GET", "/ping", http_api.Decorate(s.pingHandler, log, http_api.PlainText)) + // v1 negotiate - router.Handle("GET", "/ping", s.pingHandler) - router.Handle("GET", "/debug", http_api.NegotiateVersion(s.doDebug)) - router.Handle("GET", "/lookup", http_api.NegotiateVersion(s.doLookup)) - router.Handle("GET", "/topics", http_api.NegotiateVersion(s.doTopics)) - router.Handle("GET", "/channels", http_api.NegotiateVersion(s.doChannels)) - router.Handle("GET", "/nodes", http_api.NegotiateVersion(s.doNodes)) + router.Handle("GET", "/debug", http_api.Decorate(s.doDebug, log, http_api.NegotiateVersion)) + router.Handle("GET", "/lookup", http_api.Decorate(s.doLookup, log, http_api.NegotiateVersion)) + router.Handle("GET", "/topics", http_api.Decorate(s.doTopics, log, http_api.NegotiateVersion)) + router.Handle("GET", "/channels", http_api.Decorate(s.doChannels, log, http_api.NegotiateVersion)) + router.Handle("GET", "/nodes", http_api.Decorate(s.doNodes, log, http_api.NegotiateVersion)) // only v1 - router.Handle("POST", "/topic/create", http_api.V1(s.doCreateTopic)) - router.Handle("POST", "/topic/delete", http_api.V1(s.doDeleteTopic)) - router.Handle("POST", "/channel/create", http_api.V1(s.doCreateChannel)) - router.Handle("POST", "/channel/delete", http_api.V1(s.doDeleteChannel)) - router.Handle("POST", "/topic/tombstone", http_api.V1(s.doTombstoneTopicProducer)) + router.Handle("POST", "/topic/create", http_api.Decorate(s.doCreateTopic, log, http_api.V1)) + router.Handle("POST", "/topic/delete", http_api.Decorate(s.doDeleteTopic, log, http_api.V1)) + router.Handle("POST", "/channel/create", http_api.Decorate(s.doCreateChannel, log, http_api.V1)) + router.Handle("POST", "/channel/delete", http_api.Decorate(s.doDeleteChannel, log, http_api.V1)) + router.Handle("POST", "/topic/tombstone", http_api.Decorate(s.doTombstoneTopicProducer, log, http_api.V1)) // deprecated, v1 negotiate - router.Handle("GET", "/info", http_api.NegotiateVersion(s.doInfo)) - router.Handle("POST", "/create_topic", http_api.NegotiateVersion(s.doCreateTopic)) - router.Handle("POST", "/delete_topic", http_api.NegotiateVersion(s.doDeleteTopic)) - router.Handle("POST", "/create_channel", http_api.NegotiateVersion(s.doCreateChannel)) - router.Handle("POST", "/delete_channel", http_api.NegotiateVersion(s.doDeleteChannel)) - router.Handle("POST", "/tombstone_topic_producer", http_api.NegotiateVersion(s.doTombstoneTopicProducer)) - router.Handle("GET", "/create_topic", http_api.NegotiateVersion(s.doCreateTopic)) - router.Handle("GET", "/delete_topic", http_api.NegotiateVersion(s.doDeleteTopic)) - router.Handle("GET", "/create_channel", http_api.NegotiateVersion(s.doCreateChannel)) - router.Handle("GET", "/delete_channel", http_api.NegotiateVersion(s.doDeleteChannel)) - router.Handle("GET", "/tombstone_topic_producer", http_api.NegotiateVersion(s.doTombstoneTopicProducer)) + router.Handle("GET", "/info", http_api.Decorate(s.doInfo, log, http_api.NegotiateVersion)) + router.Handle("POST", "/create_topic", http_api.Decorate(s.doCreateTopic, log, http_api.NegotiateVersion)) + router.Handle("POST", "/delete_topic", http_api.Decorate(s.doDeleteTopic, log, http_api.NegotiateVersion)) + router.Handle("POST", "/create_channel", http_api.Decorate(s.doCreateChannel, log, http_api.NegotiateVersion)) + router.Handle("POST", "/delete_channel", http_api.Decorate(s.doDeleteChannel, log, http_api.NegotiateVersion)) + router.Handle("POST", "/tombstone_topic_producer", http_api.Decorate(s.doTombstoneTopicProducer, log, http_api.NegotiateVersion)) + router.Handle("GET", "/create_topic", http_api.Decorate(s.doCreateTopic, log, http_api.NegotiateVersion)) + router.Handle("GET", "/delete_topic", http_api.Decorate(s.doDeleteTopic, log, http_api.NegotiateVersion)) + router.Handle("GET", "/create_channel", http_api.Decorate(s.doCreateChannel, log, http_api.NegotiateVersion)) + router.Handle("GET", "/delete_channel", http_api.Decorate(s.doDeleteChannel, log, http_api.NegotiateVersion)) + router.Handle("GET", "/tombstone_topic_producer", http_api.Decorate(s.doTombstoneTopicProducer, log, http_api.NegotiateVersion)) // debug router.HandlerFunc("GET", "/debug/pprof", pprof.Index) @@ -71,9 +76,8 @@ func (s *httpServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { s.router.ServeHTTP(w, req) } -func (s *httpServer) pingHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - w.Header().Set("Content-Length", "2") - io.WriteString(w, "OK") +func (s *httpServer) pingHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { + return "OK", nil } func (s *httpServer) doInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {