diff --git a/nsqadmin/context.go b/nsqadmin/context.go deleted file mode 100644 index 66dbd5d0a..000000000 --- a/nsqadmin/context.go +++ /dev/null @@ -1,5 +0,0 @@ -package nsqadmin - -type Context struct { - nsqadmin *NSQAdmin -} diff --git a/nsqadmin/http.go b/nsqadmin/http.go index bc94205b9..9a5e28022 100644 --- a/nsqadmin/http.go +++ b/nsqadmin/http.go @@ -48,30 +48,30 @@ func NewSingleHostReverseProxy(target *url.URL, connectTimeout time.Duration, re } type httpServer struct { - ctx *Context + nsqadmin *NSQAdmin router http.Handler client *http_api.Client ci *clusterinfo.ClusterInfo basePath string } -func NewHTTPServer(ctx *Context) *httpServer { - log := http_api.Log(ctx.nsqadmin.logf) +func NewHTTPServer(nsqadmin *NSQAdmin) *httpServer { + log := http_api.Log(nsqadmin.logf) - client := http_api.NewClient(ctx.nsqadmin.httpClientTLSConfig, ctx.nsqadmin.getOpts().HTTPClientConnectTimeout, - ctx.nsqadmin.getOpts().HTTPClientRequestTimeout) + client := http_api.NewClient(nsqadmin.httpClientTLSConfig, nsqadmin.getOpts().HTTPClientConnectTimeout, + nsqadmin.getOpts().HTTPClientRequestTimeout) router := httprouter.New() router.HandleMethodNotAllowed = true - router.PanicHandler = http_api.LogPanicHandler(ctx.nsqadmin.logf) - router.NotFound = http_api.LogNotFoundHandler(ctx.nsqadmin.logf) - router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(ctx.nsqadmin.logf) + router.PanicHandler = http_api.LogPanicHandler(nsqadmin.logf) + router.NotFound = http_api.LogNotFoundHandler(nsqadmin.logf) + router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(nsqadmin.logf) s := &httpServer{ - ctx: ctx, + nsqadmin: nsqadmin, router: router, client: client, - ci: clusterinfo.New(ctx.nsqadmin.logf, client), - basePath: ctx.nsqadmin.getOpts().BasePath, + ci: clusterinfo.New(nsqadmin.logf, client), + basePath: nsqadmin.getOpts().BasePath, } bp := func(p string) string { @@ -91,9 +91,9 @@ func NewHTTPServer(ctx *Context) *httpServer { router.Handle("GET", bp("/static/:asset"), http_api.Decorate(s.staticAssetHandler, log, http_api.PlainText)) router.Handle("GET", bp("/fonts/:asset"), http_api.Decorate(s.staticAssetHandler, log, http_api.PlainText)) - if s.ctx.nsqadmin.getOpts().ProxyGraphite { - proxy := NewSingleHostReverseProxy(ctx.nsqadmin.graphiteURL, ctx.nsqadmin.getOpts().HTTPClientConnectTimeout, - ctx.nsqadmin.getOpts().HTTPClientRequestTimeout) + if s.nsqadmin.getOpts().ProxyGraphite { + proxy := NewSingleHostReverseProxy(nsqadmin.graphiteURL, nsqadmin.getOpts().HTTPClientConnectTimeout, + nsqadmin.getOpts().HTTPClientRequestTimeout) router.Handler("GET", bp("/render"), proxy) } @@ -147,14 +147,14 @@ func (s *httpServer) indexHandler(w http.ResponseWriter, req *http.Request, ps h IsAdmin bool }{ Version: version.Binary, - ProxyGraphite: s.ctx.nsqadmin.getOpts().ProxyGraphite, - GraphEnabled: s.ctx.nsqadmin.getOpts().GraphiteURL != "", - GraphiteURL: s.ctx.nsqadmin.getOpts().GraphiteURL, - StatsdInterval: int(s.ctx.nsqadmin.getOpts().StatsdInterval / time.Second), - StatsdCounterFormat: s.ctx.nsqadmin.getOpts().StatsdCounterFormat, - StatsdGaugeFormat: s.ctx.nsqadmin.getOpts().StatsdGaugeFormat, - StatsdPrefix: s.ctx.nsqadmin.getOpts().StatsdPrefix, - NSQLookupd: s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses, + ProxyGraphite: s.nsqadmin.getOpts().ProxyGraphite, + GraphEnabled: s.nsqadmin.getOpts().GraphiteURL != "", + GraphiteURL: s.nsqadmin.getOpts().GraphiteURL, + StatsdInterval: int(s.nsqadmin.getOpts().StatsdInterval / time.Second), + StatsdCounterFormat: 
s.nsqadmin.getOpts().StatsdCounterFormat, + StatsdGaugeFormat: s.nsqadmin.getOpts().StatsdGaugeFormat, + StatsdPrefix: s.nsqadmin.getOpts().StatsdPrefix, + NSQLookupd: s.nsqadmin.getOpts().NSQLookupdHTTPAddresses, IsAdmin: s.isAuthorizedAdminRequest(req), }) @@ -203,33 +203,33 @@ func (s *httpServer) topicsHandler(w http.ResponseWriter, req *http.Request, ps } var topics []string - if len(s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses) != 0 { - topics, err = s.ci.GetLookupdTopics(s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses) + if len(s.nsqadmin.getOpts().NSQLookupdHTTPAddresses) != 0 { + topics, err = s.ci.GetLookupdTopics(s.nsqadmin.getOpts().NSQLookupdHTTPAddresses) } else { - topics, err = s.ci.GetNSQDTopics(s.ctx.nsqadmin.getOpts().NSQDHTTPAddresses) + topics, err = s.ci.GetNSQDTopics(s.nsqadmin.getOpts().NSQDHTTPAddresses) } if err != nil { pe, ok := err.(clusterinfo.PartialErr) if !ok { - s.ctx.nsqadmin.logf(LOG_ERROR, "failed to get topics - %s", err) + s.nsqadmin.logf(LOG_ERROR, "failed to get topics - %s", err) return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR: %s", err)} } - s.ctx.nsqadmin.logf(LOG_WARN, "%s", err) + s.nsqadmin.logf(LOG_WARN, "%s", err) messages = append(messages, pe.Error()) } inactive, _ := reqParams.Get("inactive") if inactive == "true" { topicChannelMap := make(map[string][]string) - if len(s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses) == 0 { + if len(s.nsqadmin.getOpts().NSQLookupdHTTPAddresses) == 0 { goto respond } for _, topicName := range topics { producers, _ := s.ci.GetLookupdTopicProducers( - topicName, s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses) + topicName, s.nsqadmin.getOpts().NSQLookupdHTTPAddresses) if len(producers) == 0 { topicChannels, _ := s.ci.GetLookupdTopicChannels( - topicName, s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses) + topicName, s.nsqadmin.getOpts().NSQLookupdHTTPAddresses) topicChannelMap[topicName] = topicChannels } } @@ -252,25 +252,25 @@ func (s *httpServer) topicHandler(w http.ResponseWriter, req *http.Request, ps h topicName := ps.ByName("topic") producers, err := s.ci.GetTopicProducers(topicName, - s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses, - s.ctx.nsqadmin.getOpts().NSQDHTTPAddresses) + s.nsqadmin.getOpts().NSQLookupdHTTPAddresses, + s.nsqadmin.getOpts().NSQDHTTPAddresses) if err != nil { pe, ok := err.(clusterinfo.PartialErr) if !ok { - s.ctx.nsqadmin.logf(LOG_ERROR, "failed to get topic producers - %s", err) + s.nsqadmin.logf(LOG_ERROR, "failed to get topic producers - %s", err) return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR: %s", err)} } - s.ctx.nsqadmin.logf(LOG_WARN, "%s", err) + s.nsqadmin.logf(LOG_WARN, "%s", err) messages = append(messages, pe.Error()) } topicStats, _, err := s.ci.GetNSQDStats(producers, topicName, "", false) if err != nil { pe, ok := err.(clusterinfo.PartialErr) if !ok { - s.ctx.nsqadmin.logf(LOG_ERROR, "failed to get topic metadata - %s", err) + s.nsqadmin.logf(LOG_ERROR, "failed to get topic metadata - %s", err) return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR: %s", err)} } - s.ctx.nsqadmin.logf(LOG_WARN, "%s", err) + s.nsqadmin.logf(LOG_WARN, "%s", err) messages = append(messages, pe.Error()) } @@ -292,25 +292,25 @@ func (s *httpServer) channelHandler(w http.ResponseWriter, req *http.Request, ps channelName := ps.ByName("channel") producers, err := s.ci.GetTopicProducers(topicName, - s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses, - s.ctx.nsqadmin.getOpts().NSQDHTTPAddresses) + s.nsqadmin.getOpts().NSQLookupdHTTPAddresses, + 
s.nsqadmin.getOpts().NSQDHTTPAddresses) if err != nil { pe, ok := err.(clusterinfo.PartialErr) if !ok { - s.ctx.nsqadmin.logf(LOG_ERROR, "failed to get topic producers - %s", err) + s.nsqadmin.logf(LOG_ERROR, "failed to get topic producers - %s", err) return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR: %s", err)} } - s.ctx.nsqadmin.logf(LOG_WARN, "%s", err) + s.nsqadmin.logf(LOG_WARN, "%s", err) messages = append(messages, pe.Error()) } _, channelStats, err := s.ci.GetNSQDStats(producers, topicName, channelName, true) if err != nil { pe, ok := err.(clusterinfo.PartialErr) if !ok { - s.ctx.nsqadmin.logf(LOG_ERROR, "failed to get channel metadata - %s", err) + s.nsqadmin.logf(LOG_ERROR, "failed to get channel metadata - %s", err) return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR: %s", err)} } - s.ctx.nsqadmin.logf(LOG_WARN, "%s", err) + s.nsqadmin.logf(LOG_WARN, "%s", err) messages = append(messages, pe.Error()) } @@ -323,14 +323,14 @@ func (s *httpServer) channelHandler(w http.ResponseWriter, req *http.Request, ps func (s *httpServer) nodesHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { var messages []string - producers, err := s.ci.GetProducers(s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses, s.ctx.nsqadmin.getOpts().NSQDHTTPAddresses) + producers, err := s.ci.GetProducers(s.nsqadmin.getOpts().NSQLookupdHTTPAddresses, s.nsqadmin.getOpts().NSQDHTTPAddresses) if err != nil { pe, ok := err.(clusterinfo.PartialErr) if !ok { - s.ctx.nsqadmin.logf(LOG_ERROR, "failed to get nodes - %s", err) + s.nsqadmin.logf(LOG_ERROR, "failed to get nodes - %s", err) return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR: %s", err)} } - s.ctx.nsqadmin.logf(LOG_WARN, "%s", err) + s.nsqadmin.logf(LOG_WARN, "%s", err) messages = append(messages, pe.Error()) } @@ -345,14 +345,14 @@ func (s *httpServer) nodeHandler(w http.ResponseWriter, req *http.Request, ps ht node := ps.ByName("node") - producers, err := s.ci.GetProducers(s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses, s.ctx.nsqadmin.getOpts().NSQDHTTPAddresses) + producers, err := s.ci.GetProducers(s.nsqadmin.getOpts().NSQLookupdHTTPAddresses, s.nsqadmin.getOpts().NSQDHTTPAddresses) if err != nil { pe, ok := err.(clusterinfo.PartialErr) if !ok { - s.ctx.nsqadmin.logf(LOG_ERROR, "failed to get producers - %s", err) + s.nsqadmin.logf(LOG_ERROR, "failed to get producers - %s", err) return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR: %s", err)} } - s.ctx.nsqadmin.logf(LOG_WARN, "%s", err) + s.nsqadmin.logf(LOG_WARN, "%s", err) messages = append(messages, pe.Error()) } @@ -363,7 +363,7 @@ func (s *httpServer) nodeHandler(w http.ResponseWriter, req *http.Request, ps ht topicStats, _, err := s.ci.GetNSQDStats(clusterinfo.Producers{producer}, "", "", true) if err != nil { - s.ctx.nsqadmin.logf(LOG_ERROR, "failed to get nsqd stats - %s", err) + s.nsqadmin.logf(LOG_ERROR, "failed to get nsqd stats - %s", err) return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR: %s", err)} } @@ -409,14 +409,14 @@ func (s *httpServer) tombstoneNodeForTopicHandler(w http.ResponseWriter, req *ht } err = s.ci.TombstoneNodeForTopic(body.Topic, node, - s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses) + s.nsqadmin.getOpts().NSQLookupdHTTPAddresses) if err != nil { pe, ok := err.(clusterinfo.PartialErr) if !ok { - s.ctx.nsqadmin.logf(LOG_ERROR, "failed to tombstone node for topic - %s", err) + s.nsqadmin.logf(LOG_ERROR, "failed to tombstone node for topic - %s", err) return nil, http_api.Err{502, 
fmt.Sprintf("UPSTREAM_ERROR: %s", err)} } - s.ctx.nsqadmin.logf(LOG_WARN, "%s", err) + s.nsqadmin.logf(LOG_WARN, "%s", err) messages = append(messages, pe.Error()) } @@ -453,14 +453,14 @@ func (s *httpServer) createTopicChannelHandler(w http.ResponseWriter, req *http. } err = s.ci.CreateTopicChannel(body.Topic, body.Channel, - s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses) + s.nsqadmin.getOpts().NSQLookupdHTTPAddresses) if err != nil { pe, ok := err.(clusterinfo.PartialErr) if !ok { - s.ctx.nsqadmin.logf(LOG_ERROR, "failed to create topic/channel - %s", err) + s.nsqadmin.logf(LOG_ERROR, "failed to create topic/channel - %s", err) return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR: %s", err)} } - s.ctx.nsqadmin.logf(LOG_WARN, "%s", err) + s.nsqadmin.logf(LOG_WARN, "%s", err) messages = append(messages, pe.Error()) } @@ -484,15 +484,15 @@ func (s *httpServer) deleteTopicHandler(w http.ResponseWriter, req *http.Request topicName := ps.ByName("topic") err := s.ci.DeleteTopic(topicName, - s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses, - s.ctx.nsqadmin.getOpts().NSQDHTTPAddresses) + s.nsqadmin.getOpts().NSQLookupdHTTPAddresses, + s.nsqadmin.getOpts().NSQDHTTPAddresses) if err != nil { pe, ok := err.(clusterinfo.PartialErr) if !ok { - s.ctx.nsqadmin.logf(LOG_ERROR, "failed to delete topic - %s", err) + s.nsqadmin.logf(LOG_ERROR, "failed to delete topic - %s", err) return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR: %s", err)} } - s.ctx.nsqadmin.logf(LOG_WARN, "%s", err) + s.nsqadmin.logf(LOG_WARN, "%s", err) messages = append(messages, pe.Error()) } @@ -514,15 +514,15 @@ func (s *httpServer) deleteChannelHandler(w http.ResponseWriter, req *http.Reque channelName := ps.ByName("channel") err := s.ci.DeleteChannel(topicName, channelName, - s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses, - s.ctx.nsqadmin.getOpts().NSQDHTTPAddresses) + s.nsqadmin.getOpts().NSQLookupdHTTPAddresses, + s.nsqadmin.getOpts().NSQDHTTPAddresses) if err != nil { pe, ok := err.(clusterinfo.PartialErr) if !ok { - s.ctx.nsqadmin.logf(LOG_ERROR, "failed to delete channel - %s", err) + s.nsqadmin.logf(LOG_ERROR, "failed to delete channel - %s", err) return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR: %s", err)} } - s.ctx.nsqadmin.logf(LOG_WARN, "%s", err) + s.nsqadmin.logf(LOG_WARN, "%s", err) messages = append(messages, pe.Error()) } @@ -564,42 +564,42 @@ func (s *httpServer) topicChannelAction(req *http.Request, topicName string, cha case "pause": if channelName != "" { err = s.ci.PauseChannel(topicName, channelName, - s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses, - s.ctx.nsqadmin.getOpts().NSQDHTTPAddresses) + s.nsqadmin.getOpts().NSQLookupdHTTPAddresses, + s.nsqadmin.getOpts().NSQDHTTPAddresses) s.notifyAdminAction("pause_channel", topicName, channelName, "", req) } else { err = s.ci.PauseTopic(topicName, - s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses, - s.ctx.nsqadmin.getOpts().NSQDHTTPAddresses) + s.nsqadmin.getOpts().NSQLookupdHTTPAddresses, + s.nsqadmin.getOpts().NSQDHTTPAddresses) s.notifyAdminAction("pause_topic", topicName, "", "", req) } case "unpause": if channelName != "" { err = s.ci.UnPauseChannel(topicName, channelName, - s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses, - s.ctx.nsqadmin.getOpts().NSQDHTTPAddresses) + s.nsqadmin.getOpts().NSQLookupdHTTPAddresses, + s.nsqadmin.getOpts().NSQDHTTPAddresses) s.notifyAdminAction("unpause_channel", topicName, channelName, "", req) } else { err = s.ci.UnPauseTopic(topicName, - s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses, - 
s.ctx.nsqadmin.getOpts().NSQDHTTPAddresses) + s.nsqadmin.getOpts().NSQLookupdHTTPAddresses, + s.nsqadmin.getOpts().NSQDHTTPAddresses) s.notifyAdminAction("unpause_topic", topicName, "", "", req) } case "empty": if channelName != "" { err = s.ci.EmptyChannel(topicName, channelName, - s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses, - s.ctx.nsqadmin.getOpts().NSQDHTTPAddresses) + s.nsqadmin.getOpts().NSQLookupdHTTPAddresses, + s.nsqadmin.getOpts().NSQDHTTPAddresses) s.notifyAdminAction("empty_channel", topicName, channelName, "", req) } else { err = s.ci.EmptyTopic(topicName, - s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses, - s.ctx.nsqadmin.getOpts().NSQDHTTPAddresses) + s.nsqadmin.getOpts().NSQLookupdHTTPAddresses, + s.nsqadmin.getOpts().NSQDHTTPAddresses) s.notifyAdminAction("empty_topic", topicName, "", "", req) } @@ -610,10 +610,10 @@ func (s *httpServer) topicChannelAction(req *http.Request, topicName string, cha if err != nil { pe, ok := err.(clusterinfo.PartialErr) if !ok { - s.ctx.nsqadmin.logf(LOG_ERROR, "failed to %s topic/channel - %s", body.Action, err) + s.nsqadmin.logf(LOG_ERROR, "failed to %s topic/channel - %s", body.Action, err) return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR: %s", err)} } - s.ctx.nsqadmin.logf(LOG_WARN, "%s", err) + s.nsqadmin.logf(LOG_WARN, "%s", err) messages = append(messages, pe.Error()) } @@ -633,24 +633,24 @@ func (s *httpServer) counterHandler(w http.ResponseWriter, req *http.Request, ps var messages []string stats := make(map[string]*counterStats) - producers, err := s.ci.GetProducers(s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses, s.ctx.nsqadmin.getOpts().NSQDHTTPAddresses) + producers, err := s.ci.GetProducers(s.nsqadmin.getOpts().NSQLookupdHTTPAddresses, s.nsqadmin.getOpts().NSQDHTTPAddresses) if err != nil { pe, ok := err.(clusterinfo.PartialErr) if !ok { - s.ctx.nsqadmin.logf(LOG_ERROR, "failed to get counter producer list - %s", err) + s.nsqadmin.logf(LOG_ERROR, "failed to get counter producer list - %s", err) return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR: %s", err)} } - s.ctx.nsqadmin.logf(LOG_WARN, "%s", err) + s.nsqadmin.logf(LOG_WARN, "%s", err) messages = append(messages, pe.Error()) } _, channelStats, err := s.ci.GetNSQDStats(producers, "", "", false) if err != nil { pe, ok := err.(clusterinfo.PartialErr) if !ok { - s.ctx.nsqadmin.logf(LOG_ERROR, "failed to get nsqd stats - %s", err) + s.nsqadmin.logf(LOG_ERROR, "failed to get nsqd stats - %s", err) return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR: %s", err)} } - s.ctx.nsqadmin.logf(LOG_WARN, "%s", err) + s.nsqadmin.logf(LOG_WARN, "%s", err) messages = append(messages, pe.Error()) } @@ -693,14 +693,14 @@ func (s *httpServer) graphiteHandler(w http.ResponseWriter, req *http.Request, p } params := url.Values{} - params.Set("from", fmt.Sprintf("-%dsec", s.ctx.nsqadmin.getOpts().StatsdInterval*2/time.Second)) - params.Set("until", fmt.Sprintf("-%dsec", s.ctx.nsqadmin.getOpts().StatsdInterval/time.Second)) + params.Set("from", fmt.Sprintf("-%dsec", s.nsqadmin.getOpts().StatsdInterval*2/time.Second)) + params.Set("until", fmt.Sprintf("-%dsec", s.nsqadmin.getOpts().StatsdInterval/time.Second)) params.Set("format", "json") params.Set("target", target) query := fmt.Sprintf("/render?%s", params.Encode()) - url := s.ctx.nsqadmin.getOpts().GraphiteURL + query + url := s.nsqadmin.getOpts().GraphiteURL + query - s.ctx.nsqadmin.logf(LOG_INFO, "GRAPHITE: %s", url) + s.nsqadmin.logf(LOG_INFO, "GRAPHITE: %s", url) var response []struct { Target string `json:"target"` 
@@ -708,7 +708,7 @@ func (s *httpServer) graphiteHandler(w http.ResponseWriter, req *http.Request, p } err = s.client.GETV1(url, &response) if err != nil { - s.ctx.nsqadmin.logf(LOG_ERROR, "graphite request failed - %s", err) + s.nsqadmin.logf(LOG_ERROR, "graphite request failed - %s", err) return nil, http_api.Err{500, "INTERNAL_ERROR"} } @@ -717,7 +717,7 @@ func (s *httpServer) graphiteHandler(w http.ResponseWriter, req *http.Request, p if rate < 0 { rateStr = "N/A" } else { - rateDivisor := s.ctx.nsqadmin.getOpts().StatsdInterval / time.Second + rateDivisor := s.nsqadmin.getOpts().StatsdInterval / time.Second rateStr = fmt.Sprintf("%.2f", rate/float64(rateDivisor)) } return struct { @@ -728,17 +728,17 @@ func (s *httpServer) graphiteHandler(w http.ResponseWriter, req *http.Request, p func (s *httpServer) doConfig(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { opt := ps.ByName("opt") - allowConfigFromCIDR := s.ctx.nsqadmin.getOpts().AllowConfigFromCIDR + allowConfigFromCIDR := s.nsqadmin.getOpts().AllowConfigFromCIDR if allowConfigFromCIDR != "" { _, ipnet, _ := net.ParseCIDR(allowConfigFromCIDR) addr, _, err := net.SplitHostPort(req.RemoteAddr) if err != nil { - s.ctx.nsqadmin.logf(LOG_ERROR, "failed to parse RemoteAddr %s", req.RemoteAddr) + s.nsqadmin.logf(LOG_ERROR, "failed to parse RemoteAddr %s", req.RemoteAddr) return nil, http_api.Err{400, "INVALID_REMOTE_ADDR"} } ip := net.ParseIP(addr) if ip == nil { - s.ctx.nsqadmin.logf(LOG_ERROR, "failed to parse RemoteAddr %s", req.RemoteAddr) + s.nsqadmin.logf(LOG_ERROR, "failed to parse RemoteAddr %s", req.RemoteAddr) return nil, http_api.Err{400, "INVALID_REMOTE_ADDR"} } if !ipnet.Contains(ip) { @@ -758,7 +758,7 @@ func (s *httpServer) doConfig(w http.ResponseWriter, req *http.Request, ps httpr return nil, http_api.Err{413, "INVALID_VALUE"} } - opts := *s.ctx.nsqadmin.getOpts() + opts := *s.nsqadmin.getOpts() switch opt { case "nsqlookupd_http_addresses": err := json.Unmarshal(body, &opts.NSQLookupdHTTPAddresses) @@ -775,10 +775,10 @@ func (s *httpServer) doConfig(w http.ResponseWriter, req *http.Request, ps httpr default: return nil, http_api.Err{400, "INVALID_OPTION"} } - s.ctx.nsqadmin.swapOpts(&opts) + s.nsqadmin.swapOpts(&opts) } - v, ok := getOptByCfgName(s.ctx.nsqadmin.getOpts(), opt) + v, ok := getOptByCfgName(s.nsqadmin.getOpts(), opt) if !ok { return nil, http_api.Err{400, "INVALID_OPTION"} } @@ -787,11 +787,11 @@ func (s *httpServer) doConfig(w http.ResponseWriter, req *http.Request, ps httpr } func (s *httpServer) isAuthorizedAdminRequest(req *http.Request) bool { - adminUsers := s.ctx.nsqadmin.getOpts().AdminUsers + adminUsers := s.nsqadmin.getOpts().AdminUsers if len(adminUsers) == 0 { return true } - aclHttpHeader := s.ctx.nsqadmin.getOpts().AclHttpHeader + aclHttpHeader := s.nsqadmin.getOpts().AclHttpHeader user := req.Header.Get(aclHttpHeader) for _, v := range adminUsers { if v == user { diff --git a/nsqadmin/notify.go b/nsqadmin/notify.go index 94c26e93c..d7fa8e077 100644 --- a/nsqadmin/notify.go +++ b/nsqadmin/notify.go @@ -39,7 +39,7 @@ func basicAuthUser(req *http.Request) string { } func (s *httpServer) notifyAdminAction(action, topic, channel, node string, req *http.Request) { - if s.ctx.nsqadmin.getOpts().NotificationHTTPEndpoint == "" { + if s.nsqadmin.getOpts().NotificationHTTPEndpoint == "" { return } via, _ := os.Hostname() @@ -67,5 +67,5 @@ func (s *httpServer) notifyAdminAction(action, topic, channel, node string, req Via: via, } // Perform all work in a new goroutine 
so this never blocks - go func() { s.ctx.nsqadmin.notifications <- a }() + go func() { s.nsqadmin.notifications <- a }() } diff --git a/nsqadmin/nsqadmin.go b/nsqadmin/nsqadmin.go index 65e022eba..f27801a25 100644 --- a/nsqadmin/nsqadmin.go +++ b/nsqadmin/nsqadmin.go @@ -179,7 +179,7 @@ func (n *NSQAdmin) Main() error { }) } - httpServer := NewHTTPServer(&Context{n}) + httpServer := NewHTTPServer(n) n.waitGroup.Wrap(func() { exitFunc(http_api.Serve(n.httpListener, http_api.CompressHandler(httpServer), "HTTP", n.logf)) }) diff --git a/nsqd/channel.go b/nsqd/channel.go index 23bf73ff4..3fc931c38 100644 --- a/nsqd/channel.go +++ b/nsqd/channel.go @@ -43,7 +43,7 @@ type Channel struct { topicName string name string - ctx *context + nsqd *NSQD backend BackendQueue @@ -71,7 +71,7 @@ type Channel struct { } // NewChannel creates a new instance of the Channel type and returns a pointer -func NewChannel(topicName string, channelName string, ctx *context, +func NewChannel(topicName string, channelName string, nsqd *NSQD, deleteCallback func(*Channel)) *Channel { c := &Channel{ @@ -80,16 +80,16 @@ func NewChannel(topicName string, channelName string, ctx *context, memoryMsgChan: nil, clients: make(map[int64]Consumer), deleteCallback: deleteCallback, - ctx: ctx, + nsqd: nsqd, } // create mem-queue only if size > 0 (do not use unbuffered chan) - if ctx.nsqd.getOpts().MemQueueSize > 0 { - c.memoryMsgChan = make(chan *Message, ctx.nsqd.getOpts().MemQueueSize) + if nsqd.getOpts().MemQueueSize > 0 { + c.memoryMsgChan = make(chan *Message, nsqd.getOpts().MemQueueSize) } - if len(ctx.nsqd.getOpts().E2EProcessingLatencyPercentiles) > 0 { + if len(nsqd.getOpts().E2EProcessingLatencyPercentiles) > 0 { c.e2eProcessingLatencyStream = quantile.New( - ctx.nsqd.getOpts().E2EProcessingLatencyWindowTime, - ctx.nsqd.getOpts().E2EProcessingLatencyPercentiles, + nsqd.getOpts().E2EProcessingLatencyWindowTime, + nsqd.getOpts().E2EProcessingLatencyPercentiles, ) } @@ -100,30 +100,30 @@ func NewChannel(topicName string, channelName string, ctx *context, c.backend = newDummyBackendQueue() } else { dqLogf := func(level diskqueue.LogLevel, f string, args ...interface{}) { - opts := ctx.nsqd.getOpts() + opts := nsqd.getOpts() lg.Logf(opts.Logger, opts.LogLevel, lg.LogLevel(level), f, args...) } // backend names, for uniqueness, automatically include the topic... 
backendName := getBackendName(topicName, channelName) c.backend = diskqueue.New( backendName, - ctx.nsqd.getOpts().DataPath, - ctx.nsqd.getOpts().MaxBytesPerFile, + nsqd.getOpts().DataPath, + nsqd.getOpts().MaxBytesPerFile, int32(minValidMsgLength), - int32(ctx.nsqd.getOpts().MaxMsgSize)+minValidMsgLength, - ctx.nsqd.getOpts().SyncEvery, - ctx.nsqd.getOpts().SyncTimeout, + int32(nsqd.getOpts().MaxMsgSize)+minValidMsgLength, + nsqd.getOpts().SyncEvery, + nsqd.getOpts().SyncTimeout, dqLogf, ) } - c.ctx.nsqd.Notify(c) + c.nsqd.Notify(c) return c } func (c *Channel) initPQ() { - pqSize := int(math.Max(1, float64(c.ctx.nsqd.getOpts().MemQueueSize)/10)) + pqSize := int(math.Max(1, float64(c.nsqd.getOpts().MemQueueSize)/10)) c.inFlightMutex.Lock() c.inFlightMessages = make(map[MessageID]*Message) @@ -160,13 +160,13 @@ func (c *Channel) exit(deleted bool) error { } if deleted { - c.ctx.nsqd.logf(LOG_INFO, "CHANNEL(%s): deleting", c.name) + c.nsqd.logf(LOG_INFO, "CHANNEL(%s): deleting", c.name) // since we are explicitly deleting a channel (not just at system exit time) // de-register this from the lookupd - c.ctx.nsqd.Notify(c) + c.nsqd.Notify(c) } else { - c.ctx.nsqd.logf(LOG_INFO, "CHANNEL(%s): closing", c.name) + c.nsqd.logf(LOG_INFO, "CHANNEL(%s): closing", c.name) } // this forceably closes client connections @@ -212,7 +212,7 @@ finish: // it does not drain inflight/deferred because it is only called in Close() func (c *Channel) flush() error { if len(c.memoryMsgChan) > 0 || len(c.inFlightMessages) > 0 || len(c.deferredMessages) > 0 { - c.ctx.nsqd.logf(LOG_INFO, "CHANNEL(%s): flushing %d memory %d in-flight %d deferred messages to backend", + c.nsqd.logf(LOG_INFO, "CHANNEL(%s): flushing %d memory %d in-flight %d deferred messages to backend", c.name, len(c.memoryMsgChan), len(c.inFlightMessages), len(c.deferredMessages)) } @@ -221,7 +221,7 @@ func (c *Channel) flush() error { case msg := <-c.memoryMsgChan: err := writeMessageToBackend(msg, c.backend) if err != nil { - c.ctx.nsqd.logf(LOG_ERROR, "failed to write message to backend - %s", err) + c.nsqd.logf(LOG_ERROR, "failed to write message to backend - %s", err) } default: goto finish @@ -233,7 +233,7 @@ finish: for _, msg := range c.inFlightMessages { err := writeMessageToBackend(msg, c.backend) if err != nil { - c.ctx.nsqd.logf(LOG_ERROR, "failed to write message to backend - %s", err) + c.nsqd.logf(LOG_ERROR, "failed to write message to backend - %s", err) } } c.inFlightMutex.Unlock() @@ -243,7 +243,7 @@ finish: msg := item.Value.(*Message) err := writeMessageToBackend(msg, c.backend) if err != nil { - c.ctx.nsqd.logf(LOG_ERROR, "failed to write message to backend - %s", err) + c.nsqd.logf(LOG_ERROR, "failed to write message to backend - %s", err) } } c.deferredMutex.Unlock() @@ -306,9 +306,9 @@ func (c *Channel) put(m *Message) error { case c.memoryMsgChan <- m: default: err := writeMessageToBackend(m, c.backend) - c.ctx.nsqd.SetHealth(err) + c.nsqd.SetHealth(err) if err != nil { - c.ctx.nsqd.logf(LOG_ERROR, "CHANNEL(%s): failed to write message to backend - %s", + c.nsqd.logf(LOG_ERROR, "CHANNEL(%s): failed to write message to backend - %s", c.name, err) return err } @@ -331,9 +331,9 @@ func (c *Channel) TouchMessage(clientID int64, id MessageID, clientMsgTimeout ti newTimeout := time.Now().Add(clientMsgTimeout) if newTimeout.Sub(msg.deliveryTS) >= - c.ctx.nsqd.getOpts().MaxMsgTimeout { + c.nsqd.getOpts().MaxMsgTimeout { // we would have gone over, set to the max - newTimeout = msg.deliveryTS.Add(c.ctx.nsqd.getOpts().MaxMsgTimeout) + 
newTimeout = msg.deliveryTS.Add(c.nsqd.getOpts().MaxMsgTimeout) } msg.pri = newTimeout.UnixNano() @@ -398,7 +398,7 @@ func (c *Channel) AddClient(clientID int64, client Consumer) error { return nil } - maxChannelConsumers := c.ctx.nsqd.getOpts().MaxChannelConsumers + maxChannelConsumers := c.nsqd.getOpts().MaxChannelConsumers if maxChannelConsumers != 0 && len(c.clients) >= maxChannelConsumers { return errors.New("E_TOO_MANY_CHANNEL_CONSUMERS") } diff --git a/nsqd/channel_test.go b/nsqd/channel_test.go index 4873706a3..5d4d786d5 100644 --- a/nsqd/channel_test.go +++ b/nsqd/channel_test.go @@ -150,7 +150,7 @@ func TestChannelEmptyConsumer(t *testing.T) { topicName := "test_channel_empty" + strconv.Itoa(int(time.Now().Unix())) topic := nsqd.GetTopic(topicName) channel := topic.GetChannel("channel") - client := newClientV2(0, conn, &context{nsqd}) + client := newClientV2(0, conn, nsqd) client.SetReadyCount(25) err := channel.AddClient(client.ID, client) test.Equal(t, err, nil) @@ -189,12 +189,12 @@ func TestMaxChannelConsumers(t *testing.T) { topic := nsqd.GetTopic(topicName) channel := topic.GetChannel("channel") - client1 := newClientV2(1, conn, &context{nsqd}) + client1 := newClientV2(1, conn, nsqd) client1.SetReadyCount(25) err := channel.AddClient(client1.ID, client1) test.Equal(t, err, nil) - client2 := newClientV2(2, conn, &context{nsqd}) + client2 := newClientV2(2, conn, nsqd) client2.SetReadyCount(25) err = channel.AddClient(client2.ID, client2) test.NotEqual(t, err, nil) diff --git a/nsqd/client_v2.go b/nsqd/client_v2.go index 877d6b234..f81b760e0 100644 --- a/nsqd/client_v2.go +++ b/nsqd/client_v2.go @@ -61,7 +61,7 @@ type clientV2 struct { metaLock sync.RWMutex ID int64 - ctx *context + nsqd *NSQD UserAgent string // original connection @@ -108,15 +108,15 @@ type clientV2 struct { AuthState *auth.State } -func newClientV2(id int64, conn net.Conn, ctx *context) *clientV2 { +func newClientV2(id int64, conn net.Conn, nsqd *NSQD) *clientV2 { var identifier string if conn != nil { identifier, _, _ = net.SplitHostPort(conn.RemoteAddr().String()) } c := &clientV2{ - ID: id, - ctx: ctx, + ID: id, + nsqd: nsqd, Conn: conn, @@ -124,9 +124,9 @@ func newClientV2(id int64, conn net.Conn, ctx *context) *clientV2 { Writer: bufio.NewWriterSize(conn, defaultBufferSize), OutputBufferSize: defaultBufferSize, - OutputBufferTimeout: ctx.nsqd.getOpts().OutputBufferTimeout, + OutputBufferTimeout: nsqd.getOpts().OutputBufferTimeout, - MsgTimeout: ctx.nsqd.getOpts().MsgTimeout, + MsgTimeout: nsqd.getOpts().MsgTimeout, // ReadyStateChan has a buffer of 1 to guarantee that in the event // there is a race the state update is not lost @@ -142,7 +142,7 @@ func newClientV2(id int64, conn net.Conn, ctx *context) *clientV2 { IdentifyEventChan: make(chan identifyEvent, 1), // heartbeats are client configurable but default to 30s - HeartbeatInterval: ctx.nsqd.getOpts().ClientTimeout / 2, + HeartbeatInterval: nsqd.getOpts().ClientTimeout / 2, pubCounts: make(map[string]uint64), } @@ -155,7 +155,7 @@ func (c *clientV2) String() string { } func (c *clientV2) Identify(data identifyDataV2) error { - c.ctx.nsqd.logf(LOG_INFO, "[%s] IDENTIFY: %+v", c, data) + c.nsqd.logf(LOG_INFO, "[%s] IDENTIFY: %+v", c, data) c.metaLock.Lock() c.ClientID = data.ClientID @@ -317,7 +317,7 @@ func (c *clientV2) IsReadyForMessages() bool { readyCount := atomic.LoadInt64(&c.ReadyCount) inFlightCount := atomic.LoadInt64(&c.InFlightCount) - c.ctx.nsqd.logf(LOG_DEBUG, "[%s] state rdy: %4d inflt: %4d", c, readyCount, inFlightCount) + 
c.nsqd.logf(LOG_DEBUG, "[%s] state rdy: %4d inflt: %4d", c, readyCount, inFlightCount) if inFlightCount >= readyCount || readyCount <= 0 { return false @@ -402,7 +402,7 @@ func (c *clientV2) SetHeartbeatInterval(desiredInterval int) error { case desiredInterval == 0: // do nothing (use default) case desiredInterval >= 1000 && - desiredInterval <= int(c.ctx.nsqd.getOpts().MaxHeartbeatInterval/time.Millisecond): + desiredInterval <= int(c.nsqd.getOpts().MaxHeartbeatInterval/time.Millisecond): c.HeartbeatInterval = time.Duration(desiredInterval) * time.Millisecond default: return fmt.Errorf("heartbeat interval (%d) is invalid", desiredInterval) @@ -421,8 +421,8 @@ func (c *clientV2) SetOutputBuffer(desiredSize int, desiredTimeout int) error { case desiredTimeout == 0: // do nothing (use default) case true && - desiredTimeout >= int(c.ctx.nsqd.getOpts().MinOutputBufferTimeout/time.Millisecond) && - desiredTimeout <= int(c.ctx.nsqd.getOpts().MaxOutputBufferTimeout/time.Millisecond): + desiredTimeout >= int(c.nsqd.getOpts().MinOutputBufferTimeout/time.Millisecond) && + desiredTimeout <= int(c.nsqd.getOpts().MaxOutputBufferTimeout/time.Millisecond): c.OutputBufferTimeout = time.Duration(desiredTimeout) * time.Millisecond default: @@ -436,7 +436,7 @@ func (c *clientV2) SetOutputBuffer(desiredSize int, desiredTimeout int) error { c.OutputBufferTimeout = 0 case desiredSize == 0: // do nothing (use default) - case desiredSize >= 64 && desiredSize <= int(c.ctx.nsqd.getOpts().MaxOutputBufferSize): + case desiredSize >= 64 && desiredSize <= int(c.nsqd.getOpts().MaxOutputBufferSize): c.OutputBufferSize = desiredSize default: return fmt.Errorf("output buffer size (%d) is invalid", desiredSize) @@ -469,7 +469,7 @@ func (c *clientV2) SetMsgTimeout(msgTimeout int) error { case msgTimeout == 0: // do nothing (use default) case msgTimeout >= 1000 && - msgTimeout <= int(c.ctx.nsqd.getOpts().MaxMsgTimeout/time.Millisecond): + msgTimeout <= int(c.nsqd.getOpts().MaxMsgTimeout/time.Millisecond): c.MsgTimeout = time.Duration(msgTimeout) * time.Millisecond default: return fmt.Errorf("msg timeout (%d) is invalid", msgTimeout) @@ -482,7 +482,7 @@ func (c *clientV2) UpgradeTLS() error { c.writeLock.Lock() defer c.writeLock.Unlock() - tlsConn := tls.Server(c.Conn, c.ctx.nsqd.tlsConfig) + tlsConn := tls.Server(c.Conn, c.nsqd.tlsConfig) tlsConn.SetDeadline(time.Now().Add(5 * time.Second)) err := tlsConn.Handshake() if err != nil { @@ -570,10 +570,10 @@ func (c *clientV2) QueryAuthd() error { } } - authState, err := auth.QueryAnyAuthd(c.ctx.nsqd.getOpts().AuthHTTPAddresses, + authState, err := auth.QueryAnyAuthd(c.nsqd.getOpts().AuthHTTPAddresses, remoteIP, tlsEnabled, commonName, c.AuthSecret, - c.ctx.nsqd.getOpts().HTTPClientConnectTimeout, - c.ctx.nsqd.getOpts().HTTPClientRequestTimeout) + c.nsqd.getOpts().HTTPClientConnectTimeout, + c.nsqd.getOpts().HTTPClientRequestTimeout) if err != nil { return err } diff --git a/nsqd/context.go b/nsqd/context.go deleted file mode 100644 index d8938494f..000000000 --- a/nsqd/context.go +++ /dev/null @@ -1,5 +0,0 @@ -package nsqd - -type context struct { - nsqd *NSQD -} diff --git a/nsqd/http.go b/nsqd/http.go index fef9a05da..ffdec549d 100644 --- a/nsqd/http.go +++ b/nsqd/http.go @@ -32,22 +32,22 @@ var boolParams = map[string]bool{ } type httpServer struct { - ctx *context + nsqd *NSQD tlsEnabled bool tlsRequired bool router http.Handler } -func newHTTPServer(ctx *context, tlsEnabled bool, tlsRequired bool) *httpServer { - log := http_api.Log(ctx.nsqd.logf) +func newHTTPServer(nsqd 
*NSQD, tlsEnabled bool, tlsRequired bool) *httpServer { + log := http_api.Log(nsqd.logf) router := httprouter.New() router.HandleMethodNotAllowed = true - router.PanicHandler = http_api.LogPanicHandler(ctx.nsqd.logf) - router.NotFound = http_api.LogNotFoundHandler(ctx.nsqd.logf) - router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(ctx.nsqd.logf) + router.PanicHandler = http_api.LogPanicHandler(nsqd.logf) + router.NotFound = http_api.LogNotFoundHandler(nsqd.logf) + router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(nsqd.logf) s := &httpServer{ - ctx: ctx, + nsqd: nsqd, tlsEnabled: tlsEnabled, tlsRequired: tlsRequired, router: router, @@ -102,7 +102,7 @@ func setBlockRateHandler(w http.ResponseWriter, req *http.Request, ps httprouter func (s *httpServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { if !s.tlsEnabled && s.tlsRequired { resp := fmt.Sprintf(`{"message": "TLS_REQUIRED", "https_port": %d}`, - s.ctx.nsqd.RealHTTPSAddr().Port) + s.nsqd.RealHTTPSAddr().Port) w.Header().Set("X-NSQ-Content-Type", "nsq; version=1.0") w.Header().Set("Content-Type", "application/json; charset=utf-8") w.WriteHeader(403) @@ -113,8 +113,8 @@ func (s *httpServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { } func (s *httpServer) pingHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { - health := s.ctx.nsqd.GetHealth() - if !s.ctx.nsqd.IsHealthy() { + health := s.nsqd.GetHealth() + if !s.nsqd.IsHealthy() { return nil, http_api.Err{500, health} } return health, nil @@ -134,18 +134,18 @@ func (s *httpServer) doInfo(w http.ResponseWriter, req *http.Request, ps httprou StartTime int64 `json:"start_time"` }{ Version: version.Binary, - BroadcastAddress: s.ctx.nsqd.getOpts().BroadcastAddress, + BroadcastAddress: s.nsqd.getOpts().BroadcastAddress, Hostname: hostname, - TCPPort: s.ctx.nsqd.RealTCPAddr().Port, - HTTPPort: s.ctx.nsqd.RealHTTPAddr().Port, - StartTime: s.ctx.nsqd.GetStartTime().Unix(), + TCPPort: s.nsqd.RealTCPAddr().Port, + HTTPPort: s.nsqd.RealHTTPAddr().Port, + StartTime: s.nsqd.GetStartTime().Unix(), }, nil } func (s *httpServer) getExistingTopicFromQuery(req *http.Request) (*http_api.ReqParams, *Topic, string, error) { reqParams, err := http_api.NewReqParams(req) if err != nil { - s.ctx.nsqd.logf(LOG_ERROR, "failed to parse request params - %s", err) + s.nsqd.logf(LOG_ERROR, "failed to parse request params - %s", err) return nil, nil, "", http_api.Err{400, "INVALID_REQUEST"} } @@ -154,7 +154,7 @@ func (s *httpServer) getExistingTopicFromQuery(req *http.Request) (*http_api.Req return nil, nil, "", http_api.Err{400, err.Error()} } - topic, err := s.ctx.nsqd.GetExistingTopic(topicName) + topic, err := s.nsqd.GetExistingTopic(topicName) if err != nil { return nil, nil, "", http_api.Err{404, "TOPIC_NOT_FOUND"} } @@ -165,7 +165,7 @@ func (s *httpServer) getExistingTopicFromQuery(req *http.Request) (*http_api.Req func (s *httpServer) getTopicFromQuery(req *http.Request) (url.Values, *Topic, error) { reqParams, err := url.ParseQuery(req.URL.RawQuery) if err != nil { - s.ctx.nsqd.logf(LOG_ERROR, "failed to parse request params - %s", err) + s.nsqd.logf(LOG_ERROR, "failed to parse request params - %s", err) return nil, nil, http_api.Err{400, "INVALID_REQUEST"} } @@ -179,20 +179,20 @@ func (s *httpServer) getTopicFromQuery(req *http.Request) (url.Values, *Topic, e return nil, nil, http_api.Err{400, "INVALID_TOPIC"} } - return reqParams, s.ctx.nsqd.GetTopic(topicName), nil + return reqParams, s.nsqd.GetTopic(topicName), nil } func 
(s *httpServer) doPUB(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { // TODO: one day I'd really like to just error on chunked requests // to be able to fail "too big" requests before we even read - if req.ContentLength > s.ctx.nsqd.getOpts().MaxMsgSize { + if req.ContentLength > s.nsqd.getOpts().MaxMsgSize { return nil, http_api.Err{413, "MSG_TOO_BIG"} } // add 1 so that it's greater than our max when we test for it // (LimitReader returns a "fake" EOF) - readMax := s.ctx.nsqd.getOpts().MaxMsgSize + 1 + readMax := s.nsqd.getOpts().MaxMsgSize + 1 body, err := ioutil.ReadAll(io.LimitReader(req.Body, readMax)) if err != nil { return nil, http_api.Err{500, "INTERNAL_ERROR"} @@ -217,7 +217,7 @@ func (s *httpServer) doPUB(w http.ResponseWriter, req *http.Request, ps httprout return nil, http_api.Err{400, "INVALID_DEFER"} } deferred = time.Duration(di) * time.Millisecond - if deferred < 0 || deferred > s.ctx.nsqd.getOpts().MaxReqTimeout { + if deferred < 0 || deferred > s.nsqd.getOpts().MaxReqTimeout { return nil, http_api.Err{400, "INVALID_DEFER"} } } @@ -239,7 +239,7 @@ func (s *httpServer) doMPUB(w http.ResponseWriter, req *http.Request, ps httprou // TODO: one day I'd really like to just error on chunked requests // to be able to fail "too big" requests before we even read - if req.ContentLength > s.ctx.nsqd.getOpts().MaxBodySize { + if req.ContentLength > s.nsqd.getOpts().MaxBodySize { return nil, http_api.Err{413, "BODY_TOO_BIG"} } @@ -253,20 +253,20 @@ func (s *httpServer) doMPUB(w http.ResponseWriter, req *http.Request, ps httprou if vals, ok := reqParams["binary"]; ok { if binaryMode, ok = boolParams[vals[0]]; !ok { binaryMode = true - s.ctx.nsqd.logf(LOG_WARN, "deprecated value '%s' used for /mpub binary param", vals[0]) + s.nsqd.logf(LOG_WARN, "deprecated value '%s' used for /mpub binary param", vals[0]) } } if binaryMode { tmp := make([]byte, 4) msgs, err = readMPUB(req.Body, tmp, topic, - s.ctx.nsqd.getOpts().MaxMsgSize, s.ctx.nsqd.getOpts().MaxBodySize) + s.nsqd.getOpts().MaxMsgSize, s.nsqd.getOpts().MaxBodySize) if err != nil { return nil, http_api.Err{413, err.(*protocol.FatalClientErr).Code[2:]} } } else { // add 1 so that it's greater than our max when we test for it // (LimitReader returns a "fake" EOF) - readMax := s.ctx.nsqd.getOpts().MaxBodySize + 1 + readMax := s.nsqd.getOpts().MaxBodySize + 1 rdr := bufio.NewReader(io.LimitReader(req.Body, readMax)) total := 0 for !exit { @@ -293,7 +293,7 @@ func (s *httpServer) doMPUB(w http.ResponseWriter, req *http.Request, ps httprou continue } - if int64(len(block)) > s.ctx.nsqd.getOpts().MaxMsgSize { + if int64(len(block)) > s.nsqd.getOpts().MaxMsgSize { return nil, http_api.Err{413, "MSG_TOO_BIG"} } @@ -318,7 +318,7 @@ func (s *httpServer) doCreateTopic(w http.ResponseWriter, req *http.Request, ps func (s *httpServer) doEmptyTopic(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { reqParams, err := http_api.NewReqParams(req) if err != nil { - s.ctx.nsqd.logf(LOG_ERROR, "failed to parse request params - %s", err) + s.nsqd.logf(LOG_ERROR, "failed to parse request params - %s", err) return nil, http_api.Err{400, "INVALID_REQUEST"} } @@ -331,7 +331,7 @@ func (s *httpServer) doEmptyTopic(w http.ResponseWriter, req *http.Request, ps h return nil, http_api.Err{400, "INVALID_TOPIC"} } - topic, err := s.ctx.nsqd.GetExistingTopic(topicName) + topic, err := s.nsqd.GetExistingTopic(topicName) if err != nil { return nil, http_api.Err{404, "TOPIC_NOT_FOUND"} } @@ -347,7 
+347,7 @@ func (s *httpServer) doEmptyTopic(w http.ResponseWriter, req *http.Request, ps h func (s *httpServer) doDeleteTopic(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { reqParams, err := http_api.NewReqParams(req) if err != nil { - s.ctx.nsqd.logf(LOG_ERROR, "failed to parse request params - %s", err) + s.nsqd.logf(LOG_ERROR, "failed to parse request params - %s", err) return nil, http_api.Err{400, "INVALID_REQUEST"} } @@ -356,7 +356,7 @@ func (s *httpServer) doDeleteTopic(w http.ResponseWriter, req *http.Request, ps return nil, http_api.Err{400, "MISSING_ARG_TOPIC"} } - err = s.ctx.nsqd.DeleteExistingTopic(topicName) + err = s.nsqd.DeleteExistingTopic(topicName) if err != nil { return nil, http_api.Err{404, "TOPIC_NOT_FOUND"} } @@ -367,7 +367,7 @@ func (s *httpServer) doDeleteTopic(w http.ResponseWriter, req *http.Request, ps func (s *httpServer) doPauseTopic(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { reqParams, err := http_api.NewReqParams(req) if err != nil { - s.ctx.nsqd.logf(LOG_ERROR, "failed to parse request params - %s", err) + s.nsqd.logf(LOG_ERROR, "failed to parse request params - %s", err) return nil, http_api.Err{400, "INVALID_REQUEST"} } @@ -376,7 +376,7 @@ func (s *httpServer) doPauseTopic(w http.ResponseWriter, req *http.Request, ps h return nil, http_api.Err{400, "MISSING_ARG_TOPIC"} } - topic, err := s.ctx.nsqd.GetExistingTopic(topicName) + topic, err := s.nsqd.GetExistingTopic(topicName) if err != nil { return nil, http_api.Err{404, "TOPIC_NOT_FOUND"} } @@ -387,15 +387,15 @@ func (s *httpServer) doPauseTopic(w http.ResponseWriter, req *http.Request, ps h err = topic.Pause() } if err != nil { - s.ctx.nsqd.logf(LOG_ERROR, "failure in %s - %s", req.URL.Path, err) + s.nsqd.logf(LOG_ERROR, "failure in %s - %s", req.URL.Path, err) return nil, http_api.Err{500, "INTERNAL_ERROR"} } // pro-actively persist metadata so in case of process failure // nsqd won't suddenly (un)pause a topic - s.ctx.nsqd.Lock() - s.ctx.nsqd.PersistMetadata() - s.ctx.nsqd.Unlock() + s.nsqd.Lock() + s.nsqd.PersistMetadata() + s.nsqd.Unlock() return nil, nil } @@ -458,15 +458,15 @@ func (s *httpServer) doPauseChannel(w http.ResponseWriter, req *http.Request, ps err = channel.Pause() } if err != nil { - s.ctx.nsqd.logf(LOG_ERROR, "failure in %s - %s", req.URL.Path, err) + s.nsqd.logf(LOG_ERROR, "failure in %s - %s", req.URL.Path, err) return nil, http_api.Err{500, "INTERNAL_ERROR"} } // pro-actively persist metadata so in case of process failure // nsqd won't suddenly (un)pause a channel - s.ctx.nsqd.Lock() - s.ctx.nsqd.PersistMetadata() - s.ctx.nsqd.Unlock() + s.nsqd.Lock() + s.nsqd.PersistMetadata() + s.nsqd.Unlock() return nil, nil } @@ -475,7 +475,7 @@ func (s *httpServer) doStats(w http.ResponseWriter, req *http.Request, ps httpro reqParams, err := http_api.NewReqParams(req) if err != nil { - s.ctx.nsqd.logf(LOG_ERROR, "failed to parse request params - %s", err) + s.nsqd.logf(LOG_ERROR, "failed to parse request params - %s", err) return nil, http_api.Err{400, "INVALID_REQUEST"} } formatString, _ := reqParams.Get("format") @@ -494,12 +494,12 @@ func (s *httpServer) doStats(w http.ResponseWriter, req *http.Request, ps httpro includeMem = true } if includeClients { - producerStats = s.ctx.nsqd.GetProducerStats() + producerStats = s.nsqd.GetProducerStats() } - stats := s.ctx.nsqd.GetStats(topicName, channelName, includeClients) - health := s.ctx.nsqd.GetHealth() - startTime := s.ctx.nsqd.GetStartTime() + stats := 
s.nsqd.GetStats(topicName, channelName, includeClients) + health := s.nsqd.GetHealth() + startTime := s.nsqd.GetStartTime() uptime := time.Since(startTime) // filter by topic (if specified) @@ -681,7 +681,7 @@ func (s *httpServer) doConfig(w http.ResponseWriter, req *http.Request, ps httpr if req.Method == "PUT" { // add 1 so that it's greater than our max when we test for it // (LimitReader returns a "fake" EOF) - readMax := s.ctx.nsqd.getOpts().MaxMsgSize + 1 + readMax := s.nsqd.getOpts().MaxMsgSize + 1 body, err := ioutil.ReadAll(io.LimitReader(req.Body, readMax)) if err != nil { return nil, http_api.Err{500, "INTERNAL_ERROR"} @@ -690,7 +690,7 @@ func (s *httpServer) doConfig(w http.ResponseWriter, req *http.Request, ps httpr return nil, http_api.Err{413, "INVALID_VALUE"} } - opts := *s.ctx.nsqd.getOpts() + opts := *s.nsqd.getOpts() switch opt { case "nsqlookupd_tcp_addresses": err := json.Unmarshal(body, &opts.NSQLookupdTCPAddresses) @@ -707,11 +707,11 @@ func (s *httpServer) doConfig(w http.ResponseWriter, req *http.Request, ps httpr default: return nil, http_api.Err{400, "INVALID_OPTION"} } - s.ctx.nsqd.swapOpts(&opts) - s.ctx.nsqd.triggerOptsNotification() + s.nsqd.swapOpts(&opts) + s.nsqd.triggerOptsNotification() } - v, ok := getOptByCfgName(s.ctx.nsqd.getOpts(), opt) + v, ok := getOptByCfgName(s.nsqd.getOpts(), opt) if !ok { return nil, http_api.Err{400, "INVALID_OPTION"} } diff --git a/nsqd/nsqd.go b/nsqd/nsqd.go index 87df91c94..390e1a654 100644 --- a/nsqd/nsqd.go +++ b/nsqd/nsqd.go @@ -248,8 +248,6 @@ func (n *NSQD) RemoveClient(clientID int64) { } func (n *NSQD) Main() error { - ctx := &context{n} - exitCh := make(chan error) var once sync.Once exitFunc := func(err error) { @@ -261,18 +259,18 @@ func (n *NSQD) Main() error { }) } - n.tcpServer.ctx = ctx + n.tcpServer.nsqd = n n.waitGroup.Wrap(func() { exitFunc(protocol.TCPServer(n.tcpListener, n.tcpServer, n.logf)) }) - httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired) + httpServer := newHTTPServer(n, false, n.getOpts().TLSRequired == TLSRequired) n.waitGroup.Wrap(func() { exitFunc(http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf)) }) if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" { - httpsServer := newHTTPServer(ctx, true, true) + httpsServer := newHTTPServer(n, true, true) n.waitGroup.Wrap(func() { exitFunc(http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.logf)) }) @@ -482,7 +480,7 @@ func (n *NSQD) GetTopic(topicName string) *Topic { deleteCallback := func(t *Topic) { n.DeleteExistingTopic(t.name) } - t = NewTopic(topicName, &context{n}, deleteCallback) + t = NewTopic(topicName, n, deleteCallback) n.topicMap[topicName] = t n.Unlock() diff --git a/nsqd/nsqd_test.go b/nsqd/nsqd_test.go index 147280fa7..b39df44e8 100644 --- a/nsqd/nsqd_test.go +++ b/nsqd/nsqd_test.go @@ -178,7 +178,7 @@ func TestEphemeralTopicsAndChannels(t *testing.T) { body := []byte("an_ephemeral_message") topic := nsqd.GetTopic(topicName) ephemeralChannel := topic.GetChannel("ch1#ephemeral") - client := newClientV2(0, nil, &context{nsqd}) + client := newClientV2(0, nil, nsqd) err := ephemeralChannel.AddClient(client.ID, client) test.Equal(t, err, nil) diff --git a/nsqd/protocol_v2.go b/nsqd/protocol_v2.go index 0c0593ce7..c95aeba6d 100644 --- a/nsqd/protocol_v2.go +++ b/nsqd/protocol_v2.go @@ -30,7 +30,7 @@ var heartbeatBytes = []byte("_heartbeat_") var okBytes = []byte("OK") type protocolV2 struct { - ctx *context + nsqd *NSQD } func (p *protocolV2) IOLoop(conn net.Conn) error { @@ -38,9 +38,9 @@ 
func (p *protocolV2) IOLoop(conn net.Conn) error { var line []byte var zeroTime time.Time - clientID := atomic.AddInt64(&p.ctx.nsqd.clientIDSequence, 1) - client := newClientV2(clientID, conn, p.ctx) - p.ctx.nsqd.AddClient(client.ID, client) + clientID := atomic.AddInt64(&p.nsqd.clientIDSequence, 1) + client := newClientV2(clientID, conn, p.nsqd) + p.nsqd.AddClient(client.ID, client) // synchronize the startup of messagePump in order // to guarantee that it gets a chance to initialize @@ -78,7 +78,7 @@ func (p *protocolV2) IOLoop(conn net.Conn) error { } params := bytes.Split(line, separatorBytes) - p.ctx.nsqd.logf(LOG_DEBUG, "PROTOCOL(V2): [%s] %s", client, params) + p.nsqd.logf(LOG_DEBUG, "PROTOCOL(V2): [%s] %s", client, params) var response []byte response, err = p.Exec(client, params) @@ -87,11 +87,11 @@ func (p *protocolV2) IOLoop(conn net.Conn) error { if parentErr := err.(protocol.ChildErr).Parent(); parentErr != nil { ctx = " - " + parentErr.Error() } - p.ctx.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, err, ctx) + p.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, err, ctx) sendErr := p.Send(client, frameTypeError, []byte(err.Error())) if sendErr != nil { - p.ctx.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, sendErr, ctx) + p.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, sendErr, ctx) break } @@ -111,19 +111,19 @@ func (p *protocolV2) IOLoop(conn net.Conn) error { } } - p.ctx.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] exiting ioloop", client) + p.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] exiting ioloop", client) conn.Close() close(client.ExitChan) if client.Channel != nil { client.Channel.RemoveClient(client.ID) } - p.ctx.nsqd.RemoveClient(client.ID) + p.nsqd.RemoveClient(client.ID) return err } func (p *protocolV2) SendMessage(client *clientV2, msg *Message) error { - p.ctx.nsqd.logf(LOG_DEBUG, "PROTOCOL(V2): writing msg(%s) to client(%s) - %s", msg.ID, client, msg.Body) + p.nsqd.logf(LOG_DEBUG, "PROTOCOL(V2): writing msg(%s) to client(%s) - %s", msg.ID, client, msg.Body) buf := bufferPoolGet() defer bufferPoolPut(buf) @@ -308,7 +308,7 @@ func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) { msg, err := decodeMessage(b) if err != nil { - p.ctx.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err) + p.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err) continue } msg.Attempts++ @@ -339,11 +339,11 @@ func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) { } exit: - p.ctx.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] exiting messagePump", client) + p.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] exiting messagePump", client) heartbeatTicker.Stop() outputBufferTicker.Stop() if err != nil { - p.ctx.nsqd.logf(LOG_ERROR, "PROTOCOL(V2): [%s] messagePump error - %s", client, err) + p.nsqd.logf(LOG_ERROR, "PROTOCOL(V2): [%s] messagePump error - %s", client, err) } } @@ -359,9 +359,9 @@ func (p *protocolV2) IDENTIFY(client *clientV2, params [][]byte) ([]byte, error) return nil, protocol.NewFatalClientErr(err, "E_BAD_BODY", "IDENTIFY failed to read body size") } - if int64(bodyLen) > p.ctx.nsqd.getOpts().MaxBodySize { + if int64(bodyLen) > p.nsqd.getOpts().MaxBodySize { return nil, protocol.NewFatalClientErr(nil, "E_BAD_BODY", - fmt.Sprintf("IDENTIFY body too big %d > %d", bodyLen, p.ctx.nsqd.getOpts().MaxBodySize)) + fmt.Sprintf("IDENTIFY body too big %d > %d", bodyLen, p.nsqd.getOpts().MaxBodySize)) } if bodyLen <= 0 { @@ -382,7 +382,7 @@ func (p *protocolV2) IDENTIFY(client *clientV2, params [][]byte) ([]byte, error) return nil, protocol.NewFatalClientErr(err, 
"E_BAD_BODY", "IDENTIFY failed to decode JSON body") } - p.ctx.nsqd.logf(LOG_DEBUG, "PROTOCOL(V2): [%s] %+v", client, identifyData) + p.nsqd.logf(LOG_DEBUG, "PROTOCOL(V2): [%s] %+v", client, identifyData) err = client.Identify(identifyData) if err != nil { @@ -394,16 +394,16 @@ func (p *protocolV2) IDENTIFY(client *clientV2, params [][]byte) ([]byte, error) return okBytes, nil } - tlsv1 := p.ctx.nsqd.tlsConfig != nil && identifyData.TLSv1 - deflate := p.ctx.nsqd.getOpts().DeflateEnabled && identifyData.Deflate + tlsv1 := p.nsqd.tlsConfig != nil && identifyData.TLSv1 + deflate := p.nsqd.getOpts().DeflateEnabled && identifyData.Deflate deflateLevel := 6 if deflate && identifyData.DeflateLevel > 0 { deflateLevel = identifyData.DeflateLevel } - if max := p.ctx.nsqd.getOpts().MaxDeflateLevel; max < deflateLevel { + if max := p.nsqd.getOpts().MaxDeflateLevel; max < deflateLevel { deflateLevel = max } - snappy := p.ctx.nsqd.getOpts().SnappyEnabled && identifyData.Snappy + snappy := p.nsqd.getOpts().SnappyEnabled && identifyData.Snappy if deflate && snappy { return nil, protocol.NewFatalClientErr(nil, "E_IDENTIFY_FAILED", "cannot enable both deflate and snappy compression") @@ -424,17 +424,17 @@ func (p *protocolV2) IDENTIFY(client *clientV2, params [][]byte) ([]byte, error) OutputBufferSize int `json:"output_buffer_size"` OutputBufferTimeout int64 `json:"output_buffer_timeout"` }{ - MaxRdyCount: p.ctx.nsqd.getOpts().MaxRdyCount, + MaxRdyCount: p.nsqd.getOpts().MaxRdyCount, Version: version.Binary, - MaxMsgTimeout: int64(p.ctx.nsqd.getOpts().MaxMsgTimeout / time.Millisecond), + MaxMsgTimeout: int64(p.nsqd.getOpts().MaxMsgTimeout / time.Millisecond), MsgTimeout: int64(client.MsgTimeout / time.Millisecond), TLSv1: tlsv1, Deflate: deflate, DeflateLevel: deflateLevel, - MaxDeflateLevel: p.ctx.nsqd.getOpts().MaxDeflateLevel, + MaxDeflateLevel: p.nsqd.getOpts().MaxDeflateLevel, Snappy: snappy, SampleRate: client.SampleRate, - AuthRequired: p.ctx.nsqd.IsAuthEnabled(), + AuthRequired: p.nsqd.IsAuthEnabled(), OutputBufferSize: client.OutputBufferSize, OutputBufferTimeout: int64(client.OutputBufferTimeout / time.Millisecond), }) @@ -448,7 +448,7 @@ func (p *protocolV2) IDENTIFY(client *clientV2, params [][]byte) ([]byte, error) } if tlsv1 { - p.ctx.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] upgrading connection to TLS", client) + p.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] upgrading connection to TLS", client) err = client.UpgradeTLS() if err != nil { return nil, protocol.NewFatalClientErr(err, "E_IDENTIFY_FAILED", "IDENTIFY failed "+err.Error()) @@ -461,7 +461,7 @@ func (p *protocolV2) IDENTIFY(client *clientV2, params [][]byte) ([]byte, error) } if snappy { - p.ctx.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] upgrading connection to snappy", client) + p.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] upgrading connection to snappy", client) err = client.UpgradeSnappy() if err != nil { return nil, protocol.NewFatalClientErr(err, "E_IDENTIFY_FAILED", "IDENTIFY failed "+err.Error()) @@ -474,7 +474,7 @@ func (p *protocolV2) IDENTIFY(client *clientV2, params [][]byte) ([]byte, error) } if deflate { - p.ctx.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] upgrading connection to deflate (level %d)", client, deflateLevel) + p.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] upgrading connection to deflate (level %d)", client, deflateLevel) err = client.UpgradeDeflate(deflateLevel) if err != nil { return nil, protocol.NewFatalClientErr(err, "E_IDENTIFY_FAILED", "IDENTIFY failed "+err.Error()) @@ -503,9 +503,9 @@ func (p *protocolV2) AUTH(client 
*clientV2, params [][]byte) ([]byte, error) { return nil, protocol.NewFatalClientErr(err, "E_BAD_BODY", "AUTH failed to read body size") } - if int64(bodyLen) > p.ctx.nsqd.getOpts().MaxBodySize { + if int64(bodyLen) > p.nsqd.getOpts().MaxBodySize { return nil, protocol.NewFatalClientErr(nil, "E_BAD_BODY", - fmt.Sprintf("AUTH body too big %d > %d", bodyLen, p.ctx.nsqd.getOpts().MaxBodySize)) + fmt.Sprintf("AUTH body too big %d > %d", bodyLen, p.nsqd.getOpts().MaxBodySize)) } if bodyLen <= 0 { @@ -523,13 +523,13 @@ func (p *protocolV2) AUTH(client *clientV2, params [][]byte) ([]byte, error) { return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "AUTH already set") } - if !client.ctx.nsqd.IsAuthEnabled() { + if !client.nsqd.IsAuthEnabled() { return nil, protocol.NewFatalClientErr(err, "E_AUTH_DISABLED", "AUTH disabled") } if err := client.Auth(string(body)); err != nil { // we don't want to leak errors contacting the auth server to untrusted clients - p.ctx.nsqd.logf(LOG_WARN, "PROTOCOL(V2): [%s] AUTH failed %s", client, err) + p.nsqd.logf(LOG_WARN, "PROTOCOL(V2): [%s] AUTH failed %s", client, err) return nil, protocol.NewFatalClientErr(err, "E_AUTH_FAILED", "AUTH failed") } @@ -562,7 +562,7 @@ func (p *protocolV2) AUTH(client *clientV2, params [][]byte) ([]byte, error) { func (p *protocolV2) CheckAuth(client *clientV2, cmd, topicName, channelName string) error { // if auth is enabled, the client must have authorized already // compare topic/channel against cached authorization data (refetching if expired) - if client.ctx.nsqd.IsAuthEnabled() { + if client.nsqd.IsAuthEnabled() { if !client.HasAuthorizations() { return protocol.NewFatalClientErr(nil, "E_AUTH_FIRST", fmt.Sprintf("AUTH required before %s", cmd)) @@ -570,7 +570,7 @@ func (p *protocolV2) CheckAuth(client *clientV2, cmd, topicName, channelName str ok, err := client.IsAuthorized(topicName, channelName) if err != nil { // we don't want to leak errors contacting the auth server to untrusted clients - p.ctx.nsqd.logf(LOG_WARN, "PROTOCOL(V2): [%s] AUTH failed %s", client, err) + p.nsqd.logf(LOG_WARN, "PROTOCOL(V2): [%s] AUTH failed %s", client, err) return protocol.NewFatalClientErr(nil, "E_AUTH_FAILED", "AUTH failed") } if !ok { @@ -615,12 +615,12 @@ func (p *protocolV2) SUB(client *clientV2, params [][]byte) ([]byte, error) { // Avoid adding a client to an ephemeral channel / topic which has started exiting. 
var channel *Channel for { - topic := p.ctx.nsqd.GetTopic(topicName) + topic := p.nsqd.GetTopic(topicName) channel = topic.GetChannel(channelName) if err := channel.AddClient(client.ID, client); err != nil { return nil, protocol.NewFatalClientErr(nil, "E_TOO_MANY_CHANNEL_CONSUMERS", fmt.Sprintf("channel consumers for %s:%s exceeds limit of %d", - topicName, channelName, p.ctx.nsqd.getOpts().MaxChannelConsumers)) + topicName, channelName, p.nsqd.getOpts().MaxChannelConsumers)) } if (channel.ephemeral && channel.Exiting()) || (topic.ephemeral && topic.Exiting()) { @@ -643,7 +643,7 @@ func (p *protocolV2) RDY(client *clientV2, params [][]byte) ([]byte, error) { if state == stateClosing { // just ignore ready changes on a closing channel - p.ctx.nsqd.logf(LOG_INFO, + p.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] ignoring RDY after CLS in state ClientStateV2Closing", client) return nil, nil @@ -663,11 +663,11 @@ func (p *protocolV2) RDY(client *clientV2, params [][]byte) ([]byte, error) { count = int64(b10) } - if count < 0 || count > p.ctx.nsqd.getOpts().MaxRdyCount { + if count < 0 || count > p.nsqd.getOpts().MaxRdyCount { // this needs to be a fatal error otherwise clients would have // inconsistent state return nil, protocol.NewFatalClientErr(nil, "E_INVALID", - fmt.Sprintf("RDY count %d out of range 0-%d", count, p.ctx.nsqd.getOpts().MaxRdyCount)) + fmt.Sprintf("RDY count %d out of range 0-%d", count, p.nsqd.getOpts().MaxRdyCount)) } client.SetReadyCount(count) @@ -723,7 +723,7 @@ func (p *protocolV2) REQ(client *clientV2, params [][]byte) ([]byte, error) { } timeoutDuration := time.Duration(timeoutMs) * time.Millisecond - maxReqTimeout := p.ctx.nsqd.getOpts().MaxReqTimeout + maxReqTimeout := p.nsqd.getOpts().MaxReqTimeout clampedTimeout := timeoutDuration if timeoutDuration < 0 { @@ -732,7 +732,7 @@ func (p *protocolV2) REQ(client *clientV2, params [][]byte) ([]byte, error) { clampedTimeout = maxReqTimeout } if clampedTimeout != timeoutDuration { - p.ctx.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] REQ timeout %d out of range 0-%d. Setting to %d", + p.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] REQ timeout %d out of range 0-%d. 
Setting to %d", client, timeoutDuration, maxReqTimeout, clampedTimeout) timeoutDuration = clampedTimeout } @@ -785,9 +785,9 @@ func (p *protocolV2) PUB(client *clientV2, params [][]byte) ([]byte, error) { fmt.Sprintf("PUB invalid message body size %d", bodyLen)) } - if int64(bodyLen) > p.ctx.nsqd.getOpts().MaxMsgSize { + if int64(bodyLen) > p.nsqd.getOpts().MaxMsgSize { return nil, protocol.NewFatalClientErr(nil, "E_BAD_MESSAGE", - fmt.Sprintf("PUB message too big %d > %d", bodyLen, p.ctx.nsqd.getOpts().MaxMsgSize)) + fmt.Sprintf("PUB message too big %d > %d", bodyLen, p.nsqd.getOpts().MaxMsgSize)) } messageBody := make([]byte, bodyLen) @@ -800,7 +800,7 @@ func (p *protocolV2) PUB(client *clientV2, params [][]byte) ([]byte, error) { return nil, err } - topic := p.ctx.nsqd.GetTopic(topicName) + topic := p.nsqd.GetTopic(topicName) msg := NewMessage(topic.GenerateID(), messageBody) err = topic.PutMessage(msg) if err != nil { @@ -829,7 +829,7 @@ func (p *protocolV2) MPUB(client *clientV2, params [][]byte) ([]byte, error) { return nil, err } - topic := p.ctx.nsqd.GetTopic(topicName) + topic := p.nsqd.GetTopic(topicName) bodyLen, err := readLen(client.Reader, client.lenSlice) if err != nil { @@ -841,13 +841,13 @@ func (p *protocolV2) MPUB(client *clientV2, params [][]byte) ([]byte, error) { fmt.Sprintf("MPUB invalid body size %d", bodyLen)) } - if int64(bodyLen) > p.ctx.nsqd.getOpts().MaxBodySize { + if int64(bodyLen) > p.nsqd.getOpts().MaxBodySize { return nil, protocol.NewFatalClientErr(nil, "E_BAD_BODY", - fmt.Sprintf("MPUB body too big %d > %d", bodyLen, p.ctx.nsqd.getOpts().MaxBodySize)) + fmt.Sprintf("MPUB body too big %d > %d", bodyLen, p.nsqd.getOpts().MaxBodySize)) } messages, err := readMPUB(client.Reader, client.lenSlice, topic, - p.ctx.nsqd.getOpts().MaxMsgSize, p.ctx.nsqd.getOpts().MaxBodySize) + p.nsqd.getOpts().MaxMsgSize, p.nsqd.getOpts().MaxBodySize) if err != nil { return nil, err } @@ -885,10 +885,10 @@ func (p *protocolV2) DPUB(client *clientV2, params [][]byte) ([]byte, error) { } timeoutDuration := time.Duration(timeoutMs) * time.Millisecond - if timeoutDuration < 0 || timeoutDuration > p.ctx.nsqd.getOpts().MaxReqTimeout { + if timeoutDuration < 0 || timeoutDuration > p.nsqd.getOpts().MaxReqTimeout { return nil, protocol.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("DPUB timeout %d out of range 0-%d", - timeoutMs, p.ctx.nsqd.getOpts().MaxReqTimeout/time.Millisecond)) + timeoutMs, p.nsqd.getOpts().MaxReqTimeout/time.Millisecond)) } bodyLen, err := readLen(client.Reader, client.lenSlice) @@ -901,9 +901,9 @@ func (p *protocolV2) DPUB(client *clientV2, params [][]byte) ([]byte, error) { fmt.Sprintf("DPUB invalid message body size %d", bodyLen)) } - if int64(bodyLen) > p.ctx.nsqd.getOpts().MaxMsgSize { + if int64(bodyLen) > p.nsqd.getOpts().MaxMsgSize { return nil, protocol.NewFatalClientErr(nil, "E_BAD_MESSAGE", - fmt.Sprintf("DPUB message too big %d > %d", bodyLen, p.ctx.nsqd.getOpts().MaxMsgSize)) + fmt.Sprintf("DPUB message too big %d > %d", bodyLen, p.nsqd.getOpts().MaxMsgSize)) } messageBody := make([]byte, bodyLen) @@ -916,7 +916,7 @@ func (p *protocolV2) DPUB(client *clientV2, params [][]byte) ([]byte, error) { return nil, err } - topic := p.ctx.nsqd.GetTopic(topicName) + topic := p.nsqd.GetTopic(topicName) msg := NewMessage(topic.GenerateID(), messageBody) msg.deferred = timeoutDuration err = topic.PutMessage(msg) @@ -1016,7 +1016,7 @@ func readLen(r io.Reader, tmp []byte) (int32, error) { } func enforceTLSPolicy(client *clientV2, p *protocolV2, command []byte) error 
{ - if p.ctx.nsqd.getOpts().TLSRequired != TLSNotRequired && atomic.LoadInt32(&client.TLS) != 1 { + if p.nsqd.getOpts().TLSRequired != TLSNotRequired && atomic.LoadInt32(&client.TLS) != 1 { return protocol.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("cannot %s in current state (TLS required)", command)) } diff --git a/nsqd/protocol_v2_test.go b/nsqd/protocol_v2_test.go index e8f7d1ff6..5ff8ad9d5 100644 --- a/nsqd/protocol_v2_test.go +++ b/nsqd/protocol_v2_test.go @@ -1605,8 +1605,8 @@ func testIOLoopReturnsClientErr(t *testing.T, fakeConn test.FakeNetConn) { nsqd, err := New(opts) test.Nil(t, err) - prot := &protocolV2{ctx: &context{nsqd: nsqd}} - defer prot.ctx.nsqd.Exit() + prot := &protocolV2{nsqd: nsqd} + defer prot.nsqd.Exit() err = prot.IOLoop(fakeConn) test.NotNil(t, err) @@ -1619,9 +1619,8 @@ func BenchmarkProtocolV2Exec(b *testing.B) { opts := NewOptions() opts.Logger = test.NewTestLogger(b) nsqd, _ := New(opts) - ctx := &context{nsqd} - p := &protocolV2{ctx} - c := newClientV2(0, nil, ctx) + p := &protocolV2{nsqd} + c := newClientV2(0, nil, nsqd) params := [][]byte{[]byte("NOP")} b.StartTimer() diff --git a/nsqd/tcp.go b/nsqd/tcp.go index 527d66ccd..7279b6f3f 100644 --- a/nsqd/tcp.go +++ b/nsqd/tcp.go @@ -9,12 +9,12 @@ import ( ) type tcpServer struct { - ctx *context + nsqd *NSQD conns sync.Map } func (p *tcpServer) Handle(clientConn net.Conn) { - p.ctx.nsqd.logf(LOG_INFO, "TCP: new client(%s)", clientConn.RemoteAddr()) + p.nsqd.logf(LOG_INFO, "TCP: new client(%s)", clientConn.RemoteAddr()) // The client should initialize itself by sending a 4 byte sequence indicating // the version of the protocol that it intends to communicate, this will allow us @@ -22,23 +22,23 @@ func (p *tcpServer) Handle(clientConn net.Conn) { buf := make([]byte, 4) _, err := io.ReadFull(clientConn, buf) if err != nil { - p.ctx.nsqd.logf(LOG_ERROR, "failed to read protocol version - %s", err) + p.nsqd.logf(LOG_ERROR, "failed to read protocol version - %s", err) clientConn.Close() return } protocolMagic := string(buf) - p.ctx.nsqd.logf(LOG_INFO, "CLIENT(%s): desired protocol magic '%s'", + p.nsqd.logf(LOG_INFO, "CLIENT(%s): desired protocol magic '%s'", clientConn.RemoteAddr(), protocolMagic) var prot protocol.Protocol switch protocolMagic { case " V2": - prot = &protocolV2{ctx: p.ctx} + prot = &protocolV2{nsqd: p.nsqd} default: protocol.SendFramedResponse(clientConn, frameTypeError, []byte("E_BAD_PROTOCOL")) clientConn.Close() - p.ctx.nsqd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'", + p.nsqd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'", clientConn.RemoteAddr(), protocolMagic) return } @@ -47,7 +47,7 @@ func (p *tcpServer) Handle(clientConn net.Conn) { err = prot.IOLoop(clientConn) if err != nil { - p.ctx.nsqd.logf(LOG_ERROR, "client(%s) - %s", clientConn.RemoteAddr(), err) + p.nsqd.logf(LOG_ERROR, "client(%s) - %s", clientConn.RemoteAddr(), err) } p.conns.Delete(clientConn.RemoteAddr()) diff --git a/nsqd/topic.go b/nsqd/topic.go index 3ad5d6850..76aad14bb 100644 --- a/nsqd/topic.go +++ b/nsqd/topic.go @@ -38,11 +38,11 @@ type Topic struct { paused int32 pauseChan chan int - ctx *context + nsqd *NSQD } // Topic constructor -func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topic { +func NewTopic(topicName string, nsqd *NSQD, deleteCallback func(*Topic)) *Topic { t := &Topic{ name: topicName, channelMap: make(map[string]*Channel), @@ -50,39 +50,39 @@ func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topi startChan: make(chan int, 1), 
exitChan: make(chan int), channelUpdateChan: make(chan int), - ctx: ctx, + nsqd: nsqd, paused: 0, pauseChan: make(chan int), deleteCallback: deleteCallback, - idFactory: NewGUIDFactory(ctx.nsqd.getOpts().ID), + idFactory: NewGUIDFactory(nsqd.getOpts().ID), } // create mem-queue only if size > 0 (do not use unbuffered chan) - if ctx.nsqd.getOpts().MemQueueSize > 0 { - t.memoryMsgChan = make(chan *Message, ctx.nsqd.getOpts().MemQueueSize) + if nsqd.getOpts().MemQueueSize > 0 { + t.memoryMsgChan = make(chan *Message, nsqd.getOpts().MemQueueSize) } if strings.HasSuffix(topicName, "#ephemeral") { t.ephemeral = true t.backend = newDummyBackendQueue() } else { dqLogf := func(level diskqueue.LogLevel, f string, args ...interface{}) { - opts := ctx.nsqd.getOpts() + opts := nsqd.getOpts() lg.Logf(opts.Logger, opts.LogLevel, lg.LogLevel(level), f, args...) } t.backend = diskqueue.New( topicName, - ctx.nsqd.getOpts().DataPath, - ctx.nsqd.getOpts().MaxBytesPerFile, + nsqd.getOpts().DataPath, + nsqd.getOpts().MaxBytesPerFile, int32(minValidMsgLength), - int32(ctx.nsqd.getOpts().MaxMsgSize)+minValidMsgLength, - ctx.nsqd.getOpts().SyncEvery, - ctx.nsqd.getOpts().SyncTimeout, + int32(nsqd.getOpts().MaxMsgSize)+minValidMsgLength, + nsqd.getOpts().SyncEvery, + nsqd.getOpts().SyncTimeout, dqLogf, ) } t.waitGroup.Wrap(t.messagePump) - t.ctx.nsqd.Notify(t) + t.nsqd.Notify(t) return t } @@ -125,9 +125,9 @@ func (t *Topic) getOrCreateChannel(channelName string) (*Channel, bool) { deleteCallback := func(c *Channel) { t.DeleteExistingChannel(c.name) } - channel = NewChannel(t.name, channelName, t.ctx, deleteCallback) + channel = NewChannel(t.name, channelName, t.nsqd, deleteCallback) t.channelMap[channelName] = channel - t.ctx.nsqd.logf(LOG_INFO, "TOPIC(%s): new channel(%s)", t.name, channel.name) + t.nsqd.logf(LOG_INFO, "TOPIC(%s): new channel(%s)", t.name, channel.name) return channel, true } return channel, false @@ -156,7 +156,7 @@ func (t *Topic) DeleteExistingChannel(channelName string) error { numChannels := len(t.channelMap) t.Unlock() - t.ctx.nsqd.logf(LOG_INFO, "TOPIC(%s): deleting channel %s", t.name, channel.name) + t.nsqd.logf(LOG_INFO, "TOPIC(%s): deleting channel %s", t.name, channel.name) // delete empties the channel before closing // (so that we dont leave any messages around) @@ -221,9 +221,9 @@ func (t *Topic) put(m *Message) error { case t.memoryMsgChan <- m: default: err := writeMessageToBackend(m, t.backend) - t.ctx.nsqd.SetHealth(err) + t.nsqd.SetHealth(err) if err != nil { - t.ctx.nsqd.logf(LOG_ERROR, + t.nsqd.logf(LOG_ERROR, "TOPIC(%s) ERROR: failed to write message to backend - %s", t.name, err) return err @@ -276,7 +276,7 @@ func (t *Topic) messagePump() { case buf = <-backendChan: msg, err = decodeMessage(buf) if err != nil { - t.ctx.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err) + t.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err) continue } case <-t.channelUpdateChan: @@ -324,7 +324,7 @@ func (t *Topic) messagePump() { } err := channel.PutMessage(chanMsg) if err != nil { - t.ctx.nsqd.logf(LOG_ERROR, + t.nsqd.logf(LOG_ERROR, "TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s", t.name, msg.ID, channel.name, err) } @@ -332,7 +332,7 @@ func (t *Topic) messagePump() { } exit: - t.ctx.nsqd.logf(LOG_INFO, "TOPIC(%s): closing ... messagePump", t.name) + t.nsqd.logf(LOG_INFO, "TOPIC(%s): closing ... 
messagePump", t.name) } // Delete empties the topic and all its channels and closes @@ -351,13 +351,13 @@ func (t *Topic) exit(deleted bool) error { } if deleted { - t.ctx.nsqd.logf(LOG_INFO, "TOPIC(%s): deleting", t.name) + t.nsqd.logf(LOG_INFO, "TOPIC(%s): deleting", t.name) // since we are explicitly deleting a topic (not just at system exit time) // de-register this from the lookupd - t.ctx.nsqd.Notify(t) + t.nsqd.Notify(t) } else { - t.ctx.nsqd.logf(LOG_INFO, "TOPIC(%s): closing", t.name) + t.nsqd.logf(LOG_INFO, "TOPIC(%s): closing", t.name) } close(t.exitChan) @@ -383,7 +383,7 @@ func (t *Topic) exit(deleted bool) error { err := channel.Close() if err != nil { // we need to continue regardless of error to close all the channels - t.ctx.nsqd.logf(LOG_ERROR, "channel(%s) close - %s", channel.name, err) + t.nsqd.logf(LOG_ERROR, "channel(%s) close - %s", channel.name, err) } } @@ -407,7 +407,7 @@ finish: func (t *Topic) flush() error { if len(t.memoryMsgChan) > 0 { - t.ctx.nsqd.logf(LOG_INFO, + t.nsqd.logf(LOG_INFO, "TOPIC(%s): flushing %d memory messages to backend", t.name, len(t.memoryMsgChan)) } @@ -417,7 +417,7 @@ func (t *Topic) flush() error { case msg := <-t.memoryMsgChan: err := writeMessageToBackend(msg, t.backend) if err != nil { - t.ctx.nsqd.logf(LOG_ERROR, + t.nsqd.logf(LOG_ERROR, "ERROR: failed to write message to backend - %s", err) } default: @@ -443,8 +443,8 @@ func (t *Topic) AggregateChannelE2eProcessingLatency() *quantile.Quantile { } if latencyStream == nil { latencyStream = quantile.New( - t.ctx.nsqd.getOpts().E2EProcessingLatencyWindowTime, - t.ctx.nsqd.getOpts().E2EProcessingLatencyPercentiles) + t.nsqd.getOpts().E2EProcessingLatencyWindowTime, + t.nsqd.getOpts().E2EProcessingLatencyPercentiles) } latencyStream.Merge(c.e2eProcessingLatencyStream) } @@ -486,7 +486,7 @@ func (t *Topic) GenerateID() MessageID { return id.Hex() } if i%10000 == 0 { - t.ctx.nsqd.logf(LOG_ERROR, "TOPIC(%s): failed to create guid - %s", t.name, err) + t.nsqd.logf(LOG_ERROR, "TOPIC(%s): failed to create guid - %s", t.name, err) } time.Sleep(time.Millisecond) i++ diff --git a/nsqlookupd/context.go b/nsqlookupd/context.go deleted file mode 100644 index e7132ef25..000000000 --- a/nsqlookupd/context.go +++ /dev/null @@ -1,5 +0,0 @@ -package nsqlookupd - -type Context struct { - nsqlookupd *NSQLookupd -} diff --git a/nsqlookupd/http.go b/nsqlookupd/http.go index c9fa2e1a7..d2806ea6d 100644 --- a/nsqlookupd/http.go +++ b/nsqlookupd/http.go @@ -13,21 +13,21 @@ import ( ) type httpServer struct { - ctx *Context - router http.Handler + nsqlookupd *NSQLookupd + router http.Handler } -func newHTTPServer(ctx *Context) *httpServer { - log := http_api.Log(ctx.nsqlookupd.logf) +func newHTTPServer(l *NSQLookupd) *httpServer { + log := http_api.Log(l.logf) router := httprouter.New() router.HandleMethodNotAllowed = true - router.PanicHandler = http_api.LogPanicHandler(ctx.nsqlookupd.logf) - router.NotFound = http_api.LogNotFoundHandler(ctx.nsqlookupd.logf) - router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(ctx.nsqlookupd.logf) + router.PanicHandler = http_api.LogPanicHandler(l.logf) + router.NotFound = http_api.LogNotFoundHandler(l.logf) + router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(l.logf) s := &httpServer{ - ctx: ctx, - router: router, + nsqlookupd: l, + router: router, } router.Handle("GET", "/ping", http_api.Decorate(s.pingHandler, log, http_api.PlainText)) @@ -78,7 +78,7 @@ func (s *httpServer) doInfo(w http.ResponseWriter, req *http.Request, ps httprou } func (s 
*httpServer) doTopics(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { - topics := s.ctx.nsqlookupd.DB.FindRegistrations("topic", "*", "").Keys() + topics := s.nsqlookupd.DB.FindRegistrations("topic", "*", "").Keys() return map[string]interface{}{ "topics": topics, }, nil @@ -95,7 +95,7 @@ func (s *httpServer) doChannels(w http.ResponseWriter, req *http.Request, ps htt return nil, http_api.Err{400, "MISSING_ARG_TOPIC"} } - channels := s.ctx.nsqlookupd.DB.FindRegistrations("channel", topicName, "*").SubKeys() + channels := s.nsqlookupd.DB.FindRegistrations("channel", topicName, "*").SubKeys() return map[string]interface{}{ "channels": channels, }, nil @@ -112,15 +112,15 @@ func (s *httpServer) doLookup(w http.ResponseWriter, req *http.Request, ps httpr return nil, http_api.Err{400, "MISSING_ARG_TOPIC"} } - registration := s.ctx.nsqlookupd.DB.FindRegistrations("topic", topicName, "") + registration := s.nsqlookupd.DB.FindRegistrations("topic", topicName, "") if len(registration) == 0 { return nil, http_api.Err{404, "TOPIC_NOT_FOUND"} } - channels := s.ctx.nsqlookupd.DB.FindRegistrations("channel", topicName, "*").SubKeys() - producers := s.ctx.nsqlookupd.DB.FindProducers("topic", topicName, "") - producers = producers.FilterByActive(s.ctx.nsqlookupd.opts.InactiveProducerTimeout, - s.ctx.nsqlookupd.opts.TombstoneLifetime) + channels := s.nsqlookupd.DB.FindRegistrations("channel", topicName, "*").SubKeys() + producers := s.nsqlookupd.DB.FindProducers("topic", topicName, "") + producers = producers.FilterByActive(s.nsqlookupd.opts.InactiveProducerTimeout, + s.nsqlookupd.opts.TombstoneLifetime) return map[string]interface{}{ "channels": channels, "producers": producers.PeerInfo(), @@ -142,9 +142,9 @@ func (s *httpServer) doCreateTopic(w http.ResponseWriter, req *http.Request, ps return nil, http_api.Err{400, "INVALID_ARG_TOPIC"} } - s.ctx.nsqlookupd.logf(LOG_INFO, "DB: adding topic(%s)", topicName) + s.nsqlookupd.logf(LOG_INFO, "DB: adding topic(%s)", topicName) key := Registration{"topic", topicName, ""} - s.ctx.nsqlookupd.DB.AddRegistration(key) + s.nsqlookupd.DB.AddRegistration(key) return nil, nil } @@ -160,16 +160,16 @@ func (s *httpServer) doDeleteTopic(w http.ResponseWriter, req *http.Request, ps return nil, http_api.Err{400, "MISSING_ARG_TOPIC"} } - registrations := s.ctx.nsqlookupd.DB.FindRegistrations("channel", topicName, "*") + registrations := s.nsqlookupd.DB.FindRegistrations("channel", topicName, "*") for _, registration := range registrations { - s.ctx.nsqlookupd.logf(LOG_INFO, "DB: removing channel(%s) from topic(%s)", registration.SubKey, topicName) - s.ctx.nsqlookupd.DB.RemoveRegistration(registration) + s.nsqlookupd.logf(LOG_INFO, "DB: removing channel(%s) from topic(%s)", registration.SubKey, topicName) + s.nsqlookupd.DB.RemoveRegistration(registration) } - registrations = s.ctx.nsqlookupd.DB.FindRegistrations("topic", topicName, "") + registrations = s.nsqlookupd.DB.FindRegistrations("topic", topicName, "") for _, registration := range registrations { - s.ctx.nsqlookupd.logf(LOG_INFO, "DB: removing topic(%s)", topicName) - s.ctx.nsqlookupd.DB.RemoveRegistration(registration) + s.nsqlookupd.logf(LOG_INFO, "DB: removing topic(%s)", topicName) + s.nsqlookupd.DB.RemoveRegistration(registration) } return nil, nil @@ -191,8 +191,8 @@ func (s *httpServer) doTombstoneTopicProducer(w http.ResponseWriter, req *http.R return nil, http_api.Err{400, "MISSING_ARG_NODE"} } - s.ctx.nsqlookupd.logf(LOG_INFO, "DB: setting tombstone for producer@%s of 
topic(%s)", node, topicName) - producers := s.ctx.nsqlookupd.DB.FindProducers("topic", topicName, "") + s.nsqlookupd.logf(LOG_INFO, "DB: setting tombstone for producer@%s of topic(%s)", node, topicName) + producers := s.nsqlookupd.DB.FindProducers("topic", topicName, "") for _, p := range producers { thisNode := fmt.Sprintf("%s:%d", p.peerInfo.BroadcastAddress, p.peerInfo.HTTPPort) if thisNode == node { @@ -214,13 +214,13 @@ func (s *httpServer) doCreateChannel(w http.ResponseWriter, req *http.Request, p return nil, http_api.Err{400, err.Error()} } - s.ctx.nsqlookupd.logf(LOG_INFO, "DB: adding channel(%s) in topic(%s)", channelName, topicName) + s.nsqlookupd.logf(LOG_INFO, "DB: adding channel(%s) in topic(%s)", channelName, topicName) key := Registration{"channel", topicName, channelName} - s.ctx.nsqlookupd.DB.AddRegistration(key) + s.nsqlookupd.DB.AddRegistration(key) - s.ctx.nsqlookupd.logf(LOG_INFO, "DB: adding topic(%s)", topicName) + s.nsqlookupd.logf(LOG_INFO, "DB: adding topic(%s)", topicName) key = Registration{"topic", topicName, ""} - s.ctx.nsqlookupd.DB.AddRegistration(key) + s.nsqlookupd.DB.AddRegistration(key) return nil, nil } @@ -236,14 +236,14 @@ func (s *httpServer) doDeleteChannel(w http.ResponseWriter, req *http.Request, p return nil, http_api.Err{400, err.Error()} } - registrations := s.ctx.nsqlookupd.DB.FindRegistrations("channel", topicName, channelName) + registrations := s.nsqlookupd.DB.FindRegistrations("channel", topicName, channelName) if len(registrations) == 0 { return nil, http_api.Err{404, "CHANNEL_NOT_FOUND"} } - s.ctx.nsqlookupd.logf(LOG_INFO, "DB: removing channel(%s) from topic(%s)", channelName, topicName) + s.nsqlookupd.logf(LOG_INFO, "DB: removing channel(%s) from topic(%s)", channelName, topicName) for _, registration := range registrations { - s.ctx.nsqlookupd.DB.RemoveRegistration(registration) + s.nsqlookupd.DB.RemoveRegistration(registration) } return nil, nil @@ -262,25 +262,25 @@ type node struct { func (s *httpServer) doNodes(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { // dont filter out tombstoned nodes - producers := s.ctx.nsqlookupd.DB.FindProducers("client", "", "").FilterByActive( - s.ctx.nsqlookupd.opts.InactiveProducerTimeout, 0) + producers := s.nsqlookupd.DB.FindProducers("client", "", "").FilterByActive( + s.nsqlookupd.opts.InactiveProducerTimeout, 0) nodes := make([]*node, len(producers)) topicProducersMap := make(map[string]Producers) for i, p := range producers { - topics := s.ctx.nsqlookupd.DB.LookupRegistrations(p.peerInfo.id).Filter("topic", "*", "").Keys() + topics := s.nsqlookupd.DB.LookupRegistrations(p.peerInfo.id).Filter("topic", "*", "").Keys() // for each topic find the producer that matches this peer // to add tombstone information tombstones := make([]bool, len(topics)) for j, t := range topics { if _, exists := topicProducersMap[t]; !exists { - topicProducersMap[t] = s.ctx.nsqlookupd.DB.FindProducers("topic", t, "") + topicProducersMap[t] = s.nsqlookupd.DB.FindProducers("topic", t, "") } topicProducers := topicProducersMap[t] for _, tp := range topicProducers { if tp.peerInfo == p.peerInfo { - tombstones[j] = tp.IsTombstoned(s.ctx.nsqlookupd.opts.TombstoneLifetime) + tombstones[j] = tp.IsTombstoned(s.nsqlookupd.opts.TombstoneLifetime) break } } @@ -304,11 +304,11 @@ func (s *httpServer) doNodes(w http.ResponseWriter, req *http.Request, ps httpro } func (s *httpServer) doDebug(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { - 
s.ctx.nsqlookupd.DB.RLock() - defer s.ctx.nsqlookupd.DB.RUnlock() + s.nsqlookupd.DB.RLock() + defer s.nsqlookupd.DB.RUnlock() data := make(map[string][]map[string]interface{}) - for r, producers := range s.ctx.nsqlookupd.DB.registrationMap { + for r, producers := range s.nsqlookupd.DB.registrationMap { key := r.Category + ":" + r.Key + ":" + r.SubKey for _, p := range producers { m := map[string]interface{}{ diff --git a/nsqlookupd/lookup_protocol_v1.go b/nsqlookupd/lookup_protocol_v1.go index dbeb78c2a..1236833ce 100644 --- a/nsqlookupd/lookup_protocol_v1.go +++ b/nsqlookupd/lookup_protocol_v1.go @@ -18,7 +18,7 @@ import ( ) type LookupProtocolV1 struct { - ctx *Context + nsqlookupd *NSQLookupd } func (p *LookupProtocolV1) IOLoop(conn net.Conn) error { @@ -43,11 +43,11 @@ func (p *LookupProtocolV1) IOLoop(conn net.Conn) error { if parentErr := err.(protocol.ChildErr).Parent(); parentErr != nil { ctx = " - " + parentErr.Error() } - p.ctx.nsqlookupd.logf(LOG_ERROR, "[%s] - %s%s", client, err, ctx) + p.nsqlookupd.logf(LOG_ERROR, "[%s] - %s%s", client, err, ctx) _, sendErr := protocol.SendResponse(client, []byte(err.Error())) if sendErr != nil { - p.ctx.nsqlookupd.logf(LOG_ERROR, "[%s] - %s%s", client, sendErr, ctx) + p.nsqlookupd.logf(LOG_ERROR, "[%s] - %s%s", client, sendErr, ctx) break } @@ -67,12 +67,12 @@ func (p *LookupProtocolV1) IOLoop(conn net.Conn) error { } conn.Close() - p.ctx.nsqlookupd.logf(LOG_INFO, "CLIENT(%s): closing", client) + p.nsqlookupd.logf(LOG_INFO, "CLIENT(%s): closing", client) if client.peerInfo != nil { - registrations := p.ctx.nsqlookupd.DB.LookupRegistrations(client.peerInfo.id) + registrations := p.nsqlookupd.DB.LookupRegistrations(client.peerInfo.id) for _, r := range registrations { - if removed, _ := p.ctx.nsqlookupd.DB.RemoveProducer(r, client.peerInfo.id); removed { - p.ctx.nsqlookupd.logf(LOG_INFO, "DB: client(%s) UNREGISTER category:%s key:%s subkey:%s", + if removed, _ := p.nsqlookupd.DB.RemoveProducer(r, client.peerInfo.id); removed { + p.nsqlookupd.logf(LOG_INFO, "DB: client(%s) UNREGISTER category:%s key:%s subkey:%s", client, r.Category, r.Key, r.SubKey) } } @@ -128,14 +128,14 @@ func (p *LookupProtocolV1) REGISTER(client *ClientV1, reader *bufio.Reader, para if channel != "" { key := Registration{"channel", topic, channel} - if p.ctx.nsqlookupd.DB.AddProducer(key, &Producer{peerInfo: client.peerInfo}) { - p.ctx.nsqlookupd.logf(LOG_INFO, "DB: client(%s) REGISTER category:%s key:%s subkey:%s", + if p.nsqlookupd.DB.AddProducer(key, &Producer{peerInfo: client.peerInfo}) { + p.nsqlookupd.logf(LOG_INFO, "DB: client(%s) REGISTER category:%s key:%s subkey:%s", client, "channel", topic, channel) } } key := Registration{"topic", topic, ""} - if p.ctx.nsqlookupd.DB.AddProducer(key, &Producer{peerInfo: client.peerInfo}) { - p.ctx.nsqlookupd.logf(LOG_INFO, "DB: client(%s) REGISTER category:%s key:%s subkey:%s", + if p.nsqlookupd.DB.AddProducer(key, &Producer{peerInfo: client.peerInfo}) { + p.nsqlookupd.logf(LOG_INFO, "DB: client(%s) REGISTER category:%s key:%s subkey:%s", client, "topic", topic, "") } @@ -154,37 +154,37 @@ func (p *LookupProtocolV1) UNREGISTER(client *ClientV1, reader *bufio.Reader, pa if channel != "" { key := Registration{"channel", topic, channel} - removed, left := p.ctx.nsqlookupd.DB.RemoveProducer(key, client.peerInfo.id) + removed, left := p.nsqlookupd.DB.RemoveProducer(key, client.peerInfo.id) if removed { - p.ctx.nsqlookupd.logf(LOG_INFO, "DB: client(%s) UNREGISTER category:%s key:%s subkey:%s", + p.nsqlookupd.logf(LOG_INFO, "DB: 
client(%s) UNREGISTER category:%s key:%s subkey:%s", client, "channel", topic, channel) } // for ephemeral channels, remove the channel as well if it has no producers if left == 0 && strings.HasSuffix(channel, "#ephemeral") { - p.ctx.nsqlookupd.DB.RemoveRegistration(key) + p.nsqlookupd.DB.RemoveRegistration(key) } } else { // no channel was specified so this is a topic unregistration // remove all of the channel registrations... // normally this shouldn't happen which is why we print a warning message // if anything is actually removed - registrations := p.ctx.nsqlookupd.DB.FindRegistrations("channel", topic, "*") + registrations := p.nsqlookupd.DB.FindRegistrations("channel", topic, "*") for _, r := range registrations { - removed, _ := p.ctx.nsqlookupd.DB.RemoveProducer(r, client.peerInfo.id) + removed, _ := p.nsqlookupd.DB.RemoveProducer(r, client.peerInfo.id) if removed { - p.ctx.nsqlookupd.logf(LOG_WARN, "client(%s) unexpected UNREGISTER category:%s key:%s subkey:%s", + p.nsqlookupd.logf(LOG_WARN, "client(%s) unexpected UNREGISTER category:%s key:%s subkey:%s", client, "channel", topic, r.SubKey) } } key := Registration{"topic", topic, ""} - removed, left := p.ctx.nsqlookupd.DB.RemoveProducer(key, client.peerInfo.id) + removed, left := p.nsqlookupd.DB.RemoveProducer(key, client.peerInfo.id) if removed { - p.ctx.nsqlookupd.logf(LOG_INFO, "DB: client(%s) UNREGISTER category:%s key:%s subkey:%s", + p.nsqlookupd.logf(LOG_INFO, "DB: client(%s) UNREGISTER category:%s key:%s subkey:%s", client, "topic", topic, "") } if left == 0 && strings.HasSuffix(topic, "#ephemeral") { - p.ctx.nsqlookupd.DB.RemoveRegistration(key) + p.nsqlookupd.DB.RemoveRegistration(key) } } @@ -226,29 +226,29 @@ func (p *LookupProtocolV1) IDENTIFY(client *ClientV1, reader *bufio.Reader, para atomic.StoreInt64(&peerInfo.lastUpdate, time.Now().UnixNano()) - p.ctx.nsqlookupd.logf(LOG_INFO, "CLIENT(%s): IDENTIFY Address:%s TCP:%d HTTP:%d Version:%s", + p.nsqlookupd.logf(LOG_INFO, "CLIENT(%s): IDENTIFY Address:%s TCP:%d HTTP:%d Version:%s", client, peerInfo.BroadcastAddress, peerInfo.TCPPort, peerInfo.HTTPPort, peerInfo.Version) client.peerInfo = &peerInfo - if p.ctx.nsqlookupd.DB.AddProducer(Registration{"client", "", ""}, &Producer{peerInfo: client.peerInfo}) { - p.ctx.nsqlookupd.logf(LOG_INFO, "DB: client(%s) REGISTER category:%s key:%s subkey:%s", client, "client", "", "") + if p.nsqlookupd.DB.AddProducer(Registration{"client", "", ""}, &Producer{peerInfo: client.peerInfo}) { + p.nsqlookupd.logf(LOG_INFO, "DB: client(%s) REGISTER category:%s key:%s subkey:%s", client, "client", "", "") } // build a response data := make(map[string]interface{}) - data["tcp_port"] = p.ctx.nsqlookupd.RealTCPAddr().Port - data["http_port"] = p.ctx.nsqlookupd.RealHTTPAddr().Port + data["tcp_port"] = p.nsqlookupd.RealTCPAddr().Port + data["http_port"] = p.nsqlookupd.RealHTTPAddr().Port data["version"] = version.Binary hostname, err := os.Hostname() if err != nil { log.Fatalf("ERROR: unable to get hostname %s", err) } - data["broadcast_address"] = p.ctx.nsqlookupd.opts.BroadcastAddress + data["broadcast_address"] = p.nsqlookupd.opts.BroadcastAddress data["hostname"] = hostname response, err := json.Marshal(data) if err != nil { - p.ctx.nsqlookupd.logf(LOG_ERROR, "marshaling %v", data) + p.nsqlookupd.logf(LOG_ERROR, "marshaling %v", data) return []byte("OK"), nil } return response, nil @@ -259,7 +259,7 @@ func (p *LookupProtocolV1) PING(client *ClientV1, params []string) ([]byte, erro // we could get a PING before other commands on the same 
client connection cur := time.Unix(0, atomic.LoadInt64(&client.peerInfo.lastUpdate)) now := time.Now() - p.ctx.nsqlookupd.logf(LOG_INFO, "CLIENT(%s): pinged (last ping %s)", client.peerInfo.id, + p.nsqlookupd.logf(LOG_INFO, "CLIENT(%s): pinged (last ping %s)", client.peerInfo.id, now.Sub(cur)) atomic.StoreInt64(&client.peerInfo.lastUpdate, now.UnixNano()) } diff --git a/nsqlookupd/lookup_protocol_v1_test.go b/nsqlookupd/lookup_protocol_v1_test.go index b3cfca60d..86aadefc7 100644 --- a/nsqlookupd/lookup_protocol_v1_test.go +++ b/nsqlookupd/lookup_protocol_v1_test.go @@ -38,14 +38,14 @@ func testIOLoopReturnsClientErr(t *testing.T, fakeConn test.FakeNetConn) { nsqlookupd, err := New(opts) test.Nil(t, err) - prot := &LookupProtocolV1{ctx: &Context{nsqlookupd: nsqlookupd}} + prot := &LookupProtocolV1{nsqlookupd: nsqlookupd} - nsqlookupd.tcpServer = &tcpServer{ctx: prot.ctx} + nsqlookupd.tcpServer = &tcpServer{nsqlookupd: prot.nsqlookupd} errChan := make(chan error) testIOLoop := func() { errChan <- prot.IOLoop(fakeConn) - defer prot.ctx.nsqlookupd.Exit() + defer prot.nsqlookupd.Exit() } go testIOLoop() diff --git a/nsqlookupd/nsqlookupd.go b/nsqlookupd/nsqlookupd.go index 4fc85db7e..d56426f57 100644 --- a/nsqlookupd/nsqlookupd.go +++ b/nsqlookupd/nsqlookupd.go @@ -51,8 +51,6 @@ func New(opts *Options) (*NSQLookupd, error) { // Main starts an instance of nsqlookupd and returns an // error if there was a problem starting up. func (l *NSQLookupd) Main() error { - ctx := &Context{l} - exitCh := make(chan error) var once sync.Once exitFunc := func(err error) { @@ -64,11 +62,11 @@ func (l *NSQLookupd) Main() error { }) } - l.tcpServer = &tcpServer{ctx: ctx} + l.tcpServer = &tcpServer{nsqlookupd: l} l.waitGroup.Wrap(func() { exitFunc(protocol.TCPServer(l.tcpListener, l.tcpServer, l.logf)) }) - httpServer := newHTTPServer(ctx) + httpServer := newHTTPServer(l) l.waitGroup.Wrap(func() { exitFunc(http_api.Serve(l.httpListener, httpServer, "HTTP", l.logf)) }) diff --git a/nsqlookupd/tcp.go b/nsqlookupd/tcp.go index 24b00de89..405d6c6cb 100644 --- a/nsqlookupd/tcp.go +++ b/nsqlookupd/tcp.go @@ -9,12 +9,12 @@ import ( ) type tcpServer struct { - ctx *Context - conns sync.Map + nsqlookupd *NSQLookupd + conns sync.Map } func (p *tcpServer) Handle(clientConn net.Conn) { - p.ctx.nsqlookupd.logf(LOG_INFO, "TCP: new client(%s)", clientConn.RemoteAddr()) + p.nsqlookupd.logf(LOG_INFO, "TCP: new client(%s)", clientConn.RemoteAddr()) // The client should initialize itself by sending a 4 byte sequence indicating // the version of the protocol that it intends to communicate, this will allow us @@ -22,23 +22,23 @@ func (p *tcpServer) Handle(clientConn net.Conn) { buf := make([]byte, 4) _, err := io.ReadFull(clientConn, buf) if err != nil { - p.ctx.nsqlookupd.logf(LOG_ERROR, "failed to read protocol version - %s", err) + p.nsqlookupd.logf(LOG_ERROR, "failed to read protocol version - %s", err) clientConn.Close() return } protocolMagic := string(buf) - p.ctx.nsqlookupd.logf(LOG_INFO, "CLIENT(%s): desired protocol magic '%s'", + p.nsqlookupd.logf(LOG_INFO, "CLIENT(%s): desired protocol magic '%s'", clientConn.RemoteAddr(), protocolMagic) var prot protocol.Protocol switch protocolMagic { case " V1": - prot = &LookupProtocolV1{ctx: p.ctx} + prot = &LookupProtocolV1{nsqlookupd: p.nsqlookupd} default: protocol.SendResponse(clientConn, []byte("E_BAD_PROTOCOL")) clientConn.Close() - p.ctx.nsqlookupd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'", + p.nsqlookupd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'", 
clientConn.RemoteAddr(), protocolMagic) return } @@ -47,7 +47,7 @@ func (p *tcpServer) Handle(clientConn net.Conn) { err = prot.IOLoop(clientConn) if err != nil { - p.ctx.nsqlookupd.logf(LOG_ERROR, "client(%s) - %s", clientConn.RemoteAddr(), err) + p.nsqlookupd.logf(LOG_ERROR, "client(%s) - %s", clientConn.RemoteAddr(), err) } p.conns.Delete(clientConn.RemoteAddr())
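
Note on the pattern: every hunk above applies the same mechanical change. The single-field wrapper types (nsqd's context, nsqlookupd's Context, nsqadmin's Context) are deleted, and the owning daemon pointer (*NSQD, *NSQLookupd) is stored directly on the protocol, server, topic, and channel structs, so call sites shrink from p.ctx.nsqd.getOpts() to p.nsqd.getOpts(). The following stand-alone Go sketch shows the before/after shape with toy stand-in types; Daemon, Options, and checkBody are illustrative names only, not real nsqd identifiers.

// Stand-alone sketch of the refactor pattern above, using toy stand-in types
// (Daemon, Options, checkBody are illustrative, not real nsqd identifiers).
package main

import "fmt"

type Options struct {
	MaxMsgSize int64
}

// Daemon plays the role of *NSQD / *NSQLookupd: the value every component
// ultimately needs for options and logging.
type Daemon struct {
	opts *Options
}

func (d *Daemon) getOpts() *Options { return d.opts }

func (d *Daemon) logf(f string, args ...interface{}) {
	fmt.Printf(f+"\n", args...)
}

// Before the change, components held a one-field wrapper:
//
//	type context struct{ nsqd *Daemon }
//	type protocolV2 struct{ ctx *context }
//
// and every access went through p.ctx.nsqd. After the change the daemon is
// stored directly:
type protocolV2 struct {
	nsqd *Daemon
}

func (p *protocolV2) checkBody(bodyLen int64) error {
	// p.ctx.nsqd.getOpts() becomes p.nsqd.getOpts()
	if bodyLen > p.nsqd.getOpts().MaxMsgSize {
		return fmt.Errorf("body too big %d > %d", bodyLen, p.nsqd.getOpts().MaxMsgSize)
	}
	return nil
}

func main() {
	d := &Daemon{opts: &Options{MaxMsgSize: 1024}}
	p := &protocolV2{nsqd: d} // was: &protocolV2{ctx: &context{nsqd: d}}
	p.nsqd.logf("oversized body accepted? %v", p.checkBody(2048) == nil)
}

The net effect, visible in each hunk, is one less pointer hop at every call site and one less type to thread through constructors; runtime behavior is unchanged.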
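
The test and bootstrap hunks (protocol_v2_test.go, lookup_protocol_v1_test.go, nsqlookupd.go) show the wiring side of the same change: constructors and struct literals now take the daemon directly, e.g. &protocolV2{nsqd: nsqd}, &tcpServer{nsqlookupd: l}, newHTTPServer(l), and NewTopic(topicName, nsqd, deleteCallback). A toy test sketch of that setup, again with stand-in types rather than the real package:

// Toy test sketch of the simplified wiring; Daemon and protocolV2 here are
// stand-ins, not the real nsqd types.
package refactor

import "testing"

type Daemon struct{ exited bool }

func (d *Daemon) Exit() { d.exited = true }

type protocolV2 struct{ nsqd *Daemon }

func TestDirectWiring(t *testing.T) {
	d := &Daemon{}

	// before: prot := &protocolV2{ctx: &context{nsqd: d}}; defer prot.ctx.nsqd.Exit()
	prot := &protocolV2{nsqd: d}
	defer prot.nsqd.Exit()

	if prot.nsqd != d {
		t.Fatal("expected the daemon to be referenced directly")
	}
}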