diff --git a/nsqadmin/http.go b/nsqadmin/http.go index 90fbe165d..30ba9f2fb 100644 --- a/nsqadmin/http.go +++ b/nsqadmin/http.go @@ -769,8 +769,7 @@ func (s *httpServer) doConfig(w http.ResponseWriter, req *http.Request, ps httpr if err != nil { return nil, http_api.Err{400, "INVALID_VALUE"} } - opts.LogLevel = logLevelStr - opts.logLevel = logLevel + opts.LogLevel = logLevel default: return nil, http_api.Err{400, "INVALID_OPTION"} } diff --git a/nsqadmin/http_test.go b/nsqadmin/http_test.go index 6ffbf6d97..9dc20960c 100644 --- a/nsqadmin/http_test.go +++ b/nsqadmin/http_test.go @@ -49,8 +49,16 @@ type ChannelStatsDoc struct { func mustStartNSQLookupd(opts *nsqlookupd.Options) (*net.TCPAddr, *net.TCPAddr, *nsqlookupd.NSQLookupd) { opts.TCPAddress = "127.0.0.1:0" opts.HTTPAddress = "127.0.0.1:0" - lookupd := nsqlookupd.New(opts) - lookupd.Main() + lookupd, err := nsqlookupd.New(opts) + if err != nil { + panic(err) + } + go func() { + err := lookupd.Main() + if err != nil { + panic(err) + } + }() return lookupd.RealTCPAddr(), lookupd.RealHTTPAddr(), lookupd } @@ -66,8 +74,16 @@ func bootstrapNSQClusterWithAuth(t *testing.T, withAuth bool) (string, []*nsqd.N nsqlookupdOpts.HTTPAddress = "127.0.0.1:0" nsqlookupdOpts.BroadcastAddress = "127.0.0.1" nsqlookupdOpts.Logger = lgr - nsqlookupd1 := nsqlookupd.New(nsqlookupdOpts) - nsqlookupd1.Main() + nsqlookupd1, err := nsqlookupd.New(nsqlookupdOpts) + if err != nil { + panic(err) + } + go func() { + err := nsqlookupd1.Main() + if err != nil { + panic(err) + } + }() time.Sleep(100 * time.Millisecond) @@ -82,8 +98,16 @@ func bootstrapNSQClusterWithAuth(t *testing.T, withAuth bool) (string, []*nsqd.N panic(err) } nsqdOpts.DataPath = tmpDir - nsqd1 := nsqd.New(nsqdOpts) - nsqd1.Main() + nsqd1, err := nsqd.New(nsqdOpts) + if err != nil { + panic(err) + } + go func() { + err := nsqd1.Main() + if err != nil { + panic(err) + } + }() nsqadminOpts := NewOptions() nsqadminOpts.HTTPAddress = "127.0.0.1:0" @@ -92,8 +116,16 @@ func bootstrapNSQClusterWithAuth(t *testing.T, withAuth bool) (string, []*nsqd.N if withAuth { nsqadminOpts.AdminUsers = []string{"matt"} } - nsqadmin1 := New(nsqadminOpts) - nsqadmin1.Main() + nsqadmin1, err := New(nsqadminOpts) + if err != nil { + panic(err) + } + go func() { + err := nsqadmin1.Main() + if err != nil { + panic(err) + } + }() time.Sleep(100 * time.Millisecond) @@ -573,7 +605,7 @@ func TestHTTPconfig(t *testing.T) { defer resp.Body.Close() body, _ = ioutil.ReadAll(resp.Body) test.Equal(t, 200, resp.StatusCode) - test.Equal(t, LOG_FATAL, nsqadmin1.getOpts().logLevel) + test.Equal(t, LOG_FATAL, nsqadmin1.getOpts().LogLevel) url = fmt.Sprintf("http://%s/config/log_level", nsqadmin1.RealHTTPAddr()) req, err = http.NewRequest("PUT", url, bytes.NewBuffer([]byte(`bad`))) @@ -591,8 +623,14 @@ func TestHTTPconfigCIDR(t *testing.T) { opts.NSQLookupdHTTPAddresses = []string{"127.0.0.1:4161"} opts.Logger = test.NewTestLogger(t) opts.AllowConfigFromCIDR = "10.0.0.0/8" - nsqadmin := New(opts) - nsqadmin.Main() + nsqadmin, err := New(opts) + test.Nil(t, err) + go func() { + err := nsqadmin.Main() + if err != nil { + panic(err) + } + }() defer nsqadmin.Exit() time.Sleep(100 * time.Millisecond) diff --git a/nsqadmin/logger.go b/nsqadmin/logger.go index 90c123ba1..3b8160556 100644 --- a/nsqadmin/logger.go +++ b/nsqadmin/logger.go @@ -16,5 +16,5 @@ const ( func (n *NSQAdmin) logf(level lg.LogLevel, f string, args ...interface{}) { opts := n.getOpts() - lg.Logf(opts.Logger, opts.logLevel, level, f, args...) + lg.Logf(opts.Logger, opts.LogLevel, level, f, args...) } diff --git a/nsqadmin/nsqadmin.go b/nsqadmin/nsqadmin.go index e3452c3fd..0c5f81e99 100644 --- a/nsqadmin/nsqadmin.go +++ b/nsqadmin/nsqadmin.go @@ -5,6 +5,8 @@ import ( "crypto/tls" "crypto/x509" "encoding/json" + "errors" + "fmt" "io/ioutil" "log" "net" @@ -16,7 +18,6 @@ import ( "sync/atomic" "github.com/nsqio/nsq/internal/http_api" - "github.com/nsqio/nsq/internal/lg" "github.com/nsqio/nsq/internal/util" "github.com/nsqio/nsq/internal/version" ) @@ -31,7 +32,7 @@ type NSQAdmin struct { httpClientTLSConfig *tls.Config } -func New(opts *Options) *NSQAdmin { +func New(opts *Options) (*NSQAdmin, error) { if opts.Logger == nil { opts.Logger = log.New(os.Stderr, opts.LogPrefix, log.Ldate|log.Ltime|log.Lmicroseconds) } @@ -41,41 +42,20 @@ func New(opts *Options) *NSQAdmin { } n.swapOpts(opts) - var err error - opts.logLevel, err = lg.ParseLogLevel(opts.LogLevel) - if err != nil { - n.logf(LOG_FATAL, "%s", err) - os.Exit(1) - } - if len(opts.NSQDHTTPAddresses) == 0 && len(opts.NSQLookupdHTTPAddresses) == 0 { - n.logf(LOG_FATAL, "--nsqd-http-address or --lookupd-http-address required.") - os.Exit(1) + return nil, errors.New("--nsqd-http-address or --lookupd-http-address required") } if len(opts.NSQDHTTPAddresses) != 0 && len(opts.NSQLookupdHTTPAddresses) != 0 { - n.logf(LOG_FATAL, "use --nsqd-http-address or --lookupd-http-address not both") - os.Exit(1) - } - - // verify that the supplied address is valid - verifyAddress := func(arg string, address string) *net.TCPAddr { - addr, err := net.ResolveTCPAddr("tcp", address) - if err != nil { - n.logf(LOG_FATAL, "failed to resolve %s address (%s) - %s", arg, address, err) - os.Exit(1) - } - return addr + return nil, errors.New("use --nsqd-http-address or --lookupd-http-address not both") } if opts.HTTPClientTLSCert != "" && opts.HTTPClientTLSKey == "" { - n.logf(LOG_FATAL, "--http-client-tls-key must be specified with --http-client-tls-cert") - os.Exit(1) + return nil, errors.New("--http-client-tls-key must be specified with --http-client-tls-cert") } if opts.HTTPClientTLSKey != "" && opts.HTTPClientTLSCert == "" { - n.logf(LOG_FATAL, "--http-client-tls-cert must be specified with --http-client-tls-key") - os.Exit(1) + return nil, errors.New("--http-client-tls-cert must be specified with --http-client-tls-key") } n.httpClientTLSConfig = &tls.Config{ @@ -84,9 +64,8 @@ func New(opts *Options) *NSQAdmin { if opts.HTTPClientTLSCert != "" && opts.HTTPClientTLSKey != "" { cert, err := tls.LoadX509KeyPair(opts.HTTPClientTLSCert, opts.HTTPClientTLSKey) if err != nil { - n.logf(LOG_FATAL, "failed to LoadX509KeyPair %s, %s - %s", + return nil, fmt.Errorf("failed to LoadX509KeyPair %s, %s - %s", opts.HTTPClientTLSCert, opts.HTTPClientTLSKey, err) - os.Exit(1) } n.httpClientTLSConfig.Certificates = []tls.Certificate{cert} } @@ -94,30 +73,33 @@ func New(opts *Options) *NSQAdmin { tlsCertPool := x509.NewCertPool() caCertFile, err := ioutil.ReadFile(opts.HTTPClientTLSRootCAFile) if err != nil { - n.logf(LOG_FATAL, "failed to read TLS root CA file %s - %s", + return nil, fmt.Errorf("failed to read TLS root CA file %s - %s", opts.HTTPClientTLSRootCAFile, err) - os.Exit(1) } if !tlsCertPool.AppendCertsFromPEM(caCertFile) { - n.logf(LOG_FATAL, "failed to AppendCertsFromPEM %s", opts.HTTPClientTLSRootCAFile) - os.Exit(1) + return nil, fmt.Errorf("failed to AppendCertsFromPEM %s", opts.HTTPClientTLSRootCAFile) } n.httpClientTLSConfig.RootCAs = tlsCertPool } - // require that both the hostname and port be specified for _, address := range opts.NSQLookupdHTTPAddresses { - verifyAddress("--lookupd-http-address", address) + _, err := net.ResolveTCPAddr("tcp", address) + if err != nil { + return nil, fmt.Errorf("failed to resolve --lookupd-http-address (%s) - %s", address, err) + } } for _, address := range opts.NSQDHTTPAddresses { - verifyAddress("--nsqd-http-address", address) + _, err := net.ResolveTCPAddr("tcp", address) + if err != nil { + return nil, fmt.Errorf("failed to resolve --nsqd-tcp-address (%s) - %s", address, err) + } } if opts.ProxyGraphite { url, err := url.Parse(opts.GraphiteURL) if err != nil { - n.logf(LOG_FATAL, "failed to parse --graphite-url='%s' - %s", opts.GraphiteURL, err) + return nil, fmt.Errorf("failed to parse --graphite-url (%s) - %s", opts.GraphiteURL, err) os.Exit(1) } n.graphiteURL = url @@ -126,8 +108,7 @@ func New(opts *Options) *NSQAdmin { if opts.AllowConfigFromCIDR != "" { _, _, err := net.ParseCIDR(opts.AllowConfigFromCIDR) if err != nil { - n.logf(LOG_FATAL, "failed to parse --allow-config-from-cidr='%s' - %s", opts.AllowConfigFromCIDR, err) - os.Exit(1) + return nil, fmt.Errorf("failed to parse --allow-config-from-cidr (%s) - %s", opts.AllowConfigFromCIDR, err) } } @@ -135,7 +116,13 @@ func New(opts *Options) *NSQAdmin { n.logf(LOG_INFO, version.String("nsqadmin")) - return n + var err error + n.httpListener, err = net.Listen("tcp", n.getOpts().HTTPAddress) + if err != nil { + return nil, fmt.Errorf("listen (%s) failed - %s", n.getOpts().HTTPAddress, err) + } + + return n, nil } func normalizeBasePath(p string) string { @@ -180,22 +167,32 @@ func (n *NSQAdmin) handleAdminActions() { } } -func (n *NSQAdmin) Main() { - httpListener, err := net.Listen("tcp", n.getOpts().HTTPAddress) - if err != nil { - n.logf(LOG_FATAL, "listen (%s) failed - %s", n.getOpts().HTTPAddress, err) - os.Exit(1) +func (n *NSQAdmin) Main() error { + exitCh := make(chan error) + var once sync.Once + exitFunc := func(err error) { + once.Do(func() { + if err != nil { + n.logf(LOG_FATAL, "%s", err) + } + exitCh <- err + }) } - n.httpListener = httpListener + httpServer := NewHTTPServer(&Context{n}) n.waitGroup.Wrap(func() { - http_api.Serve(n.httpListener, http_api.CompressHandler(httpServer), "HTTP", n.logf) + exitFunc(http_api.Serve(n.httpListener, http_api.CompressHandler(httpServer), "HTTP", n.logf)) }) n.waitGroup.Wrap(n.handleAdminActions) + + err := <-exitCh + return err } func (n *NSQAdmin) Exit() { - n.httpListener.Close() + if n.httpListener != nil { + n.httpListener.Close() + } close(n.notifications) n.waitGroup.Wait() } diff --git a/nsqadmin/nsqadmin_test.go b/nsqadmin/nsqadmin_test.go index 6220e2826..fe19e2b1a 100644 --- a/nsqadmin/nsqadmin_test.go +++ b/nsqadmin/nsqadmin_test.go @@ -7,7 +7,6 @@ import ( "net/http" "net/url" "os" - "os/exec" "testing" "github.com/nsqio/nsq/internal/lg" @@ -16,41 +15,23 @@ import ( ) func TestNeitherNSQDAndNSQLookup(t *testing.T) { - if os.Getenv("BE_CRASHER") == "1" { - opts := NewOptions() - opts.Logger = lg.NilLogger{} - opts.HTTPAddress = "127.0.0.1:0" - New(opts) - return - } - cmd := exec.Command(os.Args[0], "-test.run=TestNeitherNSQDAndNSQLookup") - cmd.Env = append(os.Environ(), "BE_CRASHER=1") - err := cmd.Run() - test.Equal(t, "exit status 1", fmt.Sprintf("%v", err)) - if e, ok := err.(*exec.ExitError); ok && !e.Success() { - return - } - t.Fatalf("process ran with err %v, want exit status 1", err) + opts := NewOptions() + opts.Logger = lg.NilLogger{} + opts.HTTPAddress = "127.0.0.1:0" + _, err := New(opts) + test.NotNil(t, err) + test.Equal(t, "--nsqd-http-address or --lookupd-http-address required", fmt.Sprintf("%s", err)) } func TestBothNSQDAndNSQLookup(t *testing.T) { - if os.Getenv("BE_CRASHER") == "1" { - opts := NewOptions() - opts.Logger = lg.NilLogger{} - opts.HTTPAddress = "127.0.0.1:0" - opts.NSQLookupdHTTPAddresses = []string{"127.0.0.1:4161"} - opts.NSQDHTTPAddresses = []string{"127.0.0.1:4151"} - New(opts) - return - } - cmd := exec.Command(os.Args[0], "-test.run=TestBothNSQDAndNSQLookup") - cmd.Env = append(os.Environ(), "BE_CRASHER=1") - err := cmd.Run() - test.Equal(t, "exit status 1", fmt.Sprintf("%v", err)) - if e, ok := err.(*exec.ExitError); ok && !e.Success() { - return - } - t.Fatalf("process ran with err %v, want exit status 1", err) + opts := NewOptions() + opts.Logger = lg.NilLogger{} + opts.HTTPAddress = "127.0.0.1:0" + opts.NSQLookupdHTTPAddresses = []string{"127.0.0.1:4161"} + opts.NSQDHTTPAddresses = []string{"127.0.0.1:4151"} + _, err := New(opts) + test.NotNil(t, err) + test.Equal(t, "use --nsqd-http-address or --lookupd-http-address not both", fmt.Sprintf("%s", err)) } func TestTLSHTTPClient(t *testing.T) { @@ -73,8 +54,14 @@ func TestTLSHTTPClient(t *testing.T) { opts.HTTPClientTLSCert = "./test/client.pem" opts.HTTPClientTLSKey = "./test/client.key" opts.Logger = lgr - nsqadmin := New(opts) - nsqadmin.Main() + nsqadmin, err := New(opts) + test.Nil(t, err) + go func() { + err := nsqadmin.Main() + if err != nil { + panic(err) + } + }() defer nsqadmin.Exit() httpAddr := nsqadmin.RealHTTPAddr() @@ -102,25 +89,15 @@ func mustStartNSQD(opts *nsqd.Options) (*net.TCPAddr, *net.TCPAddr, *nsqd.NSQD) } opts.DataPath = tmpDir } - nsqd := nsqd.New(opts) - nsqd.Main() - return nsqd.RealTCPAddr(), nsqd.RealHTTPAddr(), nsqd -} - -func TestCrashingLogger(t *testing.T) { - if os.Getenv("BE_CRASHER") == "1" { - // Test invalid log level causes error - opts := NewOptions() - opts.LogLevel = "bad" - opts.NSQLookupdHTTPAddresses = []string{"127.0.0.1:4161"} - _ = New(opts) - return - } - cmd := exec.Command(os.Args[0], "-test.run=TestCrashingLogger") - cmd.Env = append(os.Environ(), "BE_CRASHER=1") - err := cmd.Run() - if e, ok := err.(*exec.ExitError); ok && !e.Success() { - return + nsqd, err := nsqd.New(opts) + if err != nil { + panic(err) } - t.Fatalf("process ran with err %v, want exit status 1", err) + go func() { + err := nsqd.Main() + if err != nil { + panic(err) + } + }() + return nsqd.RealTCPAddr(), nsqd.RealHTTPAddr(), nsqd } diff --git a/nsqadmin/options.go b/nsqadmin/options.go index 50d00b62b..8100af881 100644 --- a/nsqadmin/options.go +++ b/nsqadmin/options.go @@ -7,11 +7,9 @@ import ( ) type Options struct { - LogLevel string `flag:"log-level"` - LogPrefix string `flag:"log-prefix"` - Verbose bool `flag:"verbose"` // for backwards compatibility + LogLevel lg.LogLevel `flag:"log-level"` + LogPrefix string `flag:"log-prefix"` Logger Logger - logLevel lg.LogLevel // private, not really an option HTTPAddress string `flag:"http-address"` BasePath string `flag:"base-path"` @@ -47,7 +45,7 @@ type Options struct { func NewOptions() *Options { return &Options{ LogPrefix: "[nsqadmin] ", - LogLevel: "info", + LogLevel: lg.INFO, HTTPAddress: "0.0.0.0:4171", BasePath: "/", StatsdPrefix: "nsq.%s",