Skip to content

Commit

Permalink
nsqadmin meh
Browse files Browse the repository at this point in the history
  • Loading branch information
mreiferson committed Mar 1, 2019
1 parent 8ba8e4f commit 69be08a
Show file tree
Hide file tree
Showing 6 changed files with 131 additions and 122 deletions.
3 changes: 1 addition & 2 deletions nsqadmin/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -769,8 +769,7 @@ func (s *httpServer) doConfig(w http.ResponseWriter, req *http.Request, ps httpr
if err != nil {
return nil, http_api.Err{400, "INVALID_VALUE"}
}
opts.LogLevel = logLevelStr
opts.logLevel = logLevel
opts.LogLevel = logLevel
default:
return nil, http_api.Err{400, "INVALID_OPTION"}
}
Expand Down
60 changes: 49 additions & 11 deletions nsqadmin/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,16 @@ type ChannelStatsDoc struct {
func mustStartNSQLookupd(opts *nsqlookupd.Options) (*net.TCPAddr, *net.TCPAddr, *nsqlookupd.NSQLookupd) {
opts.TCPAddress = "127.0.0.1:0"
opts.HTTPAddress = "127.0.0.1:0"
lookupd := nsqlookupd.New(opts)
lookupd.Main()
lookupd, err := nsqlookupd.New(opts)
if err != nil {
panic(err)
}
go func() {
err := lookupd.Main()
if err != nil {
panic(err)
}
}()
return lookupd.RealTCPAddr(), lookupd.RealHTTPAddr(), lookupd
}

Expand All @@ -66,8 +74,16 @@ func bootstrapNSQClusterWithAuth(t *testing.T, withAuth bool) (string, []*nsqd.N
nsqlookupdOpts.HTTPAddress = "127.0.0.1:0"
nsqlookupdOpts.BroadcastAddress = "127.0.0.1"
nsqlookupdOpts.Logger = lgr
nsqlookupd1 := nsqlookupd.New(nsqlookupdOpts)
nsqlookupd1.Main()
nsqlookupd1, err := nsqlookupd.New(nsqlookupdOpts)
if err != nil {
panic(err)
}
go func() {
err := nsqlookupd1.Main()
if err != nil {
panic(err)
}
}()

time.Sleep(100 * time.Millisecond)

Expand All @@ -82,8 +98,16 @@ func bootstrapNSQClusterWithAuth(t *testing.T, withAuth bool) (string, []*nsqd.N
panic(err)
}
nsqdOpts.DataPath = tmpDir
nsqd1 := nsqd.New(nsqdOpts)
nsqd1.Main()
nsqd1, err := nsqd.New(nsqdOpts)
if err != nil {
panic(err)
}
go func() {
err := nsqd1.Main()
if err != nil {
panic(err)
}
}()

nsqadminOpts := NewOptions()
nsqadminOpts.HTTPAddress = "127.0.0.1:0"
Expand All @@ -92,8 +116,16 @@ func bootstrapNSQClusterWithAuth(t *testing.T, withAuth bool) (string, []*nsqd.N
if withAuth {
nsqadminOpts.AdminUsers = []string{"matt"}
}
nsqadmin1 := New(nsqadminOpts)
nsqadmin1.Main()
nsqadmin1, err := New(nsqadminOpts)
if err != nil {
panic(err)
}
go func() {
err := nsqadmin1.Main()
if err != nil {
panic(err)
}
}()

time.Sleep(100 * time.Millisecond)

Expand Down Expand Up @@ -573,7 +605,7 @@ func TestHTTPconfig(t *testing.T) {
defer resp.Body.Close()
body, _ = ioutil.ReadAll(resp.Body)
test.Equal(t, 200, resp.StatusCode)
test.Equal(t, LOG_FATAL, nsqadmin1.getOpts().logLevel)
test.Equal(t, LOG_FATAL, nsqadmin1.getOpts().LogLevel)

url = fmt.Sprintf("http://%s/config/log_level", nsqadmin1.RealHTTPAddr())
req, err = http.NewRequest("PUT", url, bytes.NewBuffer([]byte(`bad`)))
Expand All @@ -591,8 +623,14 @@ func TestHTTPconfigCIDR(t *testing.T) {
opts.NSQLookupdHTTPAddresses = []string{"127.0.0.1:4161"}
opts.Logger = test.NewTestLogger(t)
opts.AllowConfigFromCIDR = "10.0.0.0/8"
nsqadmin := New(opts)
nsqadmin.Main()
nsqadmin, err := New(opts)
test.Nil(t, err)
go func() {
err := nsqadmin.Main()
if err != nil {
panic(err)
}
}()
defer nsqadmin.Exit()

time.Sleep(100 * time.Millisecond)
Expand Down
2 changes: 1 addition & 1 deletion nsqadmin/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ const (

func (n *NSQAdmin) logf(level lg.LogLevel, f string, args ...interface{}) {
opts := n.getOpts()
lg.Logf(opts.Logger, opts.logLevel, level, f, args...)
lg.Logf(opts.Logger, opts.LogLevel, level, f, args...)
}
93 changes: 45 additions & 48 deletions nsqadmin/nsqadmin.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"crypto/tls"
"crypto/x509"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"log"
"net"
Expand All @@ -16,7 +18,6 @@ import (
"sync/atomic"

"github.com/nsqio/nsq/internal/http_api"
"github.com/nsqio/nsq/internal/lg"
"github.com/nsqio/nsq/internal/util"
"github.com/nsqio/nsq/internal/version"
)
Expand All @@ -31,7 +32,7 @@ type NSQAdmin struct {
httpClientTLSConfig *tls.Config
}

func New(opts *Options) *NSQAdmin {
func New(opts *Options) (*NSQAdmin, error) {
if opts.Logger == nil {
opts.Logger = log.New(os.Stderr, opts.LogPrefix, log.Ldate|log.Ltime|log.Lmicroseconds)
}
Expand All @@ -41,41 +42,20 @@ func New(opts *Options) *NSQAdmin {
}
n.swapOpts(opts)

var err error
opts.logLevel, err = lg.ParseLogLevel(opts.LogLevel)
if err != nil {
n.logf(LOG_FATAL, "%s", err)
os.Exit(1)
}

if len(opts.NSQDHTTPAddresses) == 0 && len(opts.NSQLookupdHTTPAddresses) == 0 {
n.logf(LOG_FATAL, "--nsqd-http-address or --lookupd-http-address required.")
os.Exit(1)
return nil, errors.New("--nsqd-http-address or --lookupd-http-address required")
}

if len(opts.NSQDHTTPAddresses) != 0 && len(opts.NSQLookupdHTTPAddresses) != 0 {
n.logf(LOG_FATAL, "use --nsqd-http-address or --lookupd-http-address not both")
os.Exit(1)
}

// verify that the supplied address is valid
verifyAddress := func(arg string, address string) *net.TCPAddr {
addr, err := net.ResolveTCPAddr("tcp", address)
if err != nil {
n.logf(LOG_FATAL, "failed to resolve %s address (%s) - %s", arg, address, err)
os.Exit(1)
}
return addr
return nil, errors.New("use --nsqd-http-address or --lookupd-http-address not both")
}

if opts.HTTPClientTLSCert != "" && opts.HTTPClientTLSKey == "" {
n.logf(LOG_FATAL, "--http-client-tls-key must be specified with --http-client-tls-cert")
os.Exit(1)
return nil, errors.New("--http-client-tls-key must be specified with --http-client-tls-cert")
}

if opts.HTTPClientTLSKey != "" && opts.HTTPClientTLSCert == "" {
n.logf(LOG_FATAL, "--http-client-tls-cert must be specified with --http-client-tls-key")
os.Exit(1)
return nil, errors.New("--http-client-tls-cert must be specified with --http-client-tls-key")
}

n.httpClientTLSConfig = &tls.Config{
Expand All @@ -84,40 +64,42 @@ func New(opts *Options) *NSQAdmin {
if opts.HTTPClientTLSCert != "" && opts.HTTPClientTLSKey != "" {
cert, err := tls.LoadX509KeyPair(opts.HTTPClientTLSCert, opts.HTTPClientTLSKey)
if err != nil {
n.logf(LOG_FATAL, "failed to LoadX509KeyPair %s, %s - %s",
return nil, fmt.Errorf("failed to LoadX509KeyPair %s, %s - %s",
opts.HTTPClientTLSCert, opts.HTTPClientTLSKey, err)
os.Exit(1)
}
n.httpClientTLSConfig.Certificates = []tls.Certificate{cert}
}
if opts.HTTPClientTLSRootCAFile != "" {
tlsCertPool := x509.NewCertPool()
caCertFile, err := ioutil.ReadFile(opts.HTTPClientTLSRootCAFile)
if err != nil {
n.logf(LOG_FATAL, "failed to read TLS root CA file %s - %s",
return nil, fmt.Errorf("failed to read TLS root CA file %s - %s",
opts.HTTPClientTLSRootCAFile, err)
os.Exit(1)
}
if !tlsCertPool.AppendCertsFromPEM(caCertFile) {
n.logf(LOG_FATAL, "failed to AppendCertsFromPEM %s", opts.HTTPClientTLSRootCAFile)
os.Exit(1)
return nil, fmt.Errorf("failed to AppendCertsFromPEM %s", opts.HTTPClientTLSRootCAFile)
}
n.httpClientTLSConfig.RootCAs = tlsCertPool
}

// require that both the hostname and port be specified
for _, address := range opts.NSQLookupdHTTPAddresses {
verifyAddress("--lookupd-http-address", address)
_, err := net.ResolveTCPAddr("tcp", address)
if err != nil {
return nil, fmt.Errorf("failed to resolve --lookupd-http-address (%s) - %s", address, err)
}
}

for _, address := range opts.NSQDHTTPAddresses {
verifyAddress("--nsqd-http-address", address)
_, err := net.ResolveTCPAddr("tcp", address)
if err != nil {
return nil, fmt.Errorf("failed to resolve --nsqd-tcp-address (%s) - %s", address, err)
}
}

if opts.ProxyGraphite {
url, err := url.Parse(opts.GraphiteURL)
if err != nil {
n.logf(LOG_FATAL, "failed to parse --graphite-url='%s' - %s", opts.GraphiteURL, err)
return nil, fmt.Errorf("failed to parse --graphite-url (%s) - %s", opts.GraphiteURL, err)
os.Exit(1)
}
n.graphiteURL = url
Expand All @@ -126,16 +108,21 @@ func New(opts *Options) *NSQAdmin {
if opts.AllowConfigFromCIDR != "" {
_, _, err := net.ParseCIDR(opts.AllowConfigFromCIDR)
if err != nil {
n.logf(LOG_FATAL, "failed to parse --allow-config-from-cidr='%s' - %s", opts.AllowConfigFromCIDR, err)
os.Exit(1)
return nil, fmt.Errorf("failed to parse --allow-config-from-cidr (%s) - %s", opts.AllowConfigFromCIDR, err)
}
}

opts.BasePath = normalizeBasePath(opts.BasePath)

n.logf(LOG_INFO, version.String("nsqadmin"))

return n
var err error
n.httpListener, err = net.Listen("tcp", n.getOpts().HTTPAddress)
if err != nil {
return nil, fmt.Errorf("listen (%s) failed - %s", n.getOpts().HTTPAddress, err)
}

return n, nil
}

func normalizeBasePath(p string) string {
Expand Down Expand Up @@ -180,22 +167,32 @@ func (n *NSQAdmin) handleAdminActions() {
}
}

func (n *NSQAdmin) Main() {
httpListener, err := net.Listen("tcp", n.getOpts().HTTPAddress)
if err != nil {
n.logf(LOG_FATAL, "listen (%s) failed - %s", n.getOpts().HTTPAddress, err)
os.Exit(1)
func (n *NSQAdmin) Main() error {
exitCh := make(chan error)
var once sync.Once
exitFunc := func(err error) {
once.Do(func() {
if err != nil {
n.logf(LOG_FATAL, "%s", err)
}
exitCh <- err
})
}
n.httpListener = httpListener

httpServer := NewHTTPServer(&Context{n})
n.waitGroup.Wrap(func() {
http_api.Serve(n.httpListener, http_api.CompressHandler(httpServer), "HTTP", n.logf)
exitFunc(http_api.Serve(n.httpListener, http_api.CompressHandler(httpServer), "HTTP", n.logf))
})
n.waitGroup.Wrap(n.handleAdminActions)

err := <-exitCh
return err
}

func (n *NSQAdmin) Exit() {
n.httpListener.Close()
if n.httpListener != nil {
n.httpListener.Close()
}
close(n.notifications)
n.waitGroup.Wait()
}
Loading

0 comments on commit 69be08a

Please sign in to comment.