Skip to content

Commit

Permalink
nsqadmin: switch to go-svc; update logging
Browse files Browse the repository at this point in the history
  • Loading branch information
mreiferson committed Mar 2, 2019
1 parent 9c6da6c commit 30a3c75
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 108 deletions.
139 changes: 88 additions & 51 deletions apps/nsqadmin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,93 +3,130 @@ package main
import (
"flag"
"fmt"
"log"
"os"
"os/signal"
"path/filepath"
"sync"
"syscall"
"time"

"github.com/BurntSushi/toml"
"github.com/judwhite/go-svc/svc"
"github.com/mreiferson/go-options"
"github.com/nsqio/nsq/internal/app"
"github.com/nsqio/nsq/internal/lg"
"github.com/nsqio/nsq/internal/version"
"github.com/nsqio/nsq/nsqadmin"
)

var (
flagSet = flag.NewFlagSet("nsqadmin", flag.ExitOnError)
func nsqadminFlagSet(opts *nsqadmin.Options) *flag.FlagSet {
flagSet := flag.NewFlagSet("nsqadmin", flag.ExitOnError)

basePath = flagSet.String("base-path", "/", "URL base path")
config = flagSet.String("config", "", "path to config file")
showVersion = flagSet.Bool("version", false, "print version string")
flagSet.String("config", "", "path to config file")
flagSet.Bool("version", false, "print version string")

logLevel = flagSet.String("log-level", "info", "set log verbosity: debug, info, warn, error, or fatal")
logPrefix = flagSet.String("log-prefix", "[nsqadmin] ", "log message prefix")
verbose = flagSet.Bool("verbose", false, "deprecated in favor of log-level")
logLevel := opts.LogLevel
flagSet.Var(&logLevel, "log-level", "set log verbosity: debug, info, warn, error, or fatal")
flagSet.String("log-prefix", "[nsqadmin] ", "log message prefix")
flagSet.Bool("verbose", false, "[deprecated] has no effect, use --log-level")

httpAddress = flagSet.String("http-address", "0.0.0.0:4171", "<addr>:<port> to listen on for HTTP clients")
flagSet.String("http-address", opts.HTTPAddress, "<addr>:<port> to listen on for HTTP clients")
flagSet.String("base-path", opts.BasePath, "URL base path")

graphiteURL = flagSet.String("graphite-url", "", "graphite HTTP address")
proxyGraphite = flagSet.Bool("proxy-graphite", false, "proxy HTTP requests to graphite")
flagSet.String("graphite-url", opts.GraphiteURL, "graphite HTTP address")
flagSet.Bool("proxy-graphite", false, "proxy HTTP requests to graphite")

statsdCounterFormat = flagSet.String("statsd-counter-format", "stats.counters.%s.count", "The counter stats key formatting applied by the implementation of statsd. If no formatting is desired, set this to an empty string.")
statsdGaugeFormat = flagSet.String("statsd-gauge-format", "stats.gauges.%s", "The gauge stats key formatting applied by the implementation of statsd. If no formatting is desired, set this to an empty string.")
statsdPrefix = flagSet.String("statsd-prefix", "nsq.%s", "prefix used for keys sent to statsd (%s for host replacement, must match nsqd)")
statsdInterval = flagSet.Duration("statsd-interval", 60*time.Second, "time interval nsqd is configured to push to statsd (must match nsqd)")
flagSet.String("statsd-counter-format", opts.StatsdCounterFormat, "The counter stats key formatting applied by the implementation of statsd. If no formatting is desired, set this to an empty string.")
flagSet.String("statsd-gauge-format", opts.StatsdGaugeFormat, "The gauge stats key formatting applied by the implementation of statsd. If no formatting is desired, set this to an empty string.")
flagSet.String("statsd-prefix", opts.StatsdPrefix, "prefix used for keys sent to statsd (%s for host replacement, must match nsqd)")
flagSet.Duration("statsd-interval", opts.StatsdInterval, "time interval nsqd is configured to push to statsd (must match nsqd)")

notificationHTTPEndpoint = flagSet.String("notification-http-endpoint", "", "HTTP endpoint (fully qualified) to which POST notifications of admin actions will be sent")
flagSet.String("notification-http-endpoint", "", "HTTP endpoint (fully qualified) to which POST notifications of admin actions will be sent")

httpConnectTimeout = flagSet.Duration("http-client-connect-timeout", 2*time.Second, "timeout for HTTP connect")
httpRequestTimeout = flagSet.Duration("http-client-request-timeout", 5*time.Second, "timeout for HTTP request")
flagSet.Duration("http-client-connect-timeout", opts.HTTPClientConnectTimeout, "timeout for HTTP connect")
flagSet.Duration("http-client-request-timeout", opts.HTTPClientRequestTimeout, "timeout for HTTP request")

httpClientTLSInsecureSkipVerify = flagSet.Bool("http-client-tls-insecure-skip-verify", false, "configure the HTTP client to skip verification of TLS certificates")
httpClientTLSRootCAFile = flagSet.String("http-client-tls-root-ca-file", "", "path to CA file for the HTTP client")
httpClientTLSCert = flagSet.String("http-client-tls-cert", "", "path to certificate file for the HTTP client")
httpClientTLSKey = flagSet.String("http-client-tls-key", "", "path to key file for the HTTP client")
flagSet.Bool("http-client-tls-insecure-skip-verify", false, "configure the HTTP client to skip verification of TLS certificates")
flagSet.String("http-client-tls-root-ca-file", "", "path to CA file for the HTTP client")
flagSet.String("http-client-tls-cert", "", "path to certificate file for the HTTP client")
flagSet.String("http-client-tls-key", "", "path to key file for the HTTP client")

allowConfigFromCIDR = flagSet.String("allow-config-from-cidr", "127.0.0.1/8", "A CIDR from which to allow HTTP requests to the /config endpoint")
aclHttpHeader = flagSet.String("acl-http-header", "X-Forwarded-User", "HTTP header to check for authenticated admin users")
flagSet.String("allow-config-from-cidr", opts.AllowConfigFromCIDR, "A CIDR from which to allow HTTP requests to the /config endpoint")
flagSet.String("acl-http-header", opts.AclHttpHeader, "HTTP header to check for authenticated admin users")

adminUsers = app.StringArray{}
nsqlookupdHTTPAddresses = app.StringArray{}
nsqdHTTPAddresses = app.StringArray{}
)

func init() {
nsqlookupdHTTPAddresses := app.StringArray{}
flagSet.Var(&nsqlookupdHTTPAddresses, "lookupd-http-address", "lookupd HTTP address (may be given multiple times)")
nsqdHTTPAddresses := app.StringArray{}
flagSet.Var(&nsqdHTTPAddresses, "nsqd-http-address", "nsqd HTTP address (may be given multiple times)")
adminUsers := app.StringArray{}
flagSet.Var(&adminUsers, "admin-user", "admin user (may be given multiple times; if specified, only these users will be able to perform privileged actions; acl-http-header is used to determine the authenticated user)")

return flagSet
}

type program struct {
once sync.Once
nsqadmin *nsqadmin.NSQAdmin
}

func main() {
prg := &program{}
if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil {
logFatal("%s", err)
}
}

func (p *program) Init(env svc.Environment) error {
if env.IsWindowsService() {
dir := filepath.Dir(os.Args[0])
return os.Chdir(dir)
}
return nil
}

func (p *program) Start() error {
opts := nsqadmin.NewOptions()

flagSet := nsqadminFlagSet(opts)
flagSet.Parse(os.Args[1:])

if *showVersion {
if flagSet.Lookup("version").Value.(flag.Getter).Get().(bool) {
fmt.Println(version.String("nsqadmin"))
return
os.Exit(0)
}

exitChan := make(chan int)
signalChan := make(chan os.Signal, 1)
go func() {
<-signalChan
exitChan <- 1
}()
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)

var cfg map[string]interface{}
if *config != "" {
_, err := toml.DecodeFile(*config, &cfg)
configFile := flagSet.Lookup("config").Value.String()
if configFile != "" {
_, err := toml.DecodeFile(configFile, &cfg)
if err != nil {
log.Fatalf("ERROR: failed to load config file %s - %s", *config, err)
logFatal("failed to load config file %s - %s", configFile, err)
}
}

opts := nsqadmin.NewOptions()
options.Resolve(opts, flagSet, cfg)
nsqadmin := nsqadmin.New(opts)
nsqadmin, err := nsqadmin.New(opts)
if err != nil {
logFatal("failed to instantiate nsqadmin - %s", err)
}
p.nsqadmin = nsqadmin

go func() {
err := p.nsqadmin.Main()
if err != nil {
p.Stop()
os.Exit(1)
}
}()

return nil
}

func (p *program) Stop() error {
p.once.Do(func() {
p.nsqadmin.Exit()
})
return nil
}

nsqadmin.Main()
<-exitChan
nsqadmin.Exit()
func logFatal(f string, args ...interface{}) {
lg.LogFatal("[nsqadmin] ", f, args...)
}
5 changes: 2 additions & 3 deletions nsqadmin/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -765,12 +765,11 @@ func (s *httpServer) doConfig(w http.ResponseWriter, req *http.Request, ps httpr
}
case "log_level":
logLevelStr := string(body)
logLevel, err := lg.ParseLogLevel(logLevelStr, opts.Verbose)
logLevel, err := lg.ParseLogLevel(logLevelStr)
if err != nil {
return nil, http_api.Err{400, "INVALID_VALUE"}
}
opts.LogLevel = logLevelStr
opts.logLevel = logLevel
opts.LogLevel = logLevel
default:
return nil, http_api.Err{400, "INVALID_OPTION"}
}
Expand Down
2 changes: 1 addition & 1 deletion nsqadmin/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ const (

func (n *NSQAdmin) logf(level lg.LogLevel, f string, args ...interface{}) {
opts := n.getOpts()
lg.Logf(opts.Logger, opts.logLevel, level, f, args...)
lg.Logf(opts.Logger, opts.LogLevel, level, f, args...)
}
93 changes: 45 additions & 48 deletions nsqadmin/nsqadmin.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"crypto/tls"
"crypto/x509"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"log"
"net"
Expand All @@ -16,7 +18,6 @@ import (
"sync/atomic"

"github.com/nsqio/nsq/internal/http_api"
"github.com/nsqio/nsq/internal/lg"
"github.com/nsqio/nsq/internal/util"
"github.com/nsqio/nsq/internal/version"
)
Expand All @@ -31,7 +32,7 @@ type NSQAdmin struct {
httpClientTLSConfig *tls.Config
}

func New(opts *Options) *NSQAdmin {
func New(opts *Options) (*NSQAdmin, error) {
if opts.Logger == nil {
opts.Logger = log.New(os.Stderr, opts.LogPrefix, log.Ldate|log.Ltime|log.Lmicroseconds)
}
Expand All @@ -41,41 +42,20 @@ func New(opts *Options) *NSQAdmin {
}
n.swapOpts(opts)

var err error
opts.logLevel, err = lg.ParseLogLevel(opts.LogLevel, opts.Verbose)
if err != nil {
n.logf(LOG_FATAL, "%s", err)
os.Exit(1)
}

if len(opts.NSQDHTTPAddresses) == 0 && len(opts.NSQLookupdHTTPAddresses) == 0 {
n.logf(LOG_FATAL, "--nsqd-http-address or --lookupd-http-address required.")
os.Exit(1)
return nil, errors.New("--nsqd-http-address or --lookupd-http-address required")
}

if len(opts.NSQDHTTPAddresses) != 0 && len(opts.NSQLookupdHTTPAddresses) != 0 {
n.logf(LOG_FATAL, "use --nsqd-http-address or --lookupd-http-address not both")
os.Exit(1)
}

// verify that the supplied address is valid
verifyAddress := func(arg string, address string) *net.TCPAddr {
addr, err := net.ResolveTCPAddr("tcp", address)
if err != nil {
n.logf(LOG_FATAL, "failed to resolve %s address (%s) - %s", arg, address, err)
os.Exit(1)
}
return addr
return nil, errors.New("use --nsqd-http-address or --lookupd-http-address not both")
}

if opts.HTTPClientTLSCert != "" && opts.HTTPClientTLSKey == "" {
n.logf(LOG_FATAL, "--http-client-tls-key must be specified with --http-client-tls-cert")
os.Exit(1)
return nil, errors.New("--http-client-tls-key must be specified with --http-client-tls-cert")
}

if opts.HTTPClientTLSKey != "" && opts.HTTPClientTLSCert == "" {
n.logf(LOG_FATAL, "--http-client-tls-cert must be specified with --http-client-tls-key")
os.Exit(1)
return nil, errors.New("--http-client-tls-cert must be specified with --http-client-tls-key")
}

n.httpClientTLSConfig = &tls.Config{
Expand All @@ -84,40 +64,42 @@ func New(opts *Options) *NSQAdmin {
if opts.HTTPClientTLSCert != "" && opts.HTTPClientTLSKey != "" {
cert, err := tls.LoadX509KeyPair(opts.HTTPClientTLSCert, opts.HTTPClientTLSKey)
if err != nil {
n.logf(LOG_FATAL, "failed to LoadX509KeyPair %s, %s - %s",
return nil, fmt.Errorf("failed to LoadX509KeyPair %s, %s - %s",
opts.HTTPClientTLSCert, opts.HTTPClientTLSKey, err)
os.Exit(1)
}
n.httpClientTLSConfig.Certificates = []tls.Certificate{cert}
}
if opts.HTTPClientTLSRootCAFile != "" {
tlsCertPool := x509.NewCertPool()
caCertFile, err := ioutil.ReadFile(opts.HTTPClientTLSRootCAFile)
if err != nil {
n.logf(LOG_FATAL, "failed to read TLS root CA file %s - %s",
return nil, fmt.Errorf("failed to read TLS root CA file %s - %s",
opts.HTTPClientTLSRootCAFile, err)
os.Exit(1)
}
if !tlsCertPool.AppendCertsFromPEM(caCertFile) {
n.logf(LOG_FATAL, "failed to AppendCertsFromPEM %s", opts.HTTPClientTLSRootCAFile)
os.Exit(1)
return nil, fmt.Errorf("failed to AppendCertsFromPEM %s", opts.HTTPClientTLSRootCAFile)
}
n.httpClientTLSConfig.RootCAs = tlsCertPool
}

// require that both the hostname and port be specified
for _, address := range opts.NSQLookupdHTTPAddresses {
verifyAddress("--lookupd-http-address", address)
_, err := net.ResolveTCPAddr("tcp", address)
if err != nil {
return nil, fmt.Errorf("failed to resolve --lookupd-http-address (%s) - %s", address, err)
}
}

for _, address := range opts.NSQDHTTPAddresses {
verifyAddress("--nsqd-http-address", address)
_, err := net.ResolveTCPAddr("tcp", address)
if err != nil {
return nil, fmt.Errorf("failed to resolve --nsqd-http-address (%s) - %s", address, err)
}
}

if opts.ProxyGraphite {
url, err := url.Parse(opts.GraphiteURL)
if err != nil {
n.logf(LOG_FATAL, "failed to parse --graphite-url='%s' - %s", opts.GraphiteURL, err)
return nil, fmt.Errorf("failed to parse --graphite-url (%s) - %s", opts.GraphiteURL, err)
os.Exit(1)
}
n.graphiteURL = url
Expand All @@ -126,16 +108,21 @@ func New(opts *Options) *NSQAdmin {
if opts.AllowConfigFromCIDR != "" {
_, _, err := net.ParseCIDR(opts.AllowConfigFromCIDR)
if err != nil {
n.logf(LOG_FATAL, "failed to parse --allow-config-from-cidr='%s' - %s", opts.AllowConfigFromCIDR, err)
os.Exit(1)
return nil, fmt.Errorf("failed to parse --allow-config-from-cidr (%s) - %s", opts.AllowConfigFromCIDR, err)
}
}

opts.BasePath = normalizeBasePath(opts.BasePath)

n.logf(LOG_INFO, version.String("nsqadmin"))

return n
var err error
n.httpListener, err = net.Listen("tcp", n.getOpts().HTTPAddress)
if err != nil {
return nil, fmt.Errorf("listen (%s) failed - %s", n.getOpts().HTTPAddress, err)
}

return n, nil
}

func normalizeBasePath(p string) string {
Expand Down Expand Up @@ -180,22 +167,32 @@ func (n *NSQAdmin) handleAdminActions() {
}
}

func (n *NSQAdmin) Main() {
httpListener, err := net.Listen("tcp", n.getOpts().HTTPAddress)
if err != nil {
n.logf(LOG_FATAL, "listen (%s) failed - %s", n.getOpts().HTTPAddress, err)
os.Exit(1)
func (n *NSQAdmin) Main() error {
exitCh := make(chan error)
var once sync.Once
exitFunc := func(err error) {
once.Do(func() {
if err != nil {
n.logf(LOG_FATAL, "%s", err)
}
exitCh <- err
})
}
n.httpListener = httpListener

httpServer := NewHTTPServer(&Context{n})
n.waitGroup.Wrap(func() {
http_api.Serve(n.httpListener, http_api.CompressHandler(httpServer), "HTTP", n.logf)
exitFunc(http_api.Serve(n.httpListener, http_api.CompressHandler(httpServer), "HTTP", n.logf))
})
n.waitGroup.Wrap(n.handleAdminActions)

err := <-exitCh
return err
}

func (n *NSQAdmin) Exit() {
n.httpListener.Close()
if n.httpListener != nil {
n.httpListener.Close()
}
close(n.notifications)
n.waitGroup.Wait()
}
Loading

0 comments on commit 30a3c75

Please sign in to comment.