diff --git a/.travis.yml b/.travis.yml index 09713b7dc..51dd86d7a 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,10 +1,10 @@ language: go go: - - 1.3.3 - - 1.4.1 + - 1.4.2 env: - GOARCH=amd64 - GOARCH=386 +sudo: false script: - curl -s https://raw.githubusercontent.com/pote/gpm/v1.2.3/bin/gpm > gpm - chmod +x gpm diff --git a/Godeps b/Godeps index 9064cb966..f1e24771c 100644 --- a/Godeps +++ b/Godeps @@ -7,3 +7,4 @@ github.com/mreiferson/go-options 2cf7eb1fdd83e2bb3375fef6fdadb04c3ad564da github.com/mreiferson/go-snappystream 028eae7ab5c4c9e2d1cb4c4ca1e53259bbe7e504 # v0.2.3 github.com/bitly/timer_metrics afad1794bb13e2a094720aeb27c088aa64564895 github.com/blang/semver 9bf7bff48b0388cb75991e58c6df7d13e982f1f2 +github.com/julienschmidt/httprouter 6aacfd5ab513e34f7e64ea9627ab9670371b34e7 diff --git a/apps/nsqd/nsqd.go b/apps/nsqd/nsqd.go index 70c08b20b..f8b23c9c7 100644 --- a/apps/nsqd/nsqd.go +++ b/apps/nsqd/nsqd.go @@ -199,9 +199,9 @@ func main() { } cfg.Validate() - opts := nsqd.NewNSQDOptions() + opts := nsqd.NewOptions() options.Resolve(opts, flagSet, cfg) - nsqd := nsqd.NewNSQD(opts) + nsqd := nsqd.New(opts) nsqd.LoadMetadata() err := nsqd.PersistMetadata() diff --git a/apps/nsqd/nsqd_test.go b/apps/nsqd/nsqd_test.go index f561dc6d9..96a44f736 100644 --- a/apps/nsqd/nsqd_test.go +++ b/apps/nsqd/nsqd_test.go @@ -22,9 +22,9 @@ func TestConfigFlagParsing(t *testing.T) { toml.DecodeReader(f, &cfg) cfg.Validate() - opts := nsqd.NewNSQDOptions() + opts := nsqd.NewOptions() options.Resolve(opts, flagSet, cfg) - nsqd.NewNSQD(opts) + nsqd.New(opts) if opts.TLSMinVersion != tls.VersionTLS10 { t.Errorf("min %#v not expected %#v", opts.TLSMinVersion, tls.VersionTLS10) diff --git a/apps/nsqlookupd/nsqlookupd.go b/apps/nsqlookupd/nsqlookupd.go index a97e10d31..54a4d5d90 100644 --- a/apps/nsqlookupd/nsqlookupd.go +++ b/apps/nsqlookupd/nsqlookupd.go @@ -49,9 +49,9 @@ func main() { } } - opts := nsqlookupd.NewNSQLookupdOptions() + opts := nsqlookupd.NewOptions() options.Resolve(opts, flagSet, cfg) - daemon := nsqlookupd.NewNSQLookupd(opts) + daemon := nsqlookupd.New(opts) daemon.Main() <-signalChan diff --git a/internal/http_api/api_response.go b/internal/http_api/api_response.go index 73af8b859..1eb6b04bc 100644 --- a/internal/http_api/api_response.go +++ b/internal/http_api/api_response.go @@ -5,8 +5,12 @@ import ( "fmt" "net/http" "strconv" + + "github.com/julienschmidt/httprouter" ) +type APIHandler func(http.ResponseWriter, *http.Request, httprouter.Params) (interface{}, error) + type Err struct { Code int Text string @@ -24,40 +28,35 @@ func acceptVersion(req *http.Request) int { return 0 } -func RequirePOST(req *http.Request, f func() (interface{}, error)) func() (interface{}, error) { - if req.Method != "POST" { - return func() (interface{}, error) { - return nil, Err{405, "INVALID_REQUEST"} +func NegotiateVersion(f APIHandler) httprouter.Handle { + return func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + data, err := f(w, req, ps) + if err != nil { + if acceptVersion(req) == 1 { + RespondV1(w, err.(Err).Code, err) + } else { + // this handler always returns 500 for backwards compatibility + Respond(w, 500, err.Error(), nil) + } + return } - } - return f -} - -func NegotiateVersionWrapper(w http.ResponseWriter, req *http.Request, f func() (interface{}, error)) { - data, err := f() - if err != nil { if acceptVersion(req) == 1 { - RespondV1(w, err.(Err).Code, err) + RespondV1(w, 200, data) } else { - // this handler always returns 500 for backwards compatibility - Respond(w, 500, err.Error(), nil) + Respond(w, 200, "OK", data) } - return - } - if acceptVersion(req) == 1 { - RespondV1(w, 200, data) - } else { - Respond(w, 200, "OK", data) } } -func V1Wrapper(w http.ResponseWriter, req *http.Request, f func() (interface{}, error)) { - data, err := f() - if err != nil { - RespondV1(w, err.(Err).Code, err) - return +func V1(f APIHandler) httprouter.Handle { + return func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + data, err := f(w, req, ps) + if err != nil { + RespondV1(w, err.(Err).Code, err) + return + } + RespondV1(w, 200, data) } - RespondV1(w, 200, data) } func Respond(w http.ResponseWriter, statusCode int, statusTxt string, data interface{}) { diff --git a/internal/http_api/http_server.go b/internal/http_api/http_server.go index 3cbac8852..726f27c23 100644 --- a/internal/http_api/http_server.go +++ b/internal/http_api/http_server.go @@ -9,7 +9,7 @@ import ( "github.com/bitly/nsq/internal/app" ) -func Serve(listener net.Listener, handler http.Handler, l app.Logger, proto string) { +func Serve(listener net.Listener, handler http.Handler, proto string, l app.Logger) { l.Output(2, fmt.Sprintf("%s: listening on %s", proto, listener.Addr())) server := &http.Server{ diff --git a/nsqadmin/http.go b/nsqadmin/http.go index d59c1d7da..e8fc378c3 100644 --- a/nsqadmin/http.go +++ b/nsqadmin/http.go @@ -9,7 +9,6 @@ import ( "net/http" "net/http/httputil" "net/url" - "regexp" "strconv" "strings" "time" @@ -21,6 +20,7 @@ import ( "github.com/bitly/nsq/internal/version" "github.com/bitly/nsq/nsqadmin/templates" "github.com/blang/semver" + "github.com/julienschmidt/httprouter" ) var v1EndpointVersion semver.Version @@ -49,6 +49,7 @@ type httpServer struct { ctx *Context counters map[string]map[string]int64 proxy *httputil.ReverseProxy + router http.Handler } func NewHTTPServer(ctx *Context) *httpServer { @@ -73,92 +74,57 @@ func NewHTTPServer(ctx *Context) *httpServer { proxy = NewSingleHostReverseProxy(ctx.nsqadmin.graphiteURL, 20*time.Second) } - return &httpServer{ + router := httprouter.New() + router.HandleMethodNotAllowed = true + s := &httpServer{ ctx: ctx, counters: make(map[string]map[string]int64), proxy: proxy, - } + router: router, + } + + router.Handle("GET", "/ping", s.pingHandler) + + router.Handle("GET", "/", s.indexHandler) + router.Handle("GET", "/nodes", s.nodesHandler) + router.Handle("GET", "/node/:node", s.nodeHandler) + router.Handle("GET", "/topic/:topic", s.topicHandler) + router.Handle("GET", "/topic/:topic/:channel", s.channelHandler) + router.Handle("GET", "/static/:asset", s.embeddedAssetHandler) + router.Handle("GET", "/counter", s.counterHandler) + router.Handle("GET", "/counter/data", s.counterDataHandler) + router.Handle("GET", "/lookup", s.lookupHandler) + router.Handle("GET", "/graphite_data", s.graphiteDataHandler) + + router.Handle("POST", "/tombstone_topic_producer", s.tombstoneTopicProducerHandler) + router.Handle("POST", "/empty_topic", s.emptyTopicHandler) + router.Handle("POST", "/delete_topic", s.deleteTopicHandler) + router.Handle("POST", "/pause_topic", s.pauseTopicHandler) + router.Handle("POST", "/unpause_topic", s.pauseTopicHandler) + router.Handle("POST", "/empty_channel", s.emptyChannelHandler) + router.Handle("POST", "/delete_channel", s.deleteChannelHandler) + router.Handle("POST", "/pause_channel", s.pauseChannelHandler) + router.Handle("POST", "/unpause_channel", s.pauseChannelHandler) + router.Handle("POST", "/create_topic_channel", s.createTopicChannelHandler) + + if s.ctx.nsqadmin.opts.ProxyGraphite { + router.Handler("GET", "/render", s.proxy) + } + + return s } func (s *httpServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { - - if strings.HasPrefix(req.URL.Path, "/node/") { - s.nodeHandler(w, req) - return - } else if strings.HasPrefix(req.URL.Path, "/topic/") { - s.topicHandler(w, req) - return - } else if strings.HasPrefix(req.URL.Path, "/static/") { - if req.Method != "GET" { - s.ctx.nsqadmin.logf("ERROR: invalid %s to GET only method", req.Method) - http.Error(w, "INVALID_REQUEST", 500) - } else { - s.embeddedAssetHandler(w, req) - } - return - } - - switch req.URL.Path { - case "/": - s.indexHandler(w, req) - case "/ping": - s.pingHandler(w, req) - case "/nodes": - s.nodesHandler(w, req) - case "/tombstone_topic_producer": - s.tombstoneTopicProducerHandler(w, req) - case "/empty_topic": - s.emptyTopicHandler(w, req) - case "/delete_topic": - s.deleteTopicHandler(w, req) - case "/pause_topic": - s.pauseTopicHandler(w, req) - case "/unpause_topic": - s.pauseTopicHandler(w, req) - case "/delete_channel": - s.deleteChannelHandler(w, req) - case "/empty_channel": - s.emptyChannelHandler(w, req) - case "/pause_channel": - s.pauseChannelHandler(w, req) - case "/unpause_channel": - s.pauseChannelHandler(w, req) - case "/counter/data": - s.counterDataHandler(w, req) - case "/counter": - s.counterHandler(w, req) - case "/lookup": - s.lookupHandler(w, req) - case "/create_topic_channel": - s.createTopicChannelHandler(w, req) - case "/graphite_data": - s.graphiteDataHandler(w, req) - case "/render": - if !s.ctx.nsqadmin.opts.ProxyGraphite { - http.NotFound(w, req) - return - } - s.proxy.ServeHTTP(w, req) - default: - s.ctx.nsqadmin.logf("ERROR: 404 %s", req.URL.Path) - http.NotFound(w, req) - } + s.router.ServeHTTP(w, req) } -func (s *httpServer) pingHandler(w http.ResponseWriter, req *http.Request) { +func (s *httpServer) pingHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { w.Header().Set("Content-Length", "2") io.WriteString(w, "OK") } -func (s *httpServer) embeddedAssetHandler(w http.ResponseWriter, req *http.Request) { - var urlRegex = regexp.MustCompile(`^/static/(.+)$`) - matches := urlRegex.FindStringSubmatch(req.URL.Path) - if len(matches) == 0 { - s.ctx.nsqadmin.logf("ERROR: No embedded asset name for url - %s", req.URL.Path) - http.NotFound(w, req) - return - } - assetName := matches[1] +func (s *httpServer) embeddedAssetHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + assetName := ps.ByName("asset") s.ctx.nsqadmin.logf("INFO: Requesting embedded asset - %s", assetName) asset, error := templates.Asset(assetName) @@ -178,7 +144,7 @@ func (s *httpServer) embeddedAssetHandler(w http.ResponseWriter, req *http.Reque w.Write(asset) } -func (s *httpServer) indexHandler(w http.ResponseWriter, req *http.Request) { +func (s *httpServer) indexHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { reqParams, err := http_api.NewReqParams(req) if err != nil { s.ctx.nsqadmin.logf("ERROR: failed to parse request params - %s", err) @@ -211,28 +177,8 @@ func (s *httpServer) indexHandler(w http.ResponseWriter, req *http.Request) { } } -func (s *httpServer) topicHandler(w http.ResponseWriter, req *http.Request) { - var urlRegex = regexp.MustCompile(`^/topic/(.*)$`) - matches := urlRegex.FindStringSubmatch(req.URL.Path) - if len(matches) == 0 { - http.Error(w, "INVALID_TOPIC", 500) - return - } - parts := strings.Split(matches[1], "/") - topicName := parts[0] - if !protocol.IsValidTopicName(topicName) { - http.Error(w, "INVALID_TOPIC", 500) - return - } - if len(parts) == 2 { - channelName := parts[1] - if !protocol.IsValidChannelName(channelName) { - http.Error(w, "INVALID_CHANNEL", 500) - } else { - s.channelHandler(w, req, topicName, channelName) - } - return - } +func (s *httpServer) topicHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + topicName := ps.ByName("topic") reqParams, err := http_api.NewReqParams(req) if err != nil { @@ -287,7 +233,10 @@ func (s *httpServer) topicHandler(w http.ResponseWriter, req *http.Request) { } } -func (s *httpServer) channelHandler(w http.ResponseWriter, req *http.Request, topicName string, channelName string) { +func (s *httpServer) channelHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + topicName := ps.ByName("topic") + channelName := ps.ByName("channel") + reqParams, err := http_api.NewReqParams(req) if err != nil { s.ctx.nsqadmin.logf("ERROR: failed to parse request params - %s", err) @@ -342,7 +291,7 @@ func (s *httpServer) channelHandler(w http.ResponseWriter, req *http.Request, to } } -func (s *httpServer) lookupHandler(w http.ResponseWriter, req *http.Request) { +func (s *httpServer) lookupHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { reqParams, err := http_api.NewReqParams(req) if err != nil { s.ctx.nsqadmin.logf("ERROR: failed to parse request params - %s", err) @@ -381,12 +330,7 @@ func (s *httpServer) lookupHandler(w http.ResponseWriter, req *http.Request) { } } -func (s *httpServer) createTopicChannelHandler(w http.ResponseWriter, req *http.Request) { - if req.Method != "POST" { - s.ctx.nsqadmin.logf("ERROR: invalid %s to POST only method", req.Method) - http.Error(w, "INVALID_REQUEST", 500) - return - } +func (s *httpServer) createTopicChannelHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { reqParams := &http_api.PostParams{req} topicName, err := reqParams.Get("topic") @@ -460,12 +404,7 @@ func (s *httpServer) createTopicChannelHandler(w http.ResponseWriter, req *http. http.Redirect(w, req, "/lookup", 302) } -func (s *httpServer) tombstoneTopicProducerHandler(w http.ResponseWriter, req *http.Request) { - if req.Method != "POST" { - s.ctx.nsqadmin.logf("ERROR: invalid %s to POST only method", req.Method) - http.Error(w, "INVALID_REQUEST", 500) - return - } +func (s *httpServer) tombstoneTopicProducerHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { reqParams := &http_api.PostParams{req} topicName, err := reqParams.Get("topic") @@ -531,12 +470,7 @@ func (s *httpServer) tombstoneTopicProducerHandler(w http.ResponseWriter, req *h http.Redirect(w, req, rd, 302) } -func (s *httpServer) deleteTopicHandler(w http.ResponseWriter, req *http.Request) { - if req.Method != "POST" { - s.ctx.nsqadmin.logf("ERROR: invalid %s to POST only method", req.Method) - http.Error(w, "INVALID_REQUEST", 500) - return - } +func (s *httpServer) deleteTopicHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { reqParams := &http_api.PostParams{req} topicName, err := reqParams.Get("topic") @@ -586,12 +520,7 @@ func (s *httpServer) deleteTopicHandler(w http.ResponseWriter, req *http.Request http.Redirect(w, req, rd, 302) } -func (s *httpServer) deleteChannelHandler(w http.ResponseWriter, req *http.Request) { - if req.Method != "POST" { - s.ctx.nsqadmin.logf("ERROR: invalid %s to POST only method", req.Method) - http.Error(w, "INVALID_REQUEST", 500) - return - } +func (s *httpServer) deleteChannelHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { reqParams := &http_api.PostParams{req} topicName, channelName, err := http_api.GetTopicChannelArgs(reqParams) @@ -642,12 +571,7 @@ func (s *httpServer) deleteChannelHandler(w http.ResponseWriter, req *http.Reque http.Redirect(w, req, rd, 302) } -func (s *httpServer) emptyTopicHandler(w http.ResponseWriter, req *http.Request) { - if req.Method != "POST" { - s.ctx.nsqadmin.logf("ERROR: invalid %s to POST only method", req.Method) - http.Error(w, "INVALID_REQUEST", 500) - return - } +func (s *httpServer) emptyTopicHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { reqParams := &http_api.PostParams{req} topicName, err := reqParams.Get("topic") @@ -670,12 +594,7 @@ func (s *httpServer) emptyTopicHandler(w http.ResponseWriter, req *http.Request) http.Redirect(w, req, fmt.Sprintf("/topic/%s", url.QueryEscape(topicName)), 302) } -func (s *httpServer) pauseTopicHandler(w http.ResponseWriter, req *http.Request) { - if req.Method != "POST" { - s.ctx.nsqadmin.logf("ERROR: invalid %s to POST only method", req.Method) - http.Error(w, "INVALID_REQUEST", 500) - return - } +func (s *httpServer) pauseTopicHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { reqParams := &http_api.PostParams{req} topicName, err := reqParams.Get("topic") @@ -703,12 +622,7 @@ func (s *httpServer) pauseTopicHandler(w http.ResponseWriter, req *http.Request) http.Redirect(w, req, fmt.Sprintf("/topic/%s", url.QueryEscape(topicName)), 302) } -func (s *httpServer) emptyChannelHandler(w http.ResponseWriter, req *http.Request) { - if req.Method != "POST" { - s.ctx.nsqadmin.logf("ERROR: invalid %s to POST only method", req.Method) - http.Error(w, "INVALID_REQUEST", 500) - return - } +func (s *httpServer) emptyChannelHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { reqParams := &http_api.PostParams{req} topicName, channelName, err := http_api.GetTopicChannelArgs(reqParams) @@ -732,12 +646,7 @@ func (s *httpServer) emptyChannelHandler(w http.ResponseWriter, req *http.Reques url.QueryEscape(topicName), url.QueryEscape(channelName)), 302) } -func (s *httpServer) pauseChannelHandler(w http.ResponseWriter, req *http.Request) { - if req.Method != "POST" { - s.ctx.nsqadmin.logf("ERROR: invalid %s to POST only method", req.Method) - http.Error(w, "INVALID_REQUEST", 500) - return - } +func (s *httpServer) pauseChannelHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { reqParams := &http_api.PostParams{req} topicName, channelName, err := http_api.GetTopicChannelArgs(reqParams) @@ -765,7 +674,9 @@ func (s *httpServer) pauseChannelHandler(w http.ResponseWriter, req *http.Reques http.Redirect(w, req, fmt.Sprintf("/topic/%s/%s", url.QueryEscape(topicName), url.QueryEscape(channelName)), 302) } -func (s *httpServer) nodeHandler(w http.ResponseWriter, req *http.Request) { +func (s *httpServer) nodeHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + node := ps.ByName("node") + reqParams, err := http_api.NewReqParams(req) if err != nil { s.ctx.nsqadmin.logf("ERROR: failed to parse request params - %s", err) @@ -773,15 +684,6 @@ func (s *httpServer) nodeHandler(w http.ResponseWriter, req *http.Request) { return } - var urlRegex = regexp.MustCompile(`^/node/(.*)$`) - matches := urlRegex.FindStringSubmatch(req.URL.Path) - if len(matches) == 0 { - http.Error(w, "INVALID_NODE", 500) - return - } - parts := strings.Split(matches[1], "/") - node := parts[0] - found := false for _, n := range s.ctx.nsqadmin.opts.NSQDHTTPAddresses { if node == n { @@ -838,7 +740,7 @@ func (s *httpServer) nodeHandler(w http.ResponseWriter, req *http.Request) { } } -func (s *httpServer) nodesHandler(w http.ResponseWriter, req *http.Request) { +func (s *httpServer) nodesHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { reqParams, err := http_api.NewReqParams(req) if err != nil { s.ctx.nsqadmin.logf("ERROR: failed to parse request params - %s", err) @@ -877,7 +779,7 @@ func (c counterTarget) Host() string { return "*" } -func (s *httpServer) counterHandler(w http.ResponseWriter, req *http.Request) { +func (s *httpServer) counterHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { reqParams, err := http_api.NewReqParams(req) if err != nil { s.ctx.nsqadmin.logf("ERROR: failed to parse request params - %s", err) @@ -906,7 +808,7 @@ func (s *httpServer) counterHandler(w http.ResponseWriter, req *http.Request) { // The initial request is the number of messages processed since each nsqd started up. // Subsequent requsts pass that ID and get an updated delta based on each individual channel/nsqd message count // That ID must be re-requested or it will be expired. -func (s *httpServer) counterDataHandler(w http.ResponseWriter, req *http.Request) { +func (s *httpServer) counterDataHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { reqParams, err := http_api.NewReqParams(req) if err != nil { s.ctx.nsqadmin.logf("ERROR: failed to parse request params - %s", err) @@ -957,7 +859,7 @@ func (s *httpServer) counterDataHandler(w http.ResponseWriter, req *http.Request http_api.Respond(w, 200, "OK", data) } -func (s *httpServer) graphiteDataHandler(w http.ResponseWriter, req *http.Request) { +func (s *httpServer) graphiteDataHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { reqParams, err := http_api.NewReqParams(req) if err != nil { s.ctx.nsqadmin.logf("ERROR: failed to parse request params - %s", err) diff --git a/nsqadmin/nsqadmin.go b/nsqadmin/nsqadmin.go index 6cec943fa..c06e2a366 100644 --- a/nsqadmin/nsqadmin.go +++ b/nsqadmin/nsqadmin.go @@ -115,7 +115,7 @@ func (n *NSQAdmin) Main() { n.Unlock() httpServer := NewHTTPServer(&Context{n}) n.waitGroup.Wrap(func() { - http_api.Serve(n.httpListener, httpServer, n.opts.Logger, "HTTP") + http_api.Serve(n.httpListener, httpServer, "HTTP", n.opts.Logger) }) n.waitGroup.Wrap(func() { n.handleAdminActions() }) } diff --git a/nsqd/buffer_pool_go13.go b/nsqd/buffer_pool.go similarity index 93% rename from nsqd/buffer_pool_go13.go rename to nsqd/buffer_pool.go index d2c4f6cc0..d47d06bf0 100644 --- a/nsqd/buffer_pool_go13.go +++ b/nsqd/buffer_pool.go @@ -1,5 +1,3 @@ -// +build go1.3 - package nsqd import ( diff --git a/nsqd/buffer_pool_go10.go b/nsqd/buffer_pool_go10.go deleted file mode 100644 index 2b50ee879..000000000 --- a/nsqd/buffer_pool_go10.go +++ /dev/null @@ -1,13 +0,0 @@ -// +build !go1.3 - -package nsqd - -import ( - "bytes" -) - -func bufferPoolGet() *bytes.Buffer { - return &bytes.Buffer{} -} - -func bufferPoolPut(b *bytes.Buffer) {} diff --git a/nsqd/channel.go b/nsqd/channel.go index d6662685a..c17f8bbdb 100644 --- a/nsqd/channel.go +++ b/nsqd/channel.go @@ -80,17 +80,17 @@ func NewChannel(topicName string, channelName string, ctx *context, c := &Channel{ topicName: topicName, name: channelName, - memoryMsgChan: make(chan *Message, ctx.nsqd.opts.MemQueueSize), + memoryMsgChan: make(chan *Message, ctx.nsqd.getOpts().MemQueueSize), clientMsgChan: make(chan *Message), exitChan: make(chan int), clients: make(map[int64]Consumer), deleteCallback: deleteCallback, ctx: ctx, } - if len(ctx.nsqd.opts.E2EProcessingLatencyPercentiles) > 0 { + if len(ctx.nsqd.getOpts().E2EProcessingLatencyPercentiles) > 0 { c.e2eProcessingLatencyStream = quantile.New( - ctx.nsqd.opts.E2EProcessingLatencyWindowTime, - ctx.nsqd.opts.E2EProcessingLatencyPercentiles, + ctx.nsqd.getOpts().E2EProcessingLatencyWindowTime, + ctx.nsqd.getOpts().E2EProcessingLatencyPercentiles, ) } @@ -103,13 +103,13 @@ func NewChannel(topicName string, channelName string, ctx *context, // backend names, for uniqueness, automatically include the topic... backendName := getBackendName(topicName, channelName) c.backend = newDiskQueue(backendName, - ctx.nsqd.opts.DataPath, - ctx.nsqd.opts.MaxBytesPerFile, + ctx.nsqd.getOpts().DataPath, + ctx.nsqd.getOpts().MaxBytesPerFile, int32(minValidMsgLength), - int32(ctx.nsqd.opts.MaxMsgSize), - ctx.nsqd.opts.SyncEvery, - ctx.nsqd.opts.SyncTimeout, - ctx.nsqd.opts.Logger) + int32(ctx.nsqd.getOpts().MaxMsgSize), + ctx.nsqd.getOpts().SyncEvery, + ctx.nsqd.getOpts().SyncTimeout, + ctx.nsqd.getOpts().Logger) } go c.messagePump() @@ -120,7 +120,7 @@ func NewChannel(topicName string, channelName string, ctx *context, } func (c *Channel) initPQ() { - pqSize := int(math.Max(1, float64(c.ctx.nsqd.opts.MemQueueSize)/10)) + pqSize := int(math.Max(1, float64(c.ctx.nsqd.getOpts().MemQueueSize)/10)) c.inFlightMessages = make(map[MessageID]*Message) c.deferredMessages = make(map[MessageID]*pqueue.Item) @@ -340,9 +340,9 @@ func (c *Channel) TouchMessage(clientID int64, id MessageID, clientMsgTimeout ti newTimeout := time.Now().Add(clientMsgTimeout) if newTimeout.Sub(msg.deliveryTS) >= - c.ctx.nsqd.opts.MaxMsgTimeout { + c.ctx.nsqd.getOpts().MaxMsgTimeout { // we would have gone over, set to the max - newTimeout = msg.deliveryTS.Add(c.ctx.nsqd.opts.MaxMsgTimeout) + newTimeout = msg.deliveryTS.Add(c.ctx.nsqd.getOpts().MaxMsgTimeout) } msg.pri = newTimeout.UnixNano() diff --git a/nsqd/channel_test.go b/nsqd/channel_test.go index 4aece7a4d..14fbcd6b1 100644 --- a/nsqd/channel_test.go +++ b/nsqd/channel_test.go @@ -9,7 +9,7 @@ import ( // ensure that we can push a message through a topic and get it out of a channel func TestPutMessage(t *testing.T) { - opts := NewNSQDOptions() + opts := NewOptions() opts.Logger = newTestLogger(t) _, _, nsqd := mustStartNSQD(opts) defer os.RemoveAll(opts.DataPath) @@ -30,7 +30,7 @@ func TestPutMessage(t *testing.T) { // ensure that both channels get the same message func TestPutMessage2Chan(t *testing.T) { - opts := NewNSQDOptions() + opts := NewOptions() opts.Logger = newTestLogger(t) _, _, nsqd := mustStartNSQD(opts) defer os.RemoveAll(opts.DataPath) @@ -57,7 +57,7 @@ func TestPutMessage2Chan(t *testing.T) { func TestInFlightWorker(t *testing.T) { count := 250 - opts := NewNSQDOptions() + opts := NewOptions() opts.Logger = newTestLogger(t) opts.MsgTimeout = 100 * time.Millisecond opts.QueueScanRefreshInterval = 100 * time.Millisecond @@ -100,7 +100,7 @@ func TestInFlightWorker(t *testing.T) { } func TestChannelEmpty(t *testing.T) { - opts := NewNSQDOptions() + opts := NewOptions() opts.Logger = newTestLogger(t) _, _, nsqd := mustStartNSQD(opts) defer os.RemoveAll(opts.DataPath) @@ -133,7 +133,7 @@ func TestChannelEmpty(t *testing.T) { } func TestChannelEmptyConsumer(t *testing.T) { - opts := NewNSQDOptions() + opts := NewOptions() opts.Logger = newTestLogger(t) tcpAddr, _, nsqd := mustStartNSQD(opts) defer os.RemoveAll(opts.DataPath) diff --git a/nsqd/client_tls_go10.go b/nsqd/client_tls_go10.go deleted file mode 100644 index 3a855ec61..000000000 --- a/nsqd/client_tls_go10.go +++ /dev/null @@ -1,7 +0,0 @@ -// +build !go1.3 - -package nsqd - -func (p *prettyConnectionState) GetVersion() string { - return "TLS1.0" -} diff --git a/nsqd/client_tls_go13.go b/nsqd/client_tls_go13.go deleted file mode 100644 index fa6823e65..000000000 --- a/nsqd/client_tls_go13.go +++ /dev/null @@ -1,23 +0,0 @@ -// +build go1.3 - -package nsqd - -import ( - "crypto/tls" - "fmt" -) - -func (p *prettyConnectionState) GetVersion() string { - switch p.Version { - case tls.VersionSSL30: - return "SSL30" - case tls.VersionTLS10: - return "TLS1.0" - case tls.VersionTLS11: - return "TLS1.1" - case tls.VersionTLS12: - return "TLS1.2" - default: - return fmt.Sprintf("Unknown %d", p.Version) - } -} diff --git a/nsqd/client_v2.go b/nsqd/client_v2.go index 27892bd05..e53f17a73 100644 --- a/nsqd/client_v2.go +++ b/nsqd/client_v2.go @@ -126,7 +126,7 @@ func newClientV2(id int64, conn net.Conn, ctx *context) *clientV2 { OutputBufferSize: defaultBufferSize, OutputBufferTimeout: 250 * time.Millisecond, - MsgTimeout: ctx.nsqd.opts.MsgTimeout, + MsgTimeout: ctx.nsqd.getOpts().MsgTimeout, // ReadyStateChan has a buffer of 1 to guarantee that in the event // there is a race the state update is not lost @@ -142,7 +142,7 @@ func newClientV2(id int64, conn net.Conn, ctx *context) *clientV2 { IdentifyEventChan: make(chan identifyEvent, 1), // heartbeats are client configurable but default to 30s - HeartbeatInterval: ctx.nsqd.opts.ClientTimeout / 2, + HeartbeatInterval: ctx.nsqd.getOpts().ClientTimeout / 2, } c.lenSlice = c.lenBuf[:] return c @@ -267,54 +267,51 @@ type prettyConnectionState struct { tls.ConnectionState } -// taken from http://golang.org/src/pkg/crypto/tls/cipher_suites.go -// to be compatible with older versions -const ( - local_TLS_RSA_WITH_RC4_128_SHA uint16 = 0x0005 - local_TLS_RSA_WITH_3DES_EDE_CBC_SHA uint16 = 0x000a - local_TLS_RSA_WITH_AES_128_CBC_SHA uint16 = 0x002f - local_TLS_RSA_WITH_AES_256_CBC_SHA uint16 = 0x0035 - local_TLS_ECDHE_ECDSA_WITH_RC4_128_SHA uint16 = 0xc007 - local_TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA uint16 = 0xc009 - local_TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA uint16 = 0xc00a - local_TLS_ECDHE_RSA_WITH_RC4_128_SHA uint16 = 0xc011 - local_TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA uint16 = 0xc012 - local_TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA uint16 = 0xc013 - local_TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA uint16 = 0xc014 - local_TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256 uint16 = 0xc02f - local_TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256 uint16 = 0xc02b -) - func (p *prettyConnectionState) GetCipherSuite() string { switch p.CipherSuite { - case local_TLS_RSA_WITH_RC4_128_SHA: + case tls.TLS_RSA_WITH_RC4_128_SHA: return "TLS_RSA_WITH_RC4_128_SHA" - case local_TLS_RSA_WITH_3DES_EDE_CBC_SHA: + case tls.TLS_RSA_WITH_3DES_EDE_CBC_SHA: return "TLS_RSA_WITH_3DES_EDE_CBC_SHA" - case local_TLS_RSA_WITH_AES_128_CBC_SHA: + case tls.TLS_RSA_WITH_AES_128_CBC_SHA: return "TLS_RSA_WITH_AES_128_CBC_SHA" - case local_TLS_RSA_WITH_AES_256_CBC_SHA: + case tls.TLS_RSA_WITH_AES_256_CBC_SHA: return "TLS_RSA_WITH_AES_256_CBC_SHA" - case local_TLS_ECDHE_ECDSA_WITH_RC4_128_SHA: + case tls.TLS_ECDHE_ECDSA_WITH_RC4_128_SHA: return "TLS_ECDHE_ECDSA_WITH_RC4_128_SHA" - case local_TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA: + case tls.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA: return "TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA" - case local_TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA: + case tls.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA: return "TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA" - case local_TLS_ECDHE_RSA_WITH_RC4_128_SHA: + case tls.TLS_ECDHE_RSA_WITH_RC4_128_SHA: return "TLS_ECDHE_RSA_WITH_RC4_128_SHA" - case local_TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA: + case tls.TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA: return "TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA" - case local_TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA: + case tls.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA: return "TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA" - case local_TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA: + case tls.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA: return "TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA" - case local_TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256: + case tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256: return "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256" - case local_TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256: + case tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256: return "TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256" } - return fmt.Sprintf("unkown %d", p.CipherSuite) + return fmt.Sprintf("Unknown %d", p.CipherSuite) +} + +func (p *prettyConnectionState) GetVersion() string { + switch p.Version { + case tls.VersionSSL30: + return "SSL30" + case tls.VersionTLS10: + return "TLS1.0" + case tls.VersionTLS11: + return "TLS1.1" + case tls.VersionTLS12: + return "TLS1.2" + default: + return fmt.Sprintf("Unknown %d", p.Version) + } } func (c *clientV2) IsReadyForMessages() bool { @@ -325,7 +322,7 @@ func (c *clientV2) IsReadyForMessages() bool { readyCount := atomic.LoadInt64(&c.ReadyCount) inFlightCount := atomic.LoadInt64(&c.InFlightCount) - if c.ctx.nsqd.opts.Verbose { + if c.ctx.nsqd.getOpts().Verbose { c.ctx.nsqd.logf("[%s] state rdy: %4d inflt: %4d", c, readyCount, inFlightCount) } @@ -404,7 +401,7 @@ func (c *clientV2) SetHeartbeatInterval(desiredInterval int) error { case desiredInterval == 0: // do nothing (use default) case desiredInterval >= 1000 && - desiredInterval <= int(c.ctx.nsqd.opts.MaxHeartbeatInterval/time.Millisecond): + desiredInterval <= int(c.ctx.nsqd.getOpts().MaxHeartbeatInterval/time.Millisecond): c.HeartbeatInterval = time.Duration(desiredInterval) * time.Millisecond default: return fmt.Errorf("heartbeat interval (%d) is invalid", desiredInterval) @@ -422,7 +419,7 @@ func (c *clientV2) SetOutputBufferSize(desiredSize int) error { size = 1 case desiredSize == 0: // do nothing (use default) - case desiredSize >= 64 && desiredSize <= int(c.ctx.nsqd.opts.MaxOutputBufferSize): + case desiredSize >= 64 && desiredSize <= int(c.ctx.nsqd.getOpts().MaxOutputBufferSize): size = desiredSize default: return fmt.Errorf("output buffer size (%d) is invalid", desiredSize) @@ -452,7 +449,7 @@ func (c *clientV2) SetOutputBufferTimeout(desiredTimeout int) error { case desiredTimeout == 0: // do nothing (use default) case desiredTimeout >= 1 && - desiredTimeout <= int(c.ctx.nsqd.opts.MaxOutputBufferTimeout/time.Millisecond): + desiredTimeout <= int(c.ctx.nsqd.getOpts().MaxOutputBufferTimeout/time.Millisecond): c.OutputBufferTimeout = time.Duration(desiredTimeout) * time.Millisecond default: return fmt.Errorf("output buffer timeout (%d) is invalid", desiredTimeout) @@ -477,7 +474,7 @@ func (c *clientV2) SetMsgTimeout(msgTimeout int) error { case msgTimeout == 0: // do nothing (use default) case msgTimeout >= 1000 && - msgTimeout <= int(c.ctx.nsqd.opts.MaxMsgTimeout/time.Millisecond): + msgTimeout <= int(c.ctx.nsqd.getOpts().MaxMsgTimeout/time.Millisecond): c.MsgTimeout = time.Duration(msgTimeout) * time.Millisecond default: return fmt.Errorf("msg timeout (%d) is invalid", msgTimeout) @@ -575,7 +572,7 @@ func (c *clientV2) QueryAuthd() error { tlsEnabled = "true" } - authState, err := auth.QueryAnyAuthd(c.ctx.nsqd.opts.AuthHTTPAddresses, + authState, err := auth.QueryAnyAuthd(c.ctx.nsqd.getOpts().AuthHTTPAddresses, remoteIP, tlsEnabled, c.AuthSecret) if err != nil { return err diff --git a/nsqd/http.go b/nsqd/http.go index b5da451f4..9a92dbaa3 100644 --- a/nsqd/http.go +++ b/nsqd/http.go @@ -3,13 +3,15 @@ package nsqd import ( "bufio" "bytes" + "encoding/json" "fmt" "io" "io/ioutil" "net" "net/http" - httpprof "net/http/pprof" + "net/http/pprof" "net/url" + "reflect" "strconv" "strings" "time" @@ -17,12 +19,82 @@ import ( "github.com/bitly/nsq/internal/http_api" "github.com/bitly/nsq/internal/protocol" "github.com/bitly/nsq/internal/version" + "github.com/julienschmidt/httprouter" ) type httpServer struct { ctx *context tlsEnabled bool tlsRequired bool + router http.Handler +} + +func newHTTPServer(ctx *context, tlsEnabled bool, tlsRequired bool) *httpServer { + router := httprouter.New() + router.HandleMethodNotAllowed = true + s := &httpServer{ + ctx: ctx, + tlsEnabled: tlsEnabled, + tlsRequired: tlsRequired, + router: router, + } + + // v1 negotiate + router.Handle("POST", "/pub", http_api.NegotiateVersion(s.doPUB)) + router.Handle("POST", "/mpub", http_api.NegotiateVersion(s.doMPUB)) + router.Handle("GET", "/stats", http_api.NegotiateVersion(s.doStats)) + router.Handle("GET", "/ping", s.pingHandler) + + // only v1 + router.Handle("POST", "/topic/create", http_api.V1(s.doCreateTopic)) + router.Handle("POST", "/topic/delete", http_api.V1(s.doDeleteTopic)) + router.Handle("POST", "/topic/empty", http_api.V1(s.doEmptyTopic)) + router.Handle("POST", "/topic/pause", http_api.V1(s.doPauseTopic)) + router.Handle("POST", "/topic/unpause", http_api.V1(s.doPauseTopic)) + router.Handle("POST", "/channel/create", http_api.V1(s.doCreateChannel)) + router.Handle("POST", "/channel/delete", http_api.V1(s.doDeleteChannel)) + router.Handle("POST", "/channel/empty", http_api.V1(s.doEmptyChannel)) + router.Handle("POST", "/channel/pause", http_api.V1(s.doPauseChannel)) + router.Handle("POST", "/channel/unpause", http_api.V1(s.doPauseChannel)) + router.Handle("GET", "/config/:opt", http_api.V1(s.doConfig)) + router.Handle("PUT", "/config/:opt", http_api.V1(s.doConfig)) + + // deprecated, v1 negotiate + router.Handle("POST", "/put", http_api.NegotiateVersion(s.doPUB)) + router.Handle("POST", "/mput", http_api.NegotiateVersion(s.doMPUB)) + router.Handle("GET", "/info", http_api.NegotiateVersion(s.doInfo)) + router.Handle("POST", "/create_topic", http_api.NegotiateVersion(s.doCreateTopic)) + router.Handle("POST", "/delete_topic", http_api.NegotiateVersion(s.doDeleteTopic)) + router.Handle("POST", "/empty_topic", http_api.NegotiateVersion(s.doEmptyTopic)) + router.Handle("POST", "/pause_topic", http_api.NegotiateVersion(s.doPauseTopic)) + router.Handle("POST", "/unpause_topic", http_api.NegotiateVersion(s.doPauseTopic)) + router.Handle("POST", "/create_channel", http_api.NegotiateVersion(s.doCreateChannel)) + router.Handle("POST", "/delete_channel", http_api.NegotiateVersion(s.doDeleteChannel)) + router.Handle("POST", "/empty_channel", http_api.NegotiateVersion(s.doEmptyChannel)) + router.Handle("POST", "/pause_channel", http_api.NegotiateVersion(s.doPauseChannel)) + router.Handle("POST", "/unpause_channel", http_api.NegotiateVersion(s.doPauseChannel)) + router.Handle("GET", "/create_topic", http_api.NegotiateVersion(s.doCreateTopic)) + router.Handle("GET", "/delete_topic", http_api.NegotiateVersion(s.doDeleteTopic)) + router.Handle("GET", "/empty_topic", http_api.NegotiateVersion(s.doEmptyTopic)) + router.Handle("GET", "/pause_topic", http_api.NegotiateVersion(s.doPauseTopic)) + router.Handle("GET", "/unpause_topic", http_api.NegotiateVersion(s.doPauseTopic)) + router.Handle("GET", "/create_channel", http_api.NegotiateVersion(s.doCreateChannel)) + router.Handle("GET", "/delete_channel", http_api.NegotiateVersion(s.doDeleteChannel)) + router.Handle("GET", "/empty_channel", http_api.NegotiateVersion(s.doEmptyChannel)) + router.Handle("GET", "/pause_channel", http_api.NegotiateVersion(s.doPauseChannel)) + router.Handle("GET", "/unpause_channel", http_api.NegotiateVersion(s.doPauseChannel)) + + // debug + router.HandlerFunc("GET", "/debug/pprof", pprof.Index) + router.HandlerFunc("GET", "/debug/pprof/cmdline", pprof.Cmdline) + router.HandlerFunc("GET", "/debug/pprof/symbol", pprof.Symbol) + router.HandlerFunc("GET", "/debug/pprof/profile", pprof.Profile) + router.Handler("GET", "/debug/pprof/heap", pprof.Handler("heap")) + router.Handler("GET", "/debug/pprof/goroutine", pprof.Handler("goroutine")) + router.Handler("GET", "/debug/pprof/block", pprof.Handler("block")) + router.Handler("GET", "/debug/pprof/threadcreate", pprof.Handler("threadcreate")) + + return s } func (s *httpServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { @@ -30,145 +102,10 @@ func (s *httpServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { http_api.Respond(w, 403, "TLS_REQUIRED", nil) return } - - err := s.v1Router(w, req) - if err == nil { - return - } - - err = s.deprecatedRouter(w, req) - if err == nil { - return - } - - err = s.debugRouter(w, req) - if err != nil { - s.ctx.nsqd.logf("ERROR: %s", err) - http_api.Respond(w, 404, "NOT_FOUND", nil) - } + s.router.ServeHTTP(w, req) } -func (s *httpServer) debugRouter(w http.ResponseWriter, req *http.Request) error { - switch req.URL.Path { - case "/debug/pprof": - httpprof.Index(w, req) - case "/debug/pprof/cmdline": - httpprof.Cmdline(w, req) - case "/debug/pprof/symbol": - httpprof.Symbol(w, req) - case "/debug/pprof/heap": - httpprof.Handler("heap").ServeHTTP(w, req) - case "/debug/pprof/goroutine": - httpprof.Handler("goroutine").ServeHTTP(w, req) - case "/debug/pprof/profile": - httpprof.Profile(w, req) - case "/debug/pprof/block": - httpprof.Handler("block").ServeHTTP(w, req) - case "/debug/pprof/threadcreate": - httpprof.Handler("threadcreate").ServeHTTP(w, req) - default: - return fmt.Errorf("404 %s", req.URL.Path) - } - return nil -} - -func (s *httpServer) v1Router(w http.ResponseWriter, req *http.Request) error { - switch req.URL.Path { - case "/pub": - http_api.NegotiateVersionWrapper(w, req, http_api.RequirePOST(req, - func() (interface{}, error) { return s.doPUB(req) })) - case "/mpub": - http_api.NegotiateVersionWrapper(w, req, http_api.RequirePOST(req, - func() (interface{}, error) { return s.doMPUB(req) })) - - case "/stats": - http_api.NegotiateVersionWrapper(w, req, - func() (interface{}, error) { return s.doStats(req) }) - case "/ping": - s.pingHandler(w, req) - - case "/topic/create": - http_api.V1Wrapper(w, req, http_api.RequirePOST(req, - func() (interface{}, error) { return s.doCreateTopic(req) })) - case "/topic/delete": - http_api.V1Wrapper(w, req, http_api.RequirePOST(req, - func() (interface{}, error) { return s.doDeleteTopic(req) })) - case "/topic/empty": - http_api.V1Wrapper(w, req, http_api.RequirePOST(req, - func() (interface{}, error) { return s.doEmptyTopic(req) })) - case "/topic/pause": - fallthrough - case "/topic/unpause": - http_api.V1Wrapper(w, req, http_api.RequirePOST(req, - func() (interface{}, error) { return s.doPauseTopic(req) })) - - case "/channel/create": - http_api.V1Wrapper(w, req, http_api.RequirePOST(req, - func() (interface{}, error) { return s.doCreateChannel(req) })) - case "/channel/delete": - http_api.V1Wrapper(w, req, http_api.RequirePOST(req, - func() (interface{}, error) { return s.doDeleteChannel(req) })) - case "/channel/empty": - http_api.V1Wrapper(w, req, http_api.RequirePOST(req, - func() (interface{}, error) { return s.doEmptyChannel(req) })) - case "/channel/pause": - fallthrough - case "/channel/unpause": - http_api.V1Wrapper(w, req, http_api.RequirePOST(req, - func() (interface{}, error) { return s.doPauseChannel(req) })) - - default: - return fmt.Errorf("404 %s", req.URL.Path) - } - return nil -} - -func (s *httpServer) deprecatedRouter(w http.ResponseWriter, req *http.Request) error { - switch req.URL.Path { - case "/put": - http_api.NegotiateVersionWrapper(w, req, http_api.RequirePOST(req, - func() (interface{}, error) { return s.doPUB(req) })) - case "/mput": - http_api.NegotiateVersionWrapper(w, req, http_api.RequirePOST(req, - func() (interface{}, error) { return s.doMPUB(req) })) - case "/info": - http_api.NegotiateVersionWrapper(w, req, - func() (interface{}, error) { return s.doInfo(req) }) - case "/empty_topic": - http_api.NegotiateVersionWrapper(w, req, - func() (interface{}, error) { return s.doEmptyTopic(req) }) - case "/delete_topic": - http_api.NegotiateVersionWrapper(w, req, - func() (interface{}, error) { return s.doDeleteTopic(req) }) - case "/pause_topic": - fallthrough - case "/unpause_topic": - http_api.NegotiateVersionWrapper(w, req, - func() (interface{}, error) { return s.doPauseTopic(req) }) - case "/empty_channel": - http_api.NegotiateVersionWrapper(w, req, - func() (interface{}, error) { return s.doEmptyChannel(req) }) - case "/delete_channel": - http_api.NegotiateVersionWrapper(w, req, - func() (interface{}, error) { return s.doDeleteChannel(req) }) - case "/pause_channel": - fallthrough - case "/unpause_channel": - http_api.NegotiateVersionWrapper(w, req, - func() (interface{}, error) { return s.doPauseChannel(req) }) - case "/create_topic": - http_api.NegotiateVersionWrapper(w, req, - func() (interface{}, error) { return s.doCreateTopic(req) }) - case "/create_channel": - http_api.NegotiateVersionWrapper(w, req, - func() (interface{}, error) { return s.doCreateChannel(req) }) - default: - return fmt.Errorf("404 %s", req.URL.Path) - } - return nil -} - -func (s *httpServer) pingHandler(w http.ResponseWriter, req *http.Request) { +func (s *httpServer) pingHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { health := s.ctx.nsqd.GetHealth() code := 200 if !s.ctx.nsqd.IsHealthy() { @@ -179,7 +116,7 @@ func (s *httpServer) pingHandler(w http.ResponseWriter, req *http.Request) { io.WriteString(w, health) } -func (s *httpServer) doInfo(req *http.Request) (interface{}, error) { +func (s *httpServer) doInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { return struct { Version string `json:"version"` }{ @@ -227,17 +164,17 @@ func (s *httpServer) getTopicFromQuery(req *http.Request) (url.Values, *Topic, e return reqParams, s.ctx.nsqd.GetTopic(topicName), nil } -func (s *httpServer) doPUB(req *http.Request) (interface{}, error) { +func (s *httpServer) doPUB(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { // TODO: one day I'd really like to just error on chunked requests // to be able to fail "too big" requests before we even read - if req.ContentLength > s.ctx.nsqd.opts.MaxMsgSize { + if req.ContentLength > s.ctx.nsqd.getOpts().MaxMsgSize { return nil, http_api.Err{413, "MSG_TOO_BIG"} } // add 1 so that it's greater than our max when we test for it // (LimitReader returns a "fake" EOF) - readMax := s.ctx.nsqd.opts.MaxMsgSize + 1 + readMax := s.ctx.nsqd.getOpts().MaxMsgSize + 1 body, err := ioutil.ReadAll(io.LimitReader(req.Body, readMax)) if err != nil { return nil, http_api.Err{500, "INTERNAL_ERROR"} @@ -261,7 +198,7 @@ func (s *httpServer) doPUB(req *http.Request) (interface{}, error) { return nil, http_api.Err{400, "INVALID_DEFER"} } deferred = time.Duration(di) * time.Millisecond - if deferred < 0 || deferred > s.ctx.nsqd.opts.MaxReqTimeout { + if deferred < 0 || deferred > s.ctx.nsqd.getOpts().MaxReqTimeout { return nil, http_api.Err{400, "INVALID_DEFER"} } } @@ -276,14 +213,14 @@ func (s *httpServer) doPUB(req *http.Request) (interface{}, error) { return "OK", nil } -func (s *httpServer) doMPUB(req *http.Request) (interface{}, error) { +func (s *httpServer) doMPUB(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { var msgs []*Message var exit bool // TODO: one day I'd really like to just error on chunked requests // to be able to fail "too big" requests before we even read - if req.ContentLength > s.ctx.nsqd.opts.MaxBodySize { + if req.ContentLength > s.ctx.nsqd.getOpts().MaxBodySize { return nil, http_api.Err{413, "BODY_TOO_BIG"} } @@ -296,14 +233,14 @@ func (s *httpServer) doMPUB(req *http.Request) (interface{}, error) { if ok { tmp := make([]byte, 4) msgs, err = readMPUB(req.Body, tmp, s.ctx.nsqd.idChan, - s.ctx.nsqd.opts.MaxMsgSize) + s.ctx.nsqd.getOpts().MaxMsgSize) if err != nil { return nil, http_api.Err{413, err.(*protocol.FatalClientErr).Code[2:]} } } else { // add 1 so that it's greater than our max when we test for it // (LimitReader returns a "fake" EOF) - readMax := s.ctx.nsqd.opts.MaxBodySize + 1 + readMax := s.ctx.nsqd.getOpts().MaxBodySize + 1 rdr := bufio.NewReader(io.LimitReader(req.Body, readMax)) total := 0 for !exit { @@ -329,7 +266,7 @@ func (s *httpServer) doMPUB(req *http.Request) (interface{}, error) { continue } - if int64(len(block)) > s.ctx.nsqd.opts.MaxMsgSize { + if int64(len(block)) > s.ctx.nsqd.getOpts().MaxMsgSize { return nil, http_api.Err{413, "MSG_TOO_BIG"} } @@ -346,12 +283,12 @@ func (s *httpServer) doMPUB(req *http.Request) (interface{}, error) { return "OK", nil } -func (s *httpServer) doCreateTopic(req *http.Request) (interface{}, error) { +func (s *httpServer) doCreateTopic(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { _, _, err := s.getTopicFromQuery(req) return nil, err } -func (s *httpServer) doEmptyTopic(req *http.Request) (interface{}, error) { +func (s *httpServer) doEmptyTopic(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { reqParams, err := http_api.NewReqParams(req) if err != nil { s.ctx.nsqd.logf("ERROR: failed to parse request params - %s", err) @@ -380,7 +317,7 @@ func (s *httpServer) doEmptyTopic(req *http.Request) (interface{}, error) { return nil, nil } -func (s *httpServer) doDeleteTopic(req *http.Request) (interface{}, error) { +func (s *httpServer) doDeleteTopic(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { reqParams, err := http_api.NewReqParams(req) if err != nil { s.ctx.nsqd.logf("ERROR: failed to parse request params - %s", err) @@ -400,7 +337,7 @@ func (s *httpServer) doDeleteTopic(req *http.Request) (interface{}, error) { return nil, nil } -func (s *httpServer) doPauseTopic(req *http.Request) (interface{}, error) { +func (s *httpServer) doPauseTopic(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { reqParams, err := http_api.NewReqParams(req) if err != nil { s.ctx.nsqd.logf("ERROR: failed to parse request params - %s", err) @@ -435,7 +372,7 @@ func (s *httpServer) doPauseTopic(req *http.Request) (interface{}, error) { return nil, nil } -func (s *httpServer) doCreateChannel(req *http.Request) (interface{}, error) { +func (s *httpServer) doCreateChannel(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { _, topic, channelName, err := s.getExistingTopicFromQuery(req) if err != nil { return nil, err @@ -444,7 +381,7 @@ func (s *httpServer) doCreateChannel(req *http.Request) (interface{}, error) { return nil, nil } -func (s *httpServer) doEmptyChannel(req *http.Request) (interface{}, error) { +func (s *httpServer) doEmptyChannel(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { _, topic, channelName, err := s.getExistingTopicFromQuery(req) if err != nil { return nil, err @@ -463,7 +400,7 @@ func (s *httpServer) doEmptyChannel(req *http.Request) (interface{}, error) { return nil, nil } -func (s *httpServer) doDeleteChannel(req *http.Request) (interface{}, error) { +func (s *httpServer) doDeleteChannel(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { _, topic, channelName, err := s.getExistingTopicFromQuery(req) if err != nil { return nil, err @@ -477,7 +414,7 @@ func (s *httpServer) doDeleteChannel(req *http.Request) (interface{}, error) { return nil, nil } -func (s *httpServer) doPauseChannel(req *http.Request) (interface{}, error) { +func (s *httpServer) doPauseChannel(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { _, topic, channelName, err := s.getExistingTopicFromQuery(req) if err != nil { return nil, err @@ -506,7 +443,7 @@ func (s *httpServer) doPauseChannel(req *http.Request) (interface{}, error) { return nil, nil } -func (s *httpServer) doStats(req *http.Request) (interface{}, error) { +func (s *httpServer) doStats(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { reqParams, err := http_api.NewReqParams(req) if err != nil { s.ctx.nsqd.logf("ERROR: failed to parse request params - %s", err) @@ -596,3 +533,66 @@ func (s *httpServer) printStats(stats []TopicStats, health string, startTime tim } return buf.Bytes() } + +func (s *httpServer) doConfig(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { + opt := ps.ByName("opt") + + if req.Method == "PUT" { + // add 1 so that it's greater than our max when we test for it + // (LimitReader returns a "fake" EOF) + readMax := s.ctx.nsqd.getOpts().MaxMsgSize + 1 + body, err := ioutil.ReadAll(io.LimitReader(req.Body, readMax)) + if err != nil { + return nil, http_api.Err{500, "INTERNAL_ERROR"} + } + if int64(len(body)) == readMax || len(body) == 0 { + return nil, http_api.Err{413, "INVALID_VALUE"} + } + + opts := *s.ctx.nsqd.getOpts() + switch opt { + case "nsqlookupd_tcp_addresses": + err := json.Unmarshal(body, &opts.NSQLookupdTCPAddresses) + if err != nil { + return nil, http_api.Err{400, "INVALID_VALUE"} + } + case "verbose": + err := json.Unmarshal(body, &opts.Verbose) + if err != nil { + return nil, http_api.Err{400, "INVALID_VALUE"} + } + default: + return nil, http_api.Err{400, "INVALID_OPTION"} + } + s.ctx.nsqd.swapOpts(&opts) + s.ctx.nsqd.triggerOptsNotification() + } + + v, ok := getOptByCfgName(s.ctx.nsqd.getOpts(), opt) + if !ok { + return nil, http_api.Err{400, "INVALID_OPTION"} + } + + return v, nil +} + +func getOptByCfgName(opts interface{}, name string) (interface{}, bool) { + val := reflect.ValueOf(opts).Elem() + typ := val.Type() + for i := 0; i < typ.NumField(); i++ { + field := typ.Field(i) + flagName := field.Tag.Get("flag") + cfgName := field.Tag.Get("cfg") + if flagName == "" { + continue + } + if cfgName == "" { + cfgName = strings.Replace(flagName, "-", "_", -1) + } + if name != cfgName { + continue + } + return val.FieldByName(field.Name).Interface(), true + } + return nil, false +} diff --git a/nsqd/http_test.go b/nsqd/http_test.go index 087a0cc54..67d125e3a 100644 --- a/nsqd/http_test.go +++ b/nsqd/http_test.go @@ -16,10 +16,11 @@ import ( "github.com/bitly/go-nsq" "github.com/bitly/nsq/internal/version" + "github.com/bitly/nsq/nsqlookupd" ) func TestHTTPput(t *testing.T) { - opts := NewNSQDOptions() + opts := NewOptions() opts.Logger = newTestLogger(t) _, httpAddr, nsqd := mustStartNSQD(opts) defer os.RemoveAll(opts.DataPath) @@ -42,7 +43,7 @@ func TestHTTPput(t *testing.T) { } func TestHTTPputEmpty(t *testing.T) { - opts := NewNSQDOptions() + opts := NewOptions() opts.Logger = newTestLogger(t) _, httpAddr, nsqd := mustStartNSQD(opts) defer os.RemoveAll(opts.DataPath) @@ -66,7 +67,7 @@ func TestHTTPputEmpty(t *testing.T) { } func TestHTTPmput(t *testing.T) { - opts := NewNSQDOptions() + opts := NewOptions() opts.Logger = newTestLogger(t) _, httpAddr, nsqd := mustStartNSQD(opts) defer os.RemoveAll(opts.DataPath) @@ -95,7 +96,7 @@ func TestHTTPmput(t *testing.T) { } func TestHTTPmputEmpty(t *testing.T) { - opts := NewNSQDOptions() + opts := NewOptions() opts.Logger = newTestLogger(t) _, httpAddr, nsqd := mustStartNSQD(opts) defer os.RemoveAll(opts.DataPath) @@ -126,7 +127,7 @@ func TestHTTPmputEmpty(t *testing.T) { } func TestHTTPmputBinary(t *testing.T) { - opts := NewNSQDOptions() + opts := NewOptions() opts.Logger = newTestLogger(t) _, httpAddr, nsqd := mustStartNSQD(opts) defer os.RemoveAll(opts.DataPath) @@ -155,7 +156,7 @@ func TestHTTPmputBinary(t *testing.T) { } func TestHTTPpubDefer(t *testing.T) { - opts := NewNSQDOptions() + opts := NewOptions() opts.Logger = newTestLogger(t) _, httpAddr, nsqd := mustStartNSQD(opts) defer os.RemoveAll(opts.DataPath) @@ -182,7 +183,7 @@ func TestHTTPpubDefer(t *testing.T) { } func TestHTTPSRequire(t *testing.T) { - opts := NewNSQDOptions() + opts := NewOptions() opts.Logger = newTestLogger(t) opts.Verbose = true opts.TLSCert = "./test/certs/server.pem" @@ -227,7 +228,7 @@ func TestHTTPSRequire(t *testing.T) { } func TestHTTPSRequireVerify(t *testing.T) { - opts := NewNSQDOptions() + opts := NewOptions() opts.Logger = newTestLogger(t) opts.Verbose = true opts.TLSCert = "./test/certs/server.pem" @@ -291,7 +292,7 @@ func TestHTTPSRequireVerify(t *testing.T) { } func TestTLSRequireVerifyExceptHTTP(t *testing.T) { - opts := NewNSQDOptions() + opts := NewOptions() opts.Logger = newTestLogger(t) opts.Verbose = true opts.TLSCert = "./test/certs/server.pem" @@ -321,7 +322,7 @@ func TestTLSRequireVerifyExceptHTTP(t *testing.T) { } func TestHTTPDeprecatedTopicChannel(t *testing.T) { - opts := NewNSQDOptions() + opts := NewOptions() opts.Logger = newTestLogger(t) _, httpAddr, nsqd := mustStartNSQD(opts) defer os.RemoveAll(opts.DataPath) @@ -418,7 +419,7 @@ func TestHTTPDeprecatedTopicChannel(t *testing.T) { } func TestHTTPTransitionTopicChannel(t *testing.T) { - opts := NewNSQDOptions() + opts := NewOptions() opts.Logger = newTestLogger(t) _, httpAddr, nsqd := mustStartNSQD(opts) defer os.RemoveAll(opts.DataPath) @@ -540,7 +541,7 @@ func TestHTTPTransitionTopicChannel(t *testing.T) { } func TestHTTPV1TopicChannel(t *testing.T) { - opts := NewNSQDOptions() + opts := NewOptions() opts.Logger = newTestLogger(t) _, httpAddr, nsqd := mustStartNSQD(opts) defer os.RemoveAll(opts.DataPath) @@ -647,7 +648,7 @@ func TestHTTPV1TopicChannel(t *testing.T) { func BenchmarkHTTPput(b *testing.B) { var wg sync.WaitGroup b.StopTimer() - opts := NewNSQDOptions() + opts := NewOptions() opts.Logger = newTestLogger(b) opts.MemQueueSize = int64(b.N) _, httpAddr, nsqd := mustStartNSQD(opts) @@ -688,7 +689,7 @@ func BenchmarkHTTPput(b *testing.B) { func TestHTTPgetStatusJSON(t *testing.T) { testTime := time.Now() - opts := NewNSQDOptions() + opts := NewOptions() opts.Logger = newTestLogger(t) _, httpAddr, nsqd := mustStartNSQD(opts) defer os.RemoveAll(opts.DataPath) @@ -708,7 +709,7 @@ func TestHTTPgetStatusJSON(t *testing.T) { func TestHTTPgetStatusText(t *testing.T) { testTime := time.Now() - opts := NewNSQDOptions() + opts := NewOptions() opts.Logger = newTestLogger(t) _, httpAddr, nsqd := mustStartNSQD(opts) defer os.RemoveAll(opts.DataPath) @@ -724,3 +725,38 @@ func TestHTTPgetStatusText(t *testing.T) { equal(t, resp.StatusCode, 200) nequal(t, body, nil) } + +func TestHTTPconfig(t *testing.T) { + lopts := nsqlookupd.NewOptions() + lopts.Logger = newTestLogger(t) + _, _, lookupd1 := mustStartNSQLookupd(lopts) + defer lookupd1.Exit() + _, _, lookupd2 := mustStartNSQLookupd(lopts) + defer lookupd2.Exit() + + opts := NewOptions() + opts.Logger = newTestLogger(t) + _, httpAddr, nsqd := mustStartNSQD(opts) + defer os.RemoveAll(opts.DataPath) + defer nsqd.Exit() + + url := fmt.Sprintf("http://%s/config/nsqlookupd_tcp_addresses", httpAddr) + resp, err := http.Get(url) + equal(t, err, nil) + defer resp.Body.Close() + body, _ := ioutil.ReadAll(resp.Body) + equal(t, resp.StatusCode, 200) + equal(t, string(body), "[]") + + client := http.Client{} + addrs := fmt.Sprintf(`["%s","%s"]`, lookupd1.RealTCPAddr().String(), lookupd2.RealTCPAddr().String()) + url = fmt.Sprintf("http://%s/config/nsqlookupd_tcp_addresses", httpAddr) + req, err := http.NewRequest("PUT", url, bytes.NewBuffer([]byte(addrs))) + equal(t, err, nil) + resp, err = client.Do(req) + equal(t, err, nil) + defer resp.Body.Close() + body, _ = ioutil.ReadAll(resp.Body) + equal(t, resp.StatusCode, 200) + equal(t, string(body), addrs) +} diff --git a/nsqd/lookup.go b/nsqd/lookup.go index 8f193f377..4bed93f4b 100644 --- a/nsqd/lookup.go +++ b/nsqd/lookup.go @@ -12,8 +12,45 @@ import ( "github.com/bitly/nsq/internal/version" ) +func connectCallback(n *NSQD, hostname string, syncTopicChan chan *lookupPeer) func(*lookupPeer) { + return func(lp *lookupPeer) { + ci := make(map[string]interface{}) + ci["version"] = version.Binary + ci["tcp_port"] = n.RealTCPAddr().Port + ci["http_port"] = n.RealHTTPAddr().Port + ci["hostname"] = hostname + ci["broadcast_address"] = n.getOpts().BroadcastAddress + + cmd, err := nsq.Identify(ci) + if err != nil { + lp.Close() + return + } + resp, err := lp.Command(cmd) + if err != nil { + n.logf("LOOKUPD(%s): ERROR %s - %s", lp, cmd, err) + } else if bytes.Equal(resp, []byte("E_INVALID")) { + n.logf("LOOKUPD(%s): lookupd returned %s", lp, resp) + } else { + err = json.Unmarshal(resp, &lp.Info) + if err != nil { + n.logf("LOOKUPD(%s): ERROR parsing response - %s", lp, resp) + } else { + n.logf("LOOKUPD(%s): peer info %+v", lp, lp.Info) + } + } + + go func() { + syncTopicChan <- lp + }() + } +} + func (n *NSQD) lookupLoop() { + var lookupPeers []*lookupPeer + var lookupAddrs []string syncTopicChan := make(chan *lookupPeer) + connect := true hostname, err := os.Hostname() if err != nil { @@ -21,50 +58,29 @@ func (n *NSQD) lookupLoop() { os.Exit(1) } - for _, host := range n.opts.NSQLookupdTCPAddresses { - n.logf("LOOKUP: adding peer %s", host) - lookupPeer := newLookupPeer(host, n.opts.Logger, func(lp *lookupPeer) { - ci := make(map[string]interface{}) - ci["version"] = version.Binary - ci["tcp_port"] = n.RealTCPAddr().Port - ci["http_port"] = n.RealHTTPAddr().Port - ci["hostname"] = hostname - ci["broadcast_address"] = n.opts.BroadcastAddress - - cmd, err := nsq.Identify(ci) - if err != nil { - lp.Close() - return - } - resp, err := lp.Command(cmd) - if err != nil { - n.logf("LOOKUPD(%s): ERROR %s - %s", lp, cmd, err) - } else if bytes.Equal(resp, []byte("E_INVALID")) { - n.logf("LOOKUPD(%s): lookupd returned %s", lp, resp) - } else { - err = json.Unmarshal(resp, &lp.Info) - if err != nil { - n.logf("LOOKUPD(%s): ERROR parsing response - %s", lp, resp) - } else { - n.logf("LOOKUPD(%s): peer info %+v", lp, lp.Info) - } - } - - go func() { - syncTopicChan <- lp - }() - }) - lookupPeer.Command(nil) // start the connection - n.lookupPeers = append(n.lookupPeers, lookupPeer) - } - // for announcements, lookupd determines the host automatically ticker := time.Tick(15 * time.Second) for { + if connect { + for _, host := range n.getOpts().NSQLookupdTCPAddresses { + if in(host, lookupAddrs) { + continue + } + n.logf("LOOKUP(%s): adding peer", host) + lookupPeer := newLookupPeer(host, n.getOpts().Logger, + connectCallback(n, hostname, syncTopicChan)) + lookupPeer.Command(nil) // start the connection + lookupPeers = append(lookupPeers, lookupPeer) + lookupAddrs = append(lookupAddrs, host) + } + n.lookupPeers.Store(lookupPeers) + connect = false + } + select { case <-ticker: // send a heartbeat and read a response (read detects closed conns) - for _, lookupPeer := range n.lookupPeers { + for _, lookupPeer := range lookupPeers { n.logf("LOOKUPD(%s): sending heartbeat", lookupPeer) cmd := nsq.Ping() _, err := lookupPeer.Command(cmd) @@ -97,7 +113,7 @@ func (n *NSQD) lookupLoop() { } } - for _, lookupPeer := range n.lookupPeers { + for _, lookupPeer := range lookupPeers { n.logf("LOOKUPD(%s): %s %s", lookupPeer, branch, cmd) _, err := lookupPeer.Command(cmd) if err != nil { @@ -129,6 +145,21 @@ func (n *NSQD) lookupLoop() { break } } + case <-n.optsNotificationChan: + var tmpPeers []*lookupPeer + var tmpAddrs []string + for _, lp := range lookupPeers { + if in(lp.addr, n.getOpts().NSQLookupdTCPAddresses) { + tmpPeers = append(tmpPeers, lp) + tmpAddrs = append(tmpAddrs, lp.addr) + continue + } + n.logf("LOOKUP(%s): removing peer", lp) + lp.Close() + } + lookupPeers = tmpPeers + lookupAddrs = tmpAddrs + connect = true case <-n.exitChan: goto exit } @@ -138,9 +169,22 @@ exit: n.logf("LOOKUP: closing") } -func (n *NSQD) lookupHTTPAddrs() []string { +func in(s string, lst []string) bool { + for _, v := range lst { + if s == v { + return true + } + } + return false +} + +func (n *NSQD) lookupdHTTPAddrs() []string { var lookupHTTPAddrs []string - for _, lp := range n.lookupPeers { + lookupPeers := n.lookupPeers.Load() + if lookupPeers == nil { + return nil + } + for _, lp := range lookupPeers.([]*lookupPeer) { if len(lp.Info.BroadcastAddress) <= 0 { continue } diff --git a/nsqd/lookup_peer.go b/nsqd/lookup_peer.go index 112503c2e..ef6b57578 100644 --- a/nsqd/lookup_peer.go +++ b/nsqd/lookup_peer.go @@ -73,7 +73,10 @@ func (lp *lookupPeer) Write(data []byte) (int, error) { // Close implements the io.Closer interface func (lp *lookupPeer) Close() error { lp.state = stateDisconnected - return lp.conn.Close() + if lp.conn != nil { + return lp.conn.Close() + } + return nil } // Command performs a round-trip for the specified Command. diff --git a/nsqd/nsqd.go b/nsqd/nsqd.go index 8b1deef1d..9133e66b9 100644 --- a/nsqd/nsqd.go +++ b/nsqd/nsqd.go @@ -43,7 +43,7 @@ type NSQD struct { sync.RWMutex - opts *nsqdOptions + opts atomic.Value flag int32 errMtx sync.RWMutex @@ -52,7 +52,7 @@ type NSQD struct { topicMap map[string]*Topic - lookupPeers []*lookupPeer + lookupPeers atomic.Value tcpListener net.Listener httpListener net.Listener @@ -61,22 +61,25 @@ type NSQD struct { poolSize int - idChan chan MessageID - notifyChan chan interface{} - exitChan chan int - waitGroup util.WaitGroupWrapper + idChan chan MessageID + notifyChan chan interface{} + optsNotificationChan chan struct{} + exitChan chan int + + waitGroup util.WaitGroupWrapper } -func NewNSQD(opts *nsqdOptions) *NSQD { +func New(opts *Options) *NSQD { n := &NSQD{ - opts: opts, - flag: flagHealthy, - startTime: time.Now(), - topicMap: make(map[string]*Topic), - idChan: make(chan MessageID, 4096), - exitChan: make(chan int), - notifyChan: make(chan interface{}), + flag: flagHealthy, + startTime: time.Now(), + topicMap: make(map[string]*Topic), + idChan: make(chan MessageID, 4096), + exitChan: make(chan int), + notifyChan: make(chan interface{}), + optsNotificationChan: make(chan struct{}, 1), } + n.swapOpts(opts) if opts.MaxDeflateLevel < 1 || opts.MaxDeflateLevel > 9 { n.logf("FATAL: --max-deflate-level must be [1,9]") @@ -111,23 +114,38 @@ func NewNSQD(opts *nsqdOptions) *NSQD { n.logf("FATAL: failed to build TLS config - %s", err) os.Exit(1) } - if tlsConfig == nil && n.opts.TLSRequired != TLSNotRequired { + if tlsConfig == nil && opts.TLSRequired != TLSNotRequired { n.logf("FATAL: cannot require TLS client connections without TLS key and cert") os.Exit(1) } n.tlsConfig = tlsConfig n.logf(version.String("nsqd")) - n.logf("ID: %d", n.opts.ID) + n.logf("ID: %d", opts.ID) return n } func (n *NSQD) logf(f string, args ...interface{}) { - if n.opts.Logger == nil { + if n.getOpts().Logger == nil { return } - n.opts.Logger.Output(2, fmt.Sprintf(f, args...)) + n.getOpts().Logger.Output(2, fmt.Sprintf(f, args...)) +} + +func (n *NSQD) getOpts() *Options { + return n.opts.Load().(*Options) +} + +func (n *NSQD) swapOpts(opts *Options) { + n.opts.Store(opts) +} + +func (n *NSQD) triggerOptsNotification() { + select { + case n.optsNotificationChan <- struct{}{}: + default: + } } func (n *NSQD) RealTCPAddr() *net.TCPAddr { @@ -199,9 +217,9 @@ func (n *NSQD) Main() { ctx := &context{n} - tcpListener, err := net.Listen("tcp", n.opts.TCPAddress) + tcpListener, err := net.Listen("tcp", n.getOpts().TCPAddress) if err != nil { - n.logf("FATAL: listen (%s) failed - %s", n.opts.TCPAddress, err) + n.logf("FATAL: listen (%s) failed - %s", n.getOpts().TCPAddress, err) os.Exit(1) } n.Lock() @@ -209,48 +227,40 @@ func (n *NSQD) Main() { n.Unlock() tcpServer := &tcpServer{ctx: ctx} n.waitGroup.Wrap(func() { - protocol.TCPServer(n.tcpListener, tcpServer, n.opts.Logger) + protocol.TCPServer(n.tcpListener, tcpServer, n.getOpts().Logger) }) - if n.tlsConfig != nil && n.opts.HTTPSAddress != "" { - httpsListener, err = tls.Listen("tcp", n.opts.HTTPSAddress, n.tlsConfig) + if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" { + httpsListener, err = tls.Listen("tcp", n.getOpts().HTTPSAddress, n.tlsConfig) if err != nil { - n.logf("FATAL: listen (%s) failed - %s", n.opts.HTTPSAddress, err) + n.logf("FATAL: listen (%s) failed - %s", n.getOpts().HTTPSAddress, err) os.Exit(1) } n.Lock() n.httpsListener = httpsListener n.Unlock() - httpsServer := &httpServer{ - ctx: ctx, - tlsEnabled: true, - tlsRequired: true, - } + httpsServer := newHTTPServer(ctx, true, true) n.waitGroup.Wrap(func() { - http_api.Serve(n.httpsListener, httpsServer, n.opts.Logger, "HTTPS") + http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.getOpts().Logger) }) } - httpListener, err = net.Listen("tcp", n.opts.HTTPAddress) + httpListener, err = net.Listen("tcp", n.getOpts().HTTPAddress) if err != nil { - n.logf("FATAL: listen (%s) failed - %s", n.opts.HTTPAddress, err) + n.logf("FATAL: listen (%s) failed - %s", n.getOpts().HTTPAddress, err) os.Exit(1) } n.Lock() n.httpListener = httpListener n.Unlock() - httpServer := &httpServer{ - ctx: ctx, - tlsEnabled: false, - tlsRequired: n.opts.TLSRequired == TLSRequired, - } + httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired) n.waitGroup.Wrap(func() { - http_api.Serve(n.httpListener, httpServer, n.opts.Logger, "HTTP") + http_api.Serve(n.httpListener, httpServer, "HTTP", n.getOpts().Logger) }) n.waitGroup.Wrap(func() { n.queueScanLoop() }) n.waitGroup.Wrap(func() { n.idPump() }) n.waitGroup.Wrap(func() { n.lookupLoop() }) - if n.opts.StatsdAddress != "" { + if n.getOpts().StatsdAddress != "" { n.waitGroup.Wrap(func() { n.statsdLoop() }) } } @@ -258,7 +268,7 @@ func (n *NSQD) Main() { func (n *NSQD) LoadMetadata() { n.setFlag(flagLoading, true) defer n.setFlag(flagLoading, false) - fn := fmt.Sprintf(path.Join(n.opts.DataPath, "nsqd.%d.dat"), n.opts.ID) + fn := fmt.Sprintf(path.Join(n.getOpts().DataPath, "nsqd.%d.dat"), n.getOpts().ID) data, err := ioutil.ReadFile(fn) if err != nil { if !os.IsNotExist(err) { @@ -329,7 +339,7 @@ func (n *NSQD) LoadMetadata() { func (n *NSQD) PersistMetadata() error { // persist metadata about what topics/channels we have // so that upon restart we can get back to the same state - fileName := fmt.Sprintf(path.Join(n.opts.DataPath, "nsqd.%d.dat"), n.opts.ID) + fileName := fmt.Sprintf(path.Join(n.getOpts().DataPath, "nsqd.%d.dat"), n.getOpts().ID) n.logf("NSQ: persisting topic/channel metadata to %s", fileName) js := make(map[string]interface{}) @@ -423,6 +433,7 @@ func (n *NSQD) Exit() { // to return a pointer to a Topic object (potentially new) func (n *NSQD) GetTopic(topicName string) *Topic { n.Lock() + t, ok := n.topicMap[topicName] if ok { n.Unlock() @@ -440,10 +451,12 @@ func (n *NSQD) GetTopic(topicName string) *Topic { // channels from lookupd. This blocks concurrent PutMessages to this topic. t.Lock() n.Unlock() + // if using lookupd, make a blocking call to get the topics, and immediately create them. // this makes sure that any message received is buffered to the right channels - if len(n.lookupPeers) > 0 { - channelNames, _ := lookupd.GetLookupdTopicChannels(t.name, n.lookupHTTPAddrs()) + lookupdHTTPAddrs := n.lookupdHTTPAddrs() + if len(lookupdHTTPAddrs) > 0 { + channelNames, _ := lookupd.GetLookupdTopicChannels(t.name, lookupdHTTPAddrs) for _, channelName := range channelNames { if strings.HasSuffix(channelName, "#ephemeral") { // we don't want to pre-create ephemeral channels @@ -453,6 +466,7 @@ func (n *NSQD) GetTopic(topicName string) *Topic { t.getOrCreateChannel(channelName) } } + t.Unlock() // NOTE: I would prefer for this to only happen in topic.GetChannel() but we're special @@ -507,8 +521,9 @@ func (n *NSQD) DeleteExistingTopic(topicName string) error { func (n *NSQD) idPump() { factory := &guidFactory{} lastError := time.Unix(0, 0) + workerID := n.getOpts().ID for { - id, err := factory.NewGUID(n.opts.ID) + id, err := factory.NewGUID(workerID) if err != nil { now := time.Now() if now.Sub(lastError) > time.Second { @@ -577,8 +592,8 @@ func (n *NSQD) resizePool(num int, workCh chan *Channel, responseCh chan bool, c idealPoolSize := int(float64(num) * 0.25) if idealPoolSize < 1 { idealPoolSize = 1 - } else if idealPoolSize > n.opts.QueueScanWorkerPoolMax { - idealPoolSize = n.opts.QueueScanWorkerPoolMax + } else if idealPoolSize > n.getOpts().QueueScanWorkerPoolMax { + idealPoolSize = n.getOpts().QueueScanWorkerPoolMax } for { if idealPoolSize == n.poolSize { @@ -632,12 +647,12 @@ func (n *NSQD) queueScanWorker(workCh chan *Channel, responseCh chan bool, close // If QueueScanDirtyPercent (default: 25%) of the selected channels were dirty, // the loop continues without sleep. func (n *NSQD) queueScanLoop() { - workCh := make(chan *Channel, n.opts.QueueScanSelectionCount) - responseCh := make(chan bool, n.opts.QueueScanSelectionCount) + workCh := make(chan *Channel, n.getOpts().QueueScanSelectionCount) + responseCh := make(chan bool, n.getOpts().QueueScanSelectionCount) closeCh := make(chan int) - workTicker := time.NewTicker(n.opts.QueueScanInterval) - refreshTicker := time.NewTicker(n.opts.QueueScanRefreshInterval) + workTicker := time.NewTicker(n.getOpts().QueueScanInterval) + refreshTicker := time.NewTicker(n.getOpts().QueueScanRefreshInterval) channels := n.channels() n.resizePool(len(channels), workCh, responseCh, closeCh) @@ -656,7 +671,7 @@ func (n *NSQD) queueScanLoop() { goto exit } - num := n.opts.QueueScanSelectionCount + num := n.getOpts().QueueScanSelectionCount if num > len(channels) { num = len(channels) } @@ -673,7 +688,7 @@ func (n *NSQD) queueScanLoop() { } } - if float64(numDirty)/float64(num) > n.opts.QueueScanDirtyPercent { + if float64(numDirty)/float64(num) > n.getOpts().QueueScanDirtyPercent { goto loop } } @@ -685,7 +700,7 @@ exit: refreshTicker.Stop() } -func buildTLSConfig(opts *nsqdOptions) (*tls.Config, error) { +func buildTLSConfig(opts *Options) (*tls.Config, error) { var tlsConfig *tls.Config if opts.TLSCert == "" && opts.TLSKey == "" { @@ -732,5 +747,5 @@ func buildTLSConfig(opts *nsqdOptions) (*tls.Config, error) { } func (n *NSQD) IsAuthEnabled() bool { - return len(n.opts.AuthHTTPAddresses) != 0 + return len(n.getOpts().AuthHTTPAddresses) != 0 } diff --git a/nsqd/nsqd_test.go b/nsqd/nsqd_test.go index f854deddf..30f3d2103 100644 --- a/nsqd/nsqd_test.go +++ b/nsqd/nsqd_test.go @@ -3,6 +3,7 @@ package nsqd import ( "fmt" "io/ioutil" + "net" "os" "path" "path/filepath" @@ -13,6 +14,8 @@ import ( "time" "github.com/bitly/go-simplejson" + "github.com/bitly/nsq/internal/http_api" + "github.com/bitly/nsq/nsqlookupd" ) func assert(t *testing.T, condition bool, msg string, v ...interface{}) { @@ -60,7 +63,7 @@ func newTestLogger(tbl tbLog) logger { } func getMetadata(n *NSQD) (*simplejson.Json, error) { - fn := fmt.Sprintf(path.Join(n.opts.DataPath, "nsqd.%d.dat"), n.opts.ID) + fn := fmt.Sprintf(path.Join(n.getOpts().DataPath, "nsqd.%d.dat"), n.getOpts().ID) data, err := ioutil.ReadFile(fn) if err != nil { return nil, err @@ -77,7 +80,7 @@ func TestStartup(t *testing.T) { iterations := 300 doneExitChan := make(chan int) - opts := NewNSQDOptions() + opts := NewOptions() opts.Logger = newTestLogger(t) opts.MemQueueSize = 100 opts.MaxBytesPerFile = 10240 @@ -147,7 +150,7 @@ func TestStartup(t *testing.T) { // start up a new nsqd w/ the same folder - opts = NewNSQDOptions() + opts = NewOptions() opts.Logger = newTestLogger(t) opts.MemQueueSize = 100 opts.MaxBytesPerFile = 10240 @@ -191,7 +194,7 @@ func TestStartup(t *testing.T) { func TestEphemeralTopicsAndChannels(t *testing.T) { // ephemeral topics/channels are lazily removed after the last channel/client is removed - opts := NewNSQDOptions() + opts := NewOptions() opts.Logger = newTestLogger(t) opts.MemQueueSize = 100 _, _, nsqd := mustStartNSQD(opts) @@ -243,7 +246,7 @@ func metadataForChannel(n *NSQD, topicIndex int, channelIndex int) *simplejson.J } func TestPauseMetadata(t *testing.T) { - opts := NewNSQDOptions() + opts := NewOptions() opts.Logger = newTestLogger(t) _, _, nsqd := mustStartNSQD(opts) defer os.RemoveAll(opts.DataPath) @@ -276,3 +279,148 @@ func TestPauseMetadata(t *testing.T) { b, _ = metadataForChannel(nsqd, 0, 0).Get("paused").Bool() equal(t, b, false) } + +func mustStartNSQLookupd(opts *nsqlookupd.Options) (*net.TCPAddr, *net.TCPAddr, *nsqlookupd.NSQLookupd) { + opts.TCPAddress = "127.0.0.1:0" + opts.HTTPAddress = "127.0.0.1:0" + lookupd := nsqlookupd.New(opts) + lookupd.Main() + return lookupd.RealTCPAddr(), lookupd.RealHTTPAddr(), lookupd +} + +func TestReconfigure(t *testing.T) { + lopts := nsqlookupd.NewOptions() + lopts.Logger = newTestLogger(t) + _, _, lookupd1 := mustStartNSQLookupd(lopts) + defer lookupd1.Exit() + _, _, lookupd2 := mustStartNSQLookupd(lopts) + defer lookupd2.Exit() + _, _, lookupd3 := mustStartNSQLookupd(lopts) + defer lookupd3.Exit() + + opts := NewOptions() + opts.Logger = newTestLogger(t) + _, _, nsqd := mustStartNSQD(opts) + defer os.RemoveAll(opts.DataPath) + defer nsqd.Exit() + + time.Sleep(50 * time.Millisecond) + + newOpts := *opts + newOpts.NSQLookupdTCPAddresses = []string{lookupd1.RealTCPAddr().String()} + nsqd.swapOpts(&newOpts) + nsqd.triggerOptsNotification() + equal(t, len(nsqd.getOpts().NSQLookupdTCPAddresses), 1) + + time.Sleep(50 * time.Millisecond) + + numLookupPeers := len(nsqd.lookupPeers.Load().([]*lookupPeer)) + equal(t, numLookupPeers, 1) + + newOpts = *opts + newOpts.NSQLookupdTCPAddresses = []string{lookupd2.RealTCPAddr().String(), lookupd3.RealTCPAddr().String()} + nsqd.swapOpts(&newOpts) + nsqd.triggerOptsNotification() + equal(t, len(nsqd.getOpts().NSQLookupdTCPAddresses), 2) + + time.Sleep(50 * time.Millisecond) + + var lookupPeers []string + for _, lp := range nsqd.lookupPeers.Load().([]*lookupPeer) { + lookupPeers = append(lookupPeers, lp.addr) + } + equal(t, len(lookupPeers), 2) + equal(t, lookupPeers, newOpts.NSQLookupdTCPAddresses) +} + +func TestCluster(t *testing.T) { + lopts := nsqlookupd.NewOptions() + lopts.Logger = newTestLogger(t) + lopts.BroadcastAddress = "127.0.0.1" + _, _, lookupd := mustStartNSQLookupd(lopts) + + opts := NewOptions() + opts.Logger = newTestLogger(t) + opts.NSQLookupdTCPAddresses = []string{lookupd.RealTCPAddr().String()} + opts.BroadcastAddress = "127.0.0.1" + _, _, nsqd := mustStartNSQD(opts) + defer os.RemoveAll(opts.DataPath) + defer nsqd.Exit() + + topicName := "cluster_test" + strconv.Itoa(int(time.Now().Unix())) + + hostname, err := os.Hostname() + equal(t, err, nil) + + url := fmt.Sprintf("http://%s/topic/create?topic=%s", nsqd.RealHTTPAddr(), topicName) + _, err = http_api.NegotiateV1("POST", url, nil) + equal(t, err, nil) + + url = fmt.Sprintf("http://%s/channel/create?topic=%s&channel=ch", nsqd.RealHTTPAddr(), topicName) + _, err = http_api.NegotiateV1("POST", url, nil) + equal(t, err, nil) + + // allow some time for nsqd to push info to nsqlookupd + time.Sleep(350 * time.Millisecond) + + data, err := http_api.NegotiateV1("GET", fmt.Sprintf("http://%s/debug", lookupd.RealHTTPAddr()), nil) + equal(t, err, nil) + + topicData := data.Get("topic:" + topicName + ":") + producers, _ := topicData.Array() + equal(t, len(producers), 1) + + producer := topicData.GetIndex(0) + equal(t, producer.Get("hostname").MustString(), hostname) + equal(t, producer.Get("broadcast_address").MustString(), "127.0.0.1") + equal(t, producer.Get("tcp_port").MustInt(), nsqd.RealTCPAddr().Port) + equal(t, producer.Get("tombstoned").MustBool(), false) + + channelData := data.Get("channel:" + topicName + ":ch") + producers, _ = channelData.Array() + equal(t, len(producers), 1) + + producer = topicData.GetIndex(0) + equal(t, producer.Get("hostname").MustString(), hostname) + equal(t, producer.Get("broadcast_address").MustString(), "127.0.0.1") + equal(t, producer.Get("tcp_port").MustInt(), nsqd.RealTCPAddr().Port) + equal(t, producer.Get("tombstoned").MustBool(), false) + + data, err = http_api.NegotiateV1("GET", fmt.Sprintf("http://%s/lookup?topic=%s", lookupd.RealHTTPAddr(), topicName), nil) + equal(t, err, nil) + + producers, _ = data.Get("producers").Array() + equal(t, len(producers), 1) + + producer = data.Get("producers").GetIndex(0) + equal(t, producer.Get("hostname").MustString(), hostname) + equal(t, producer.Get("broadcast_address").MustString(), "127.0.0.1") + equal(t, producer.Get("tcp_port").MustInt(), nsqd.RealTCPAddr().Port) + + channels, _ := data.Get("channels").Array() + equal(t, len(channels), 1) + + channel := channels[0].(string) + equal(t, channel, "ch") + + data, err = http_api.NegotiateV1("POST", fmt.Sprintf("http://%s/topic/delete?topic=%s", nsqd.RealHTTPAddr(), topicName), nil) + equal(t, err, nil) + + // allow some time for nsqd to push info to nsqlookupd + time.Sleep(350 * time.Millisecond) + + data, err = http_api.NegotiateV1("GET", fmt.Sprintf("http://%s/lookup?topic=%s", lookupd.RealHTTPAddr(), topicName), nil) + equal(t, err, nil) + + producers, _ = data.Get("producers").Array() + equal(t, len(producers), 0) + + data, err = http_api.NegotiateV1("GET", fmt.Sprintf("http://%s/debug", lookupd.RealHTTPAddr()), nil) + equal(t, err, nil) + + producers, _ = data.Get("topic:" + topicName + ":").Array() + equal(t, len(producers), 0) + + producers, _ = data.Get("channel:" + topicName + ":ch").Array() + equal(t, len(producers), 0) +} diff --git a/nsqd/options.go b/nsqd/options.go index a23eaa90b..72641d51c 100644 --- a/nsqd/options.go +++ b/nsqd/options.go @@ -10,7 +10,7 @@ import ( "time" ) -type nsqdOptions struct { +type Options struct { // basic options ID int64 `flag:"worker-id" cfg:"id"` Verbose bool `flag:"verbose"` @@ -74,7 +74,7 @@ type nsqdOptions struct { Logger logger } -func NewNSQDOptions() *nsqdOptions { +func NewOptions() *Options { hostname, err := os.Hostname() if err != nil { log.Fatal(err) @@ -84,7 +84,7 @@ func NewNSQDOptions() *nsqdOptions { io.WriteString(h, hostname) defaultID := int64(crc32.ChecksumIEEE(h.Sum(nil)) % 1024) - return &nsqdOptions{ + return &Options{ ID: defaultID, TCPAddress: "0.0.0.0:4150", @@ -92,6 +92,9 @@ func NewNSQDOptions() *nsqdOptions { HTTPSAddress: "0.0.0.0:4152", BroadcastAddress: hostname, + NSQLookupdTCPAddresses: make([]string, 0), + AuthHTTPAddresses: make([]string, 0), + MemQueueSize: 10000, MaxBytesPerFile: 100 * 1024 * 1024, SyncEvery: 2500, diff --git a/nsqd/protocol_v2.go b/nsqd/protocol_v2.go index 768dbab9d..26464e264 100644 --- a/nsqd/protocol_v2.go +++ b/nsqd/protocol_v2.go @@ -78,7 +78,7 @@ func (p *protocolV2) IOLoop(conn net.Conn) error { } params := bytes.Split(line, separatorBytes) - if p.ctx.nsqd.opts.Verbose { + if p.ctx.nsqd.getOpts().Verbose { p.ctx.nsqd.logf("PROTOCOL(V2): [%s] %s", client, params) } @@ -122,7 +122,7 @@ func (p *protocolV2) IOLoop(conn net.Conn) error { } func (p *protocolV2) SendMessage(client *clientV2, msg *Message, buf *bytes.Buffer) error { - if p.ctx.nsqd.opts.Verbose { + if p.ctx.nsqd.getOpts().Verbose { p.ctx.nsqd.logf("PROTOCOL(V2): writing msg(%s) to client(%s) - %s", msg.ID, client, msg.Body) } @@ -340,9 +340,9 @@ func (p *protocolV2) IDENTIFY(client *clientV2, params [][]byte) ([]byte, error) return nil, protocol.NewFatalClientErr(err, "E_BAD_BODY", "IDENTIFY failed to read body size") } - if int64(bodyLen) > p.ctx.nsqd.opts.MaxBodySize { + if int64(bodyLen) > p.ctx.nsqd.getOpts().MaxBodySize { return nil, protocol.NewFatalClientErr(nil, "E_BAD_BODY", - fmt.Sprintf("IDENTIFY body too big %d > %d", bodyLen, p.ctx.nsqd.opts.MaxBodySize)) + fmt.Sprintf("IDENTIFY body too big %d > %d", bodyLen, p.ctx.nsqd.getOpts().MaxBodySize)) } if bodyLen <= 0 { @@ -363,7 +363,7 @@ func (p *protocolV2) IDENTIFY(client *clientV2, params [][]byte) ([]byte, error) return nil, protocol.NewFatalClientErr(err, "E_BAD_BODY", "IDENTIFY failed to decode JSON body") } - if p.ctx.nsqd.opts.Verbose { + if p.ctx.nsqd.getOpts().Verbose { p.ctx.nsqd.logf("PROTOCOL(V2): [%s] %+v", client, identifyData) } @@ -378,15 +378,15 @@ func (p *protocolV2) IDENTIFY(client *clientV2, params [][]byte) ([]byte, error) } tlsv1 := p.ctx.nsqd.tlsConfig != nil && identifyData.TLSv1 - deflate := p.ctx.nsqd.opts.DeflateEnabled && identifyData.Deflate + deflate := p.ctx.nsqd.getOpts().DeflateEnabled && identifyData.Deflate deflateLevel := 0 if deflate { if identifyData.DeflateLevel <= 0 { deflateLevel = 6 } - deflateLevel = int(math.Min(float64(deflateLevel), float64(p.ctx.nsqd.opts.MaxDeflateLevel))) + deflateLevel = int(math.Min(float64(deflateLevel), float64(p.ctx.nsqd.getOpts().MaxDeflateLevel))) } - snappy := p.ctx.nsqd.opts.SnappyEnabled && identifyData.Snappy + snappy := p.ctx.nsqd.getOpts().SnappyEnabled && identifyData.Snappy if deflate && snappy { return nil, protocol.NewFatalClientErr(nil, "E_IDENTIFY_FAILED", "cannot enable both deflate and snappy compression") @@ -407,14 +407,14 @@ func (p *protocolV2) IDENTIFY(client *clientV2, params [][]byte) ([]byte, error) OutputBufferSize int `json:"output_buffer_size"` OutputBufferTimeout int64 `json:"output_buffer_timeout"` }{ - MaxRdyCount: p.ctx.nsqd.opts.MaxRdyCount, + MaxRdyCount: p.ctx.nsqd.getOpts().MaxRdyCount, Version: version.Binary, - MaxMsgTimeout: int64(p.ctx.nsqd.opts.MaxMsgTimeout / time.Millisecond), + MaxMsgTimeout: int64(p.ctx.nsqd.getOpts().MaxMsgTimeout / time.Millisecond), MsgTimeout: int64(client.MsgTimeout / time.Millisecond), TLSv1: tlsv1, Deflate: deflate, DeflateLevel: deflateLevel, - MaxDeflateLevel: p.ctx.nsqd.opts.MaxDeflateLevel, + MaxDeflateLevel: p.ctx.nsqd.getOpts().MaxDeflateLevel, Snappy: snappy, SampleRate: client.SampleRate, AuthRequired: p.ctx.nsqd.IsAuthEnabled(), @@ -486,9 +486,9 @@ func (p *protocolV2) AUTH(client *clientV2, params [][]byte) ([]byte, error) { return nil, protocol.NewFatalClientErr(err, "E_BAD_BODY", "AUTH failed to read body size") } - if int64(bodyLen) > p.ctx.nsqd.opts.MaxBodySize { + if int64(bodyLen) > p.ctx.nsqd.getOpts().MaxBodySize { return nil, protocol.NewFatalClientErr(nil, "E_BAD_BODY", - fmt.Sprintf("AUTH body too big %d > %d", bodyLen, p.ctx.nsqd.opts.MaxBodySize)) + fmt.Sprintf("AUTH body too big %d > %d", bodyLen, p.ctx.nsqd.getOpts().MaxBodySize)) } if bodyLen <= 0 { @@ -630,11 +630,11 @@ func (p *protocolV2) RDY(client *clientV2, params [][]byte) ([]byte, error) { count = int64(b10) } - if count < 0 || count > p.ctx.nsqd.opts.MaxRdyCount { + if count < 0 || count > p.ctx.nsqd.getOpts().MaxRdyCount { // this needs to be a fatal error otherwise clients would have // inconsistent state return nil, protocol.NewFatalClientErr(nil, "E_INVALID", - fmt.Sprintf("RDY count %d out of range 0-%d", count, p.ctx.nsqd.opts.MaxRdyCount)) + fmt.Sprintf("RDY count %d out of range 0-%d", count, p.ctx.nsqd.getOpts().MaxRdyCount)) } client.SetReadyCount(count) @@ -690,9 +690,9 @@ func (p *protocolV2) REQ(client *clientV2, params [][]byte) ([]byte, error) { } timeoutDuration := time.Duration(timeoutMs) * time.Millisecond - if timeoutDuration < 0 || timeoutDuration > p.ctx.nsqd.opts.MaxReqTimeout { + if timeoutDuration < 0 || timeoutDuration > p.ctx.nsqd.getOpts().MaxReqTimeout { return nil, protocol.NewFatalClientErr(nil, "E_INVALID", - fmt.Sprintf("REQ timeout %d out of range 0-%d", timeoutDuration, p.ctx.nsqd.opts.MaxReqTimeout)) + fmt.Sprintf("REQ timeout %d out of range 0-%d", timeoutDuration, p.ctx.nsqd.getOpts().MaxReqTimeout)) } err = client.Channel.RequeueMessage(client.ID, *id, timeoutDuration) @@ -743,9 +743,9 @@ func (p *protocolV2) PUB(client *clientV2, params [][]byte) ([]byte, error) { fmt.Sprintf("PUB invalid message body size %d", bodyLen)) } - if int64(bodyLen) > p.ctx.nsqd.opts.MaxMsgSize { + if int64(bodyLen) > p.ctx.nsqd.getOpts().MaxMsgSize { return nil, protocol.NewFatalClientErr(nil, "E_BAD_MESSAGE", - fmt.Sprintf("PUB message too big %d > %d", bodyLen, p.ctx.nsqd.opts.MaxMsgSize)) + fmt.Sprintf("PUB message too big %d > %d", bodyLen, p.ctx.nsqd.getOpts().MaxMsgSize)) } messageBody := make([]byte, bodyLen) @@ -791,13 +791,13 @@ func (p *protocolV2) MPUB(client *clientV2, params [][]byte) ([]byte, error) { fmt.Sprintf("MPUB invalid body size %d", bodyLen)) } - if int64(bodyLen) > p.ctx.nsqd.opts.MaxBodySize { + if int64(bodyLen) > p.ctx.nsqd.getOpts().MaxBodySize { return nil, protocol.NewFatalClientErr(nil, "E_BAD_BODY", - fmt.Sprintf("MPUB body too big %d > %d", bodyLen, p.ctx.nsqd.opts.MaxBodySize)) + fmt.Sprintf("MPUB body too big %d > %d", bodyLen, p.ctx.nsqd.getOpts().MaxBodySize)) } messages, err := readMPUB(client.Reader, client.lenSlice, p.ctx.nsqd.idChan, - p.ctx.nsqd.opts.MaxMsgSize) + p.ctx.nsqd.getOpts().MaxMsgSize) if err != nil { return nil, err } @@ -839,10 +839,10 @@ func (p *protocolV2) DPUB(client *clientV2, params [][]byte) ([]byte, error) { } timeoutDuration := time.Duration(timeoutMs) * time.Millisecond - if timeoutDuration < 0 || timeoutDuration > p.ctx.nsqd.opts.MaxReqTimeout { + if timeoutDuration < 0 || timeoutDuration > p.ctx.nsqd.getOpts().MaxReqTimeout { return nil, protocol.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("DPUB timeout %d out of range 0-%d", - timeoutMs, p.ctx.nsqd.opts.MaxReqTimeout/time.Millisecond)) + timeoutMs, p.ctx.nsqd.getOpts().MaxReqTimeout/time.Millisecond)) } bodyLen, err := readLen(client.Reader, client.lenSlice) @@ -855,9 +855,9 @@ func (p *protocolV2) DPUB(client *clientV2, params [][]byte) ([]byte, error) { fmt.Sprintf("DPUB invalid message body size %d", bodyLen)) } - if int64(bodyLen) > p.ctx.nsqd.opts.MaxMsgSize { + if int64(bodyLen) > p.ctx.nsqd.getOpts().MaxMsgSize { return nil, protocol.NewFatalClientErr(nil, "E_BAD_MESSAGE", - fmt.Sprintf("DPUB message too big %d > %d", bodyLen, p.ctx.nsqd.opts.MaxMsgSize)) + fmt.Sprintf("DPUB message too big %d > %d", bodyLen, p.ctx.nsqd.getOpts().MaxMsgSize)) } messageBody := make([]byte, bodyLen) @@ -966,7 +966,7 @@ func readLen(r io.Reader, tmp []byte) (int32, error) { } func enforceTLSPolicy(client *clientV2, p *protocolV2, command []byte) error { - if p.ctx.nsqd.opts.TLSRequired != TLSNotRequired && atomic.LoadInt32(&client.TLS) != 1 { + if p.ctx.nsqd.getOpts().TLSRequired != TLSNotRequired && atomic.LoadInt32(&client.TLS) != 1 { return protocol.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("cannot %s in current state (TLS required)", command)) } diff --git a/nsqd/protocol_v2_test.go b/nsqd/protocol_v2_test.go index 5b68709b1..fffc3e241 100644 --- a/nsqd/protocol_v2_test.go +++ b/nsqd/protocol_v2_test.go @@ -27,7 +27,7 @@ import ( "github.com/mreiferson/go-snappystream" ) -func mustStartNSQD(opts *nsqdOptions) (*net.TCPAddr, *net.TCPAddr, *NSQD) { +func mustStartNSQD(opts *Options) (*net.TCPAddr, *net.TCPAddr, *NSQD) { opts.TCPAddress = "127.0.0.1:0" opts.HTTPAddress = "127.0.0.1:0" opts.HTTPSAddress = "127.0.0.1:0" @@ -38,11 +38,9 @@ func mustStartNSQD(opts *nsqdOptions) (*net.TCPAddr, *net.TCPAddr, *NSQD) { } opts.DataPath = tmpDir } - nsqd := NewNSQD(opts) + nsqd := New(opts) nsqd.Main() - return nsqd.tcpListener.Addr().(*net.TCPAddr), - nsqd.httpListener.Addr().(*net.TCPAddr), - nsqd + return nsqd.RealTCPAddr(), nsqd.RealHTTPAddr(), nsqd } func mustConnectNSQD(tcpAddr *net.TCPAddr) (net.Conn, error) { @@ -122,7 +120,7 @@ func TestChannelTopicNames(t *testing.T) { // exercise the basic operations of the V2 protocol func TestBasicV2(t *testing.T) { - opts := NewNSQDOptions() + opts := NewOptions() opts.Logger = newTestLogger(t) opts.ClientTimeout = 60 * time.Second tcpAddr, _, nsqd := mustStartNSQD(opts) @@ -157,7 +155,7 @@ func TestBasicV2(t *testing.T) { func TestMultipleConsumerV2(t *testing.T) { msgChan := make(chan *Message) - opts := NewNSQDOptions() + opts := NewOptions() opts.Logger = newTestLogger(t) opts.ClientTimeout = 60 * time.Second tcpAddr, _, nsqd := mustStartNSQD(opts) @@ -203,7 +201,7 @@ func TestMultipleConsumerV2(t *testing.T) { func TestClientTimeout(t *testing.T) { topicName := "test_client_timeout_v2" + strconv.Itoa(int(time.Now().Unix())) - opts := NewNSQDOptions() + opts := NewOptions() opts.Logger = newTestLogger(t) opts.ClientTimeout = 150 * time.Millisecond opts.Verbose = true @@ -240,7 +238,7 @@ done: func TestClientHeartbeat(t *testing.T) { topicName := "test_hb_v2" + strconv.Itoa(int(time.Now().Unix())) - opts := NewNSQDOptions() + opts := NewOptions() opts.Logger = newTestLogger(t) opts.ClientTimeout = 200 * time.Millisecond tcpAddr, _, nsqd := mustStartNSQD(opts) @@ -276,7 +274,7 @@ func TestClientHeartbeat(t *testing.T) { func TestClientHeartbeatDisableSUB(t *testing.T) { topicName := "test_hb_v2" + strconv.Itoa(int(time.Now().Unix())) - opts := NewNSQDOptions() + opts := NewOptions() opts.Logger = newTestLogger(t) opts.ClientTimeout = 200 * time.Millisecond opts.Verbose = true @@ -295,7 +293,7 @@ func TestClientHeartbeatDisableSUB(t *testing.T) { } func TestClientHeartbeatDisable(t *testing.T) { - opts := NewNSQDOptions() + opts := NewOptions() opts.Logger = newTestLogger(t) opts.ClientTimeout = 100 * time.Millisecond tcpAddr, _, nsqd := mustStartNSQD(opts) @@ -317,7 +315,7 @@ func TestClientHeartbeatDisable(t *testing.T) { } func TestMaxHeartbeatIntervalValid(t *testing.T) { - opts := NewNSQDOptions() + opts := NewOptions() opts.Logger = newTestLogger(t) opts.MaxHeartbeatInterval = 300 * time.Second tcpAddr, _, nsqd := mustStartNSQD(opts) @@ -335,7 +333,7 @@ func TestMaxHeartbeatIntervalValid(t *testing.T) { } func TestMaxHeartbeatIntervalInvalid(t *testing.T) { - opts := NewNSQDOptions() + opts := NewOptions() opts.Logger = newTestLogger(t) opts.MaxHeartbeatInterval = 300 * time.Second tcpAddr, _, nsqd := mustStartNSQD(opts) @@ -356,7 +354,7 @@ func TestMaxHeartbeatIntervalInvalid(t *testing.T) { func TestPausing(t *testing.T) { topicName := "test_pause_v2" + strconv.Itoa(int(time.Now().Unix())) - opts := NewNSQDOptions() + opts := NewOptions() opts.Logger = newTestLogger(t) tcpAddr, _, nsqd := mustStartNSQD(opts) defer os.RemoveAll(opts.DataPath) @@ -420,7 +418,7 @@ func TestPausing(t *testing.T) { } func TestEmptyCommand(t *testing.T) { - opts := NewNSQDOptions() + opts := NewOptions() opts.Logger = newTestLogger(t) tcpAddr, _, nsqd := mustStartNSQD(opts) defer os.RemoveAll(opts.DataPath) @@ -437,7 +435,7 @@ func TestEmptyCommand(t *testing.T) { } func TestSizeLimits(t *testing.T) { - opts := NewNSQDOptions() + opts := NewOptions() opts.Logger = newTestLogger(t) opts.Verbose = true opts.MaxMsgSize = 100 @@ -554,7 +552,7 @@ func TestSizeLimits(t *testing.T) { } func TestDPUB(t *testing.T) { - opts := NewNSQDOptions() + opts := NewOptions() opts.Logger = newTestLogger(t) opts.Verbose = true tcpAddr, _, nsqd := mustStartNSQD(opts) @@ -596,7 +594,7 @@ func TestDPUB(t *testing.T) { } func TestTouch(t *testing.T) { - opts := NewNSQDOptions() + opts := NewOptions() opts.Logger = newTestLogger(t) opts.Verbose = true opts.MsgTimeout = 150 * time.Millisecond @@ -642,7 +640,7 @@ func TestTouch(t *testing.T) { } func TestMaxRdyCount(t *testing.T) { - opts := NewNSQDOptions() + opts := NewOptions() opts.Logger = newTestLogger(t) opts.Verbose = true opts.MaxRdyCount = 50 @@ -690,7 +688,7 @@ func TestMaxRdyCount(t *testing.T) { } func TestFatalError(t *testing.T) { - opts := NewNSQDOptions() + opts := NewOptions() opts.Logger = newTestLogger(t) tcpAddr, _, nsqd := mustStartNSQD(opts) defer os.RemoveAll(opts.DataPath) @@ -714,7 +712,7 @@ func TestFatalError(t *testing.T) { } func TestOutputBuffering(t *testing.T) { - opts := NewNSQDOptions() + opts := NewOptions() opts.Logger = newTestLogger(t) opts.Verbose = true opts.MaxOutputBufferSize = 512 * 1024 @@ -766,7 +764,7 @@ func TestOutputBuffering(t *testing.T) { } func TestOutputBufferingValidity(t *testing.T) { - opts := NewNSQDOptions() + opts := NewOptions() opts.Logger = newTestLogger(t) opts.Verbose = true opts.MaxOutputBufferSize = 512 * 1024 @@ -809,7 +807,7 @@ func TestOutputBufferingValidity(t *testing.T) { } func TestTLS(t *testing.T) { - opts := NewNSQDOptions() + opts := NewOptions() opts.Logger = newTestLogger(t) opts.Verbose = true opts.TLSCert = "./test/certs/server.pem" @@ -848,7 +846,7 @@ func TestTLS(t *testing.T) { } func TestTLSRequired(t *testing.T) { - opts := NewNSQDOptions() + opts := NewOptions() opts.Logger = newTestLogger(t) opts.Verbose = true opts.TLSCert = "./test/certs/server.pem" @@ -897,7 +895,7 @@ func TestTLSRequired(t *testing.T) { } func TestTLSAuthRequire(t *testing.T) { - opts := NewNSQDOptions() + opts := NewOptions() opts.Logger = newTestLogger(t) opts.Verbose = true opts.TLSCert = "./test/certs/server.pem" @@ -963,7 +961,7 @@ func TestTLSAuthRequire(t *testing.T) { } func TestTLSAuthRequireVerify(t *testing.T) { - opts := NewNSQDOptions() + opts := NewOptions() opts.Logger = newTestLogger(t) opts.Verbose = true opts.TLSCert = "./test/certs/server.pem" @@ -1052,7 +1050,7 @@ func TestTLSAuthRequireVerify(t *testing.T) { } func TestDeflate(t *testing.T) { - opts := NewNSQDOptions() + opts := NewOptions() opts.Logger = newTestLogger(t) opts.Verbose = true opts.DeflateEnabled = true @@ -1088,7 +1086,7 @@ type readWriter struct { } func TestSnappy(t *testing.T) { - opts := NewNSQDOptions() + opts := NewOptions() opts.Logger = newTestLogger(t) opts.Verbose = true opts.SnappyEnabled = true @@ -1141,7 +1139,7 @@ func TestSnappy(t *testing.T) { } func TestTLSDeflate(t *testing.T) { - opts := NewNSQDOptions() + opts := NewOptions() opts.Logger = newTestLogger(t) opts.Verbose = true opts.DeflateEnabled = true @@ -1198,7 +1196,7 @@ func TestSampling(t *testing.T) { sampleRate := 42 slack := 5 - opts := NewNSQDOptions() + opts := NewOptions() opts.Logger = newTestLogger(t) opts.Verbose = true opts.MaxRdyCount = int64(num) @@ -1265,7 +1263,7 @@ func TestSampling(t *testing.T) { } func TestTLSSnappy(t *testing.T) { - opts := NewNSQDOptions() + opts := NewOptions() opts.Logger = newTestLogger(t) opts.Verbose = true opts.SnappyEnabled = true @@ -1316,7 +1314,7 @@ func TestTLSSnappy(t *testing.T) { } func TestClientMsgTimeout(t *testing.T) { - opts := NewNSQDOptions() + opts := NewOptions() opts.Logger = newTestLogger(t) opts.Verbose = true opts.QueueScanRefreshInterval = 100 * time.Millisecond @@ -1368,7 +1366,7 @@ func TestClientMsgTimeout(t *testing.T) { } func TestBadFin(t *testing.T) { - opts := NewNSQDOptions() + opts := NewOptions() opts.Logger = newTestLogger(t) opts.Verbose = true tcpAddr, _, nsqd := mustStartNSQD(opts) @@ -1428,7 +1426,7 @@ func runAuthTest(t *testing.T, authResponse, authSecret, authError, authSuccess addr, err := url.Parse(authd.URL) equal(t, err, nil) - opts := NewNSQDOptions() + opts := NewOptions() opts.Logger = newTestLogger(t) opts.Verbose = true opts.AuthHTTPAddresses = []string{addr.Host} @@ -1457,9 +1455,9 @@ func runAuthTest(t *testing.T, authResponse, authSecret, authError, authSuccess func BenchmarkProtocolV2Exec(b *testing.B) { b.StopTimer() - opts := NewNSQDOptions() + opts := NewOptions() opts.Logger = newTestLogger(b) - nsqd := NewNSQD(opts) + nsqd := New(opts) ctx := &context{nsqd} p := &protocolV2{ctx} c := newClientV2(0, nil, ctx) @@ -1474,7 +1472,7 @@ func BenchmarkProtocolV2Exec(b *testing.B) { func benchmarkProtocolV2Pub(b *testing.B, size int) { var wg sync.WaitGroup b.StopTimer() - opts := NewNSQDOptions() + opts := NewOptions() batchSize := int(opts.MaxBodySize) / (size + 4) opts.Logger = newTestLogger(b) opts.MemQueueSize = int64(b.N) @@ -1545,7 +1543,7 @@ func BenchmarkProtocolV2Pub1m(b *testing.B) { benchmarkProtocolV2Pub(b, 1024*1 func benchmarkProtocolV2Sub(b *testing.B, size int) { var wg sync.WaitGroup b.StopTimer() - opts := NewNSQDOptions() + opts := NewOptions() opts.Logger = newTestLogger(b) opts.MemQueueSize = int64(b.N) tcpAddr, _, nsqd := mustStartNSQD(opts) @@ -1642,7 +1640,7 @@ func benchmarkProtocolV2MultiSub(b *testing.B, num int) { var wg sync.WaitGroup b.StopTimer() - opts := NewNSQDOptions() + opts := NewOptions() opts.Logger = newTestLogger(b) opts.MemQueueSize = int64(b.N) tcpAddr, _, nsqd := mustStartNSQD(opts) diff --git a/nsqd/stats_test.go b/nsqd/stats_test.go index c8834d729..06fa40d80 100644 --- a/nsqd/stats_test.go +++ b/nsqd/stats_test.go @@ -13,7 +13,7 @@ import ( ) func TestStats(t *testing.T) { - opts := NewNSQDOptions() + opts := NewOptions() opts.Logger = newTestLogger(t) tcpAddr, _, nsqd := mustStartNSQD(opts) defer os.RemoveAll(opts.DataPath) @@ -42,7 +42,7 @@ func TestStats(t *testing.T) { func TestClientAttributes(t *testing.T) { userAgent := "Test User Agent" - opts := NewNSQDOptions() + opts := NewOptions() opts.Logger = newTestLogger(t) opts.Verbose = true opts.SnappyEnabled = true diff --git a/nsqd/statsd.go b/nsqd/statsd.go index debbabca8..421730378 100644 --- a/nsqd/statsd.go +++ b/nsqd/statsd.go @@ -27,13 +27,13 @@ func (s Uint64Slice) Less(i, j int) bool { func (n *NSQD) statsdLoop() { var lastMemStats runtime.MemStats var lastStats []TopicStats - ticker := time.NewTicker(n.opts.StatsdInterval) + ticker := time.NewTicker(n.getOpts().StatsdInterval) for { select { case <-n.exitChan: goto exit case <-ticker.C: - client := statsd.NewClient(n.opts.StatsdAddress, n.opts.StatsdPrefix) + client := statsd.NewClient(n.getOpts().StatsdAddress, n.getOpts().StatsdPrefix) err := client.CreateSocket() if err != nil { n.logf("ERROR: failed to create UDP socket to statsd(%s)", client) @@ -114,7 +114,7 @@ func (n *NSQD) statsdLoop() { } lastStats = stats - if n.opts.StatsdMemStats { + if n.getOpts().StatsdMemStats { var memStats runtime.MemStats runtime.ReadMemStats(&memStats) diff --git a/nsqd/test/cluster_test.go b/nsqd/test/cluster_test.go deleted file mode 100644 index b93c9b534..000000000 --- a/nsqd/test/cluster_test.go +++ /dev/null @@ -1,102 +0,0 @@ -package test - -import ( - "fmt" - "os" - "path/filepath" - "reflect" - "runtime" - "strconv" - "testing" - "time" - - "github.com/bitly/nsq/internal/http_api" -) - -func equal(t *testing.T, act, exp interface{}) { - if !reflect.DeepEqual(exp, act) { - _, file, line, _ := runtime.Caller(1) - t.Logf("\033[31m%s:%d:\n\n\texp: %#v\n\n\tgot: %#v\033[39m\n\n", - filepath.Base(file), line, exp, act) - t.FailNow() - } -} - -func TestNsqdToLookupd(t *testing.T) { - topicName := "cluster_test" + strconv.Itoa(int(time.Now().Unix())) - - hostname, err := os.Hostname() - equal(t, err, nil) - - url := fmt.Sprintf("http://127.0.0.1:4151/topic/create?topic=%s", topicName) - _, err = http_api.NegotiateV1("POST", url, nil) - equal(t, err, nil) - - url = fmt.Sprintf("http://127.0.0.1:4151/channel/create?topic=%s&channel=ch", topicName) - _, err = http_api.NegotiateV1("POST", url, nil) - equal(t, err, nil) - - // allow some time for nsqd to push info to nsqlookupd - time.Sleep(350 * time.Millisecond) - - data, err := http_api.NegotiateV1("GET", "http://127.0.0.1:4161/debug", nil) - equal(t, err, nil) - - topicData := data.Get("topic:" + topicName + ":") - producers, _ := topicData.Array() - equal(t, len(producers), 1) - - producer := topicData.GetIndex(0) - equal(t, producer.Get("hostname").MustString(), hostname) - equal(t, producer.Get("broadcast_address").MustString(), "127.0.0.1") - equal(t, producer.Get("tcp_port").MustInt(), 4150) - equal(t, producer.Get("tombstoned").MustBool(), false) - - channelData := data.Get("channel:" + topicName + ":ch") - producers, _ = channelData.Array() - equal(t, len(producers), 1) - - producer = topicData.GetIndex(0) - equal(t, producer.Get("hostname").MustString(), hostname) - equal(t, producer.Get("broadcast_address").MustString(), "127.0.0.1") - equal(t, producer.Get("tcp_port").MustInt(), 4150) - equal(t, producer.Get("tombstoned").MustBool(), false) - - data, err = http_api.NegotiateV1("GET", "http://127.0.0.1:4161/lookup?topic="+topicName, nil) - equal(t, err, nil) - - producers, _ = data.Get("producers").Array() - equal(t, len(producers), 1) - - producer = data.Get("producers").GetIndex(0) - equal(t, producer.Get("hostname").MustString(), hostname) - equal(t, producer.Get("broadcast_address").MustString(), "127.0.0.1") - equal(t, producer.Get("tcp_port").MustInt(), 4150) - - channels, _ := data.Get("channels").Array() - equal(t, len(channels), 1) - - channel := channels[0].(string) - equal(t, channel, "ch") - - data, err = http_api.NegotiateV1("POST", "http://127.0.0.1:4151/topic/delete?topic="+topicName, nil) - equal(t, err, nil) - - // allow some time for nsqd to push info to nsqlookupd - time.Sleep(350 * time.Millisecond) - - data, err = http_api.NegotiateV1("GET", "http://127.0.0.1:4161/lookup?topic="+topicName, nil) - equal(t, err, nil) - - producers, _ = data.Get("producers").Array() - equal(t, len(producers), 0) - - data, err = http_api.NegotiateV1("GET", "http://127.0.0.1:4161/debug", nil) - equal(t, err, nil) - - producers, _ = data.Get("topic:" + topicName + ":").Array() - equal(t, len(producers), 0) - - producers, _ = data.Get("channel:" + topicName + ":ch").Array() - equal(t, len(producers), 0) -} diff --git a/nsqd/test/empty.go b/nsqd/test/empty.go deleted file mode 100644 index ede47886d..000000000 --- a/nsqd/test/empty.go +++ /dev/null @@ -1,7 +0,0 @@ -package test - -// This file exists to allow this directory to be built with go build. -// If this file didn't exist, an error would be thrown by Go 1.3+ about -// no buildable Go files in the directory. -// -// See github.com/bitly/nsq/issues/409 for more information. diff --git a/nsqd/topic.go b/nsqd/topic.go index 925c1529d..7478109e3 100644 --- a/nsqd/topic.go +++ b/nsqd/topic.go @@ -41,7 +41,7 @@ func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topi t := &Topic{ name: topicName, channelMap: make(map[string]*Channel), - memoryMsgChan: make(chan *Message, ctx.nsqd.opts.MemQueueSize), + memoryMsgChan: make(chan *Message, ctx.nsqd.getOpts().MemQueueSize), exitChan: make(chan int), channelUpdateChan: make(chan int), ctx: ctx, @@ -54,13 +54,13 @@ func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topi t.backend = newDummyBackendQueue() } else { t.backend = newDiskQueue(topicName, - ctx.nsqd.opts.DataPath, - ctx.nsqd.opts.MaxBytesPerFile, + ctx.nsqd.getOpts().DataPath, + ctx.nsqd.getOpts().MaxBytesPerFile, int32(minValidMsgLength), - int32(ctx.nsqd.opts.MaxMsgSize), - ctx.nsqd.opts.SyncEvery, - ctx.nsqd.opts.SyncTimeout, - ctx.nsqd.opts.Logger) + int32(ctx.nsqd.getOpts().MaxMsgSize), + ctx.nsqd.getOpts().SyncEvery, + ctx.nsqd.getOpts().SyncTimeout, + ctx.nsqd.getOpts().Logger) } t.waitGroup.Wrap(func() { t.messagePump() }) @@ -395,8 +395,8 @@ func (t *Topic) AggregateChannelE2eProcessingLatency() *quantile.Quantile { } if latencyStream == nil { latencyStream = quantile.New( - t.ctx.nsqd.opts.E2EProcessingLatencyWindowTime, - t.ctx.nsqd.opts.E2EProcessingLatencyPercentiles) + t.ctx.nsqd.getOpts().E2EProcessingLatencyWindowTime, + t.ctx.nsqd.getOpts().E2EProcessingLatencyPercentiles) } latencyStream.Merge(c.e2eProcessingLatencyStream) } diff --git a/nsqd/topic_test.go b/nsqd/topic_test.go index 3d637b0a5..ef540dc29 100644 --- a/nsqd/topic_test.go +++ b/nsqd/topic_test.go @@ -13,7 +13,7 @@ import ( ) func TestGetTopic(t *testing.T) { - opts := NewNSQDOptions() + opts := NewOptions() opts.Logger = newTestLogger(t) _, _, nsqd := mustStartNSQD(opts) defer os.RemoveAll(opts.DataPath) @@ -32,7 +32,7 @@ func TestGetTopic(t *testing.T) { } func TestGetChannel(t *testing.T) { - opts := NewNSQDOptions() + opts := NewOptions() opts.Logger = newTestLogger(t) _, _, nsqd := mustStartNSQD(opts) defer os.RemoveAll(opts.DataPath) @@ -60,7 +60,7 @@ func (d *errorBackendQueue) Depth() int64 { return 0 } func (d *errorBackendQueue) Empty() error { return nil } func TestHealth(t *testing.T) { - opts := NewNSQDOptions() + opts := NewOptions() opts.Logger = newTestLogger(t) opts.MemQueueSize = 2 _, httpAddr, nsqd := mustStartNSQD(opts) @@ -96,7 +96,7 @@ func TestHealth(t *testing.T) { } func TestDeletes(t *testing.T) { - opts := NewNSQDOptions() + opts := NewOptions() opts.Logger = newTestLogger(t) _, _, nsqd := mustStartNSQD(opts) defer os.RemoveAll(opts.DataPath) @@ -121,7 +121,7 @@ func TestDeletes(t *testing.T) { } func TestDeleteLast(t *testing.T) { - opts := NewNSQDOptions() + opts := NewOptions() opts.Logger = newTestLogger(t) _, _, nsqd := mustStartNSQD(opts) defer os.RemoveAll(opts.DataPath) @@ -144,7 +144,7 @@ func TestDeleteLast(t *testing.T) { } func TestPause(t *testing.T) { - opts := NewNSQDOptions() + opts := NewOptions() opts.Logger = newTestLogger(t) _, _, nsqd := mustStartNSQD(opts) defer os.RemoveAll(opts.DataPath) @@ -179,7 +179,7 @@ func TestPause(t *testing.T) { func BenchmarkTopicPut(b *testing.B) { b.StopTimer() topicName := "bench_topic_put" + strconv.Itoa(b.N) - opts := NewNSQDOptions() + opts := NewOptions() opts.Logger = newTestLogger(b) opts.MemQueueSize = int64(b.N) _, _, nsqd := mustStartNSQD(opts) @@ -198,7 +198,7 @@ func BenchmarkTopicToChannelPut(b *testing.B) { b.StopTimer() topicName := "bench_topic_to_channel_put" + strconv.Itoa(b.N) channelName := "bench" - opts := NewNSQDOptions() + opts := NewOptions() opts.Logger = newTestLogger(b) opts.MemQueueSize = int64(b.N) _, _, nsqd := mustStartNSQD(opts) diff --git a/nsqlookupd/http.go b/nsqlookupd/http.go index a1259111a..f3f5e9d2b 100644 --- a/nsqlookupd/http.go +++ b/nsqlookupd/http.go @@ -4,136 +4,79 @@ import ( "fmt" "io" "net/http" - httpprof "net/http/pprof" + "net/http/pprof" "sync/atomic" "github.com/bitly/nsq/internal/http_api" "github.com/bitly/nsq/internal/protocol" "github.com/bitly/nsq/internal/version" + "github.com/julienschmidt/httprouter" ) type httpServer struct { - ctx *Context + ctx *Context + router http.Handler } -func (s *httpServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { - err := s.v1Router(w, req) - if err == nil { - return - } - - err = s.deprecatedRouter(w, req) - if err == nil { - return +func newHTTPServer(ctx *Context) *httpServer { + router := httprouter.New() + router.HandleMethodNotAllowed = true + s := &httpServer{ + ctx: ctx, + router: router, } - err = s.debugRouter(w, req) - if err != nil { - s.ctx.nsqlookupd.logf("ERROR: %s", err) - http_api.Respond(w, 404, "NOT_FOUND", nil) - } + // v1 negotiate + router.Handle("GET", "/ping", s.pingHandler) + router.Handle("GET", "/debug", http_api.NegotiateVersion(s.doDebug)) + router.Handle("GET", "/lookup", http_api.NegotiateVersion(s.doLookup)) + router.Handle("GET", "/topics", http_api.NegotiateVersion(s.doTopics)) + router.Handle("GET", "/channels", http_api.NegotiateVersion(s.doChannels)) + router.Handle("GET", "/nodes", http_api.NegotiateVersion(s.doNodes)) + + // only v1 + router.Handle("POST", "/topic/create", http_api.V1(s.doCreateTopic)) + router.Handle("POST", "/topic/delete", http_api.V1(s.doDeleteTopic)) + router.Handle("POST", "/channel/create", http_api.V1(s.doCreateChannel)) + router.Handle("POST", "/channel/delete", http_api.V1(s.doDeleteChannel)) + router.Handle("POST", "/topic/tombstone", http_api.V1(s.doTombstoneTopicProducer)) + + // deprecated, v1 negotiate + router.Handle("GET", "/info", http_api.NegotiateVersion(s.doInfo)) + router.Handle("POST", "/create_topic", http_api.NegotiateVersion(s.doCreateTopic)) + router.Handle("POST", "/delete_topic", http_api.NegotiateVersion(s.doDeleteTopic)) + router.Handle("POST", "/create_channel", http_api.NegotiateVersion(s.doCreateChannel)) + router.Handle("POST", "/delete_channel", http_api.NegotiateVersion(s.doDeleteChannel)) + router.Handle("POST", "/tombstone_topic_producer", http_api.NegotiateVersion(s.doTombstoneTopicProducer)) + router.Handle("GET", "/create_topic", http_api.NegotiateVersion(s.doCreateTopic)) + router.Handle("GET", "/delete_topic", http_api.NegotiateVersion(s.doDeleteTopic)) + router.Handle("GET", "/create_channel", http_api.NegotiateVersion(s.doCreateChannel)) + router.Handle("GET", "/delete_channel", http_api.NegotiateVersion(s.doDeleteChannel)) + router.Handle("GET", "/tombstone_topic_producer", http_api.NegotiateVersion(s.doTombstoneTopicProducer)) + + // debug + router.HandlerFunc("GET", "/debug/pprof", pprof.Index) + router.HandlerFunc("GET", "/debug/pprof/cmdline", pprof.Cmdline) + router.HandlerFunc("GET", "/debug/pprof/symbol", pprof.Symbol) + router.HandlerFunc("GET", "/debug/pprof/profile", pprof.Profile) + router.Handler("GET", "/debug/pprof/heap", pprof.Handler("heap")) + router.Handler("GET", "/debug/pprof/goroutine", pprof.Handler("goroutine")) + router.Handler("GET", "/debug/pprof/block", pprof.Handler("block")) + router.Handler("GET", "/debug/pprof/threadcreate", pprof.Handler("threadcreate")) + + return s } -func (s *httpServer) debugRouter(w http.ResponseWriter, req *http.Request) error { - switch req.URL.Path { - case "/debug": - http_api.NegotiateVersionWrapper(w, req, - func() (interface{}, error) { return s.doDebug(req) }) - case "/debug/pprof": - httpprof.Index(w, req) - case "/debug/pprof/cmdline": - httpprof.Cmdline(w, req) - case "/debug/pprof/symbol": - httpprof.Symbol(w, req) - case "/debug/pprof/heap": - httpprof.Handler("heap").ServeHTTP(w, req) - case "/debug/pprof/goroutine": - httpprof.Handler("goroutine").ServeHTTP(w, req) - case "/debug/pprof/profile": - httpprof.Profile(w, req) - case "/debug/pprof/block": - httpprof.Handler("block").ServeHTTP(w, req) - case "/debug/pprof/threadcreate": - httpprof.Handler("threadcreate").ServeHTTP(w, req) - default: - return fmt.Errorf("404 %s", req.URL.Path) - } - return nil -} - -func (s *httpServer) v1Router(w http.ResponseWriter, req *http.Request) error { - switch req.URL.Path { - case "/ping": - s.pingHandler(w, req) - - case "/lookup": - http_api.NegotiateVersionWrapper(w, req, - func() (interface{}, error) { return s.doLookup(req) }) - case "/topics": - http_api.NegotiateVersionWrapper(w, req, - func() (interface{}, error) { return s.doTopics(req) }) - case "/channels": - http_api.NegotiateVersionWrapper(w, req, - func() (interface{}, error) { return s.doChannels(req) }) - case "/nodes": - http_api.NegotiateVersionWrapper(w, req, - func() (interface{}, error) { return s.doNodes(req) }) - - case "/topic/create": - http_api.V1Wrapper(w, req, http_api.RequirePOST(req, - func() (interface{}, error) { return s.doCreateTopic(req) })) - case "/topic/delete": - http_api.V1Wrapper(w, req, http_api.RequirePOST(req, - func() (interface{}, error) { return s.doDeleteTopic(req) })) - case "/topic/tombstone": - http_api.V1Wrapper(w, req, http_api.RequirePOST(req, - func() (interface{}, error) { return s.doTombstoneTopicProducer(req) })) - - case "/channel/create": - http_api.V1Wrapper(w, req, http_api.RequirePOST(req, - func() (interface{}, error) { return s.doCreateChannel(req) })) - case "/channel/delete": - http_api.V1Wrapper(w, req, http_api.RequirePOST(req, - func() (interface{}, error) { return s.doDeleteChannel(req) })) - - default: - return fmt.Errorf("404 %s", req.URL.Path) - } - return nil -} - -func (s *httpServer) deprecatedRouter(w http.ResponseWriter, req *http.Request) error { - switch req.URL.Path { - case "/info": - http_api.NegotiateVersionWrapper(w, req, - func() (interface{}, error) { return s.doInfo(req) }) - case "/delete_topic": - http_api.NegotiateVersionWrapper(w, req, - func() (interface{}, error) { return s.doDeleteTopic(req) }) - case "/delete_channel": - http_api.NegotiateVersionWrapper(w, req, - func() (interface{}, error) { return s.doDeleteChannel(req) }) - case "/tombstone_topic_producer": - http_api.NegotiateVersionWrapper(w, req, - func() (interface{}, error) { return s.doTombstoneTopicProducer(req) }) - case "/create_topic": - http_api.NegotiateVersionWrapper(w, req, - func() (interface{}, error) { return s.doCreateTopic(req) }) - case "/create_channel": - http_api.NegotiateVersionWrapper(w, req, - func() (interface{}, error) { return s.doCreateChannel(req) }) - default: - return fmt.Errorf("404 %s", req.URL.Path) - } - return nil +func (s *httpServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { + s.router.ServeHTTP(w, req) } -func (s *httpServer) pingHandler(w http.ResponseWriter, req *http.Request) { +func (s *httpServer) pingHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { w.Header().Set("Content-Length", "2") io.WriteString(w, "OK") } -func (s *httpServer) doInfo(req *http.Request) (interface{}, error) { +func (s *httpServer) doInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { return struct { Version string `json:"version"` }{ @@ -141,14 +84,14 @@ func (s *httpServer) doInfo(req *http.Request) (interface{}, error) { }, nil } -func (s *httpServer) doTopics(req *http.Request) (interface{}, error) { +func (s *httpServer) doTopics(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { topics := s.ctx.nsqlookupd.DB.FindRegistrations("topic", "*", "").Keys() return map[string]interface{}{ "topics": topics, }, nil } -func (s *httpServer) doChannels(req *http.Request) (interface{}, error) { +func (s *httpServer) doChannels(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { reqParams, err := http_api.NewReqParams(req) if err != nil { return nil, http_api.Err{400, "INVALID_REQUEST"} @@ -165,7 +108,7 @@ func (s *httpServer) doChannels(req *http.Request) (interface{}, error) { }, nil } -func (s *httpServer) doLookup(req *http.Request) (interface{}, error) { +func (s *httpServer) doLookup(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { reqParams, err := http_api.NewReqParams(req) if err != nil { return nil, http_api.Err{400, "INVALID_REQUEST"} @@ -191,7 +134,7 @@ func (s *httpServer) doLookup(req *http.Request) (interface{}, error) { }, nil } -func (s *httpServer) doCreateTopic(req *http.Request) (interface{}, error) { +func (s *httpServer) doCreateTopic(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { reqParams, err := http_api.NewReqParams(req) if err != nil { return nil, http_api.Err{400, "INVALID_REQUEST"} @@ -213,7 +156,7 @@ func (s *httpServer) doCreateTopic(req *http.Request) (interface{}, error) { return nil, nil } -func (s *httpServer) doDeleteTopic(req *http.Request) (interface{}, error) { +func (s *httpServer) doDeleteTopic(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { reqParams, err := http_api.NewReqParams(req) if err != nil { return nil, http_api.Err{400, "INVALID_REQUEST"} @@ -239,7 +182,7 @@ func (s *httpServer) doDeleteTopic(req *http.Request) (interface{}, error) { return nil, nil } -func (s *httpServer) doTombstoneTopicProducer(req *http.Request) (interface{}, error) { +func (s *httpServer) doTombstoneTopicProducer(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { reqParams, err := http_api.NewReqParams(req) if err != nil { return nil, http_api.Err{400, "INVALID_REQUEST"} @@ -267,7 +210,7 @@ func (s *httpServer) doTombstoneTopicProducer(req *http.Request) (interface{}, e return nil, nil } -func (s *httpServer) doCreateChannel(req *http.Request) (interface{}, error) { +func (s *httpServer) doCreateChannel(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { reqParams, err := http_api.NewReqParams(req) if err != nil { return nil, http_api.Err{400, "INVALID_REQUEST"} @@ -289,7 +232,7 @@ func (s *httpServer) doCreateChannel(req *http.Request) (interface{}, error) { return nil, nil } -func (s *httpServer) doDeleteChannel(req *http.Request) (interface{}, error) { +func (s *httpServer) doDeleteChannel(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { reqParams, err := http_api.NewReqParams(req) if err != nil { return nil, http_api.Err{400, "INVALID_REQUEST"} @@ -324,7 +267,7 @@ type node struct { Topics []string `json:"topics"` } -func (s *httpServer) doNodes(req *http.Request) (interface{}, error) { +func (s *httpServer) doNodes(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { // dont filter out tombstoned nodes producers := s.ctx.nsqlookupd.DB.FindProducers("client", "", "").FilterByActive( s.ctx.nsqlookupd.opts.InactiveProducerTimeout, 0) @@ -361,7 +304,7 @@ func (s *httpServer) doNodes(req *http.Request) (interface{}, error) { }, nil } -func (s *httpServer) doDebug(req *http.Request) (interface{}, error) { +func (s *httpServer) doDebug(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { s.ctx.nsqlookupd.DB.RLock() defer s.ctx.nsqlookupd.DB.RUnlock() diff --git a/nsqlookupd/nsqlookupd.go b/nsqlookupd/nsqlookupd.go index 81256e115..3cfcf8be6 100644 --- a/nsqlookupd/nsqlookupd.go +++ b/nsqlookupd/nsqlookupd.go @@ -14,14 +14,14 @@ import ( type NSQLookupd struct { sync.RWMutex - opts *nsqlookupdOptions + opts *Options tcpListener net.Listener httpListener net.Listener waitGroup util.WaitGroupWrapper DB *RegistrationDB } -func NewNSQLookupd(opts *nsqlookupdOptions) *NSQLookupd { +func New(opts *Options) *NSQLookupd { n := &NSQLookupd{ opts: opts, DB: NewRegistrationDB(), @@ -61,9 +61,9 @@ func (l *NSQLookupd) Main() { l.Lock() l.httpListener = httpListener l.Unlock() - httpServer := &httpServer{ctx: ctx} + httpServer := newHTTPServer(ctx) l.waitGroup.Wrap(func() { - http_api.Serve(httpListener, httpServer, l.opts.Logger, "HTTP") + http_api.Serve(httpListener, httpServer, "HTTP", l.opts.Logger) }) } diff --git a/nsqlookupd/nsqlookupd_test.go b/nsqlookupd/nsqlookupd_test.go index ea27bc090..f75ab1199 100644 --- a/nsqlookupd/nsqlookupd_test.go +++ b/nsqlookupd/nsqlookupd_test.go @@ -40,14 +40,12 @@ func newTestLogger(tbl tbLog) logger { return &testLogger{tbl} } -func mustStartLookupd(opts *nsqlookupdOptions) (*net.TCPAddr, *net.TCPAddr, *NSQLookupd) { +func mustStartLookupd(opts *Options) (*net.TCPAddr, *net.TCPAddr, *NSQLookupd) { opts.TCPAddress = "127.0.0.1:0" opts.HTTPAddress = "127.0.0.1:0" - nsqlookupd := NewNSQLookupd(opts) + nsqlookupd := New(opts) nsqlookupd.Main() - return nsqlookupd.tcpListener.Addr().(*net.TCPAddr), - nsqlookupd.httpListener.Addr().(*net.TCPAddr), - nsqlookupd + return nsqlookupd.RealTCPAddr(), nsqlookupd.RealHTTPAddr(), nsqlookupd } func mustConnectLookupd(t *testing.T, tcpAddr *net.TCPAddr) net.Conn { @@ -74,7 +72,7 @@ func identify(t *testing.T, conn net.Conn, address string, tcpPort int, httpPort } func TestBasicLookupd(t *testing.T) { - opts := NewNSQLookupdOptions() + opts := NewOptions() opts.Logger = newTestLogger(t) tcpAddr, httpAddr, nsqlookupd := mustStartLookupd(opts) defer nsqlookupd.Exit() @@ -169,7 +167,7 @@ func TestBasicLookupd(t *testing.T) { } func TestChannelUnregister(t *testing.T) { - opts := NewNSQLookupdOptions() + opts := NewOptions() opts.Logger = newTestLogger(t) tcpAddr, httpAddr, nsqlookupd := mustStartLookupd(opts) defer nsqlookupd.Exit() @@ -219,7 +217,7 @@ func TestChannelUnregister(t *testing.T) { } func TestTombstoneRecover(t *testing.T) { - opts := NewNSQLookupdOptions() + opts := NewOptions() opts.Logger = newTestLogger(t) opts.TombstoneLifetime = 50 * time.Millisecond tcpAddr, httpAddr, nsqlookupd := mustStartLookupd(opts) @@ -268,7 +266,7 @@ func TestTombstoneRecover(t *testing.T) { } func TestTombstoneUnregister(t *testing.T) { - opts := NewNSQLookupdOptions() + opts := NewOptions() opts.Logger = newTestLogger(t) opts.TombstoneLifetime = 50 * time.Millisecond tcpAddr, httpAddr, nsqlookupd := mustStartLookupd(opts) @@ -310,7 +308,7 @@ func TestTombstoneUnregister(t *testing.T) { } func TestInactiveNodes(t *testing.T) { - opts := NewNSQLookupdOptions() + opts := NewOptions() opts.Logger = newTestLogger(t) opts.InactiveProducerTimeout = 200 * time.Millisecond tcpAddr, httpAddr, nsqlookupd := mustStartLookupd(opts) @@ -342,7 +340,7 @@ func TestInactiveNodes(t *testing.T) { } func TestTombstonedNodes(t *testing.T) { - opts := NewNSQLookupdOptions() + opts := NewOptions() opts.Logger = newTestLogger(t) tcpAddr, httpAddr, nsqlookupd := mustStartLookupd(opts) defer nsqlookupd.Exit() diff --git a/nsqlookupd/options.go b/nsqlookupd/options.go index 80cfea9ca..2a3c826c3 100644 --- a/nsqlookupd/options.go +++ b/nsqlookupd/options.go @@ -6,7 +6,7 @@ import ( "time" ) -type nsqlookupdOptions struct { +type Options struct { Verbose bool `flag:"verbose"` TCPAddress string `flag:"tcp-address"` @@ -19,13 +19,13 @@ type nsqlookupdOptions struct { Logger logger } -func NewNSQLookupdOptions() *nsqlookupdOptions { +func NewOptions() *Options { hostname, err := os.Hostname() if err != nil { log.Fatal(err) } - return &nsqlookupdOptions{ + return &Options{ TCPAddress: "0.0.0.0:4160", HTTPAddress: "0.0.0.0:4161", BroadcastAddress: hostname, diff --git a/test.sh b/test.sh index f7b5caca6..419958b9e 100755 --- a/test.sh +++ b/test.sh @@ -1,43 +1,15 @@ #!/bin/bash set -e -# build and run nsqlookupd -LOOKUP_LOGFILE=$(mktemp -t nsqlookupd.XXXXXXX) -cmd="apps/nsqlookupd/nsqlookupd --broadcast-address=127.0.0.1" -echo "building and starting $cmd" -echo " logging to $LOOKUP_LOGFILE" -go build -o apps/nsqlookupd/nsqlookupd ./apps/nsqlookupd/ -$cmd >$LOOKUP_LOGFILE 2>&1 & -LOOKUPD_PID=$! - -# build and run nsqd configured to use our lookupd above -NSQD_LOGFILE=$(mktemp -t nsqd.XXXXXXX) -cmd="apps/nsqd/nsqd --data-path=/tmp --broadcast-address=127.0.0.1 --lookupd-tcp-address=127.0.0.1:4160 --tls-cert=nsqd/test/certs/cert.pem --tls-key=nsqd/test/certs/key.pem --tls-min-version=tls1.0" -echo "building and starting $cmd" -echo " logging to $NSQD_LOGFILE" -go build -o apps/nsqd/nsqd ./apps/nsqd -$cmd >$NSQD_LOGFILE 2>&1 & -NSQD_PID=$! - -sleep 0.3 - -cleanup() { - echo "killing nsqd PID $NSQD_PID" - kill -s TERM $NSQD_PID || cat $NSQD_LOGFILE - echo "killing nsqlookupd PID $LOOKUPD_PID" - kill -s TERM $LOOKUPD_PID || cat $LOOKUP_LOGFILE -} -trap cleanup INT TERM EXIT - -go test -timeout 60s ./... -GOMAXPROCS=4 go test -timeout 60s -race ./... +GOMAXPROCS=1 go test -timeout 90s ./... +GOMAXPROCS=4 go test -timeout 90s -race ./... # no tests, but a build is something for dir in $(find apps bench -maxdepth 1 -type d) nsqadmin; do - if grep -q '^package main$' $dir/*.go ; then + if grep -q '^package main$' $dir/*.go 2>/dev/null; then echo "building $dir" go build -o $dir/$(basename $dir) ./$dir else - echo "WARNING: skipping go build in $dir" + echo "(skipped $dir)" fi done