Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

nsqd/nsqlookupd: properly handle fatal accept errors #1140

Merged
merged 13 commits into from
Mar 3, 2019
4 changes: 2 additions & 2 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@

[[constraint]]
name = "github.com/mreiferson/go-options"
revision = "77551d20752b"
revision = "0c63f026bcd6"

[[constraint]]
name = "github.com/nsqio/go-diskqueue"
Expand Down
2 changes: 1 addition & 1 deletion apps/nsq_to_file/nsq_to_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func main() {
options.Resolve(opts, fs, nil)

logger := log.New(os.Stderr, opts.LogPrefix, log.Ldate|log.Ltime|log.Lmicroseconds)
logLevel, err := lg.ParseLogLevel(opts.LogLevel, false)
logLevel, err := lg.ParseLogLevel(opts.LogLevel)
if err != nil {
log.Fatal("--log-level is invalid")
}
Expand Down
139 changes: 88 additions & 51 deletions apps/nsqadmin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,93 +3,130 @@ package main
import (
"flag"
"fmt"
"log"
"os"
"os/signal"
"path/filepath"
"sync"
"syscall"
"time"

"github.com/BurntSushi/toml"
"github.com/judwhite/go-svc/svc"
"github.com/mreiferson/go-options"
"github.com/nsqio/nsq/internal/app"
"github.com/nsqio/nsq/internal/lg"
"github.com/nsqio/nsq/internal/version"
"github.com/nsqio/nsq/nsqadmin"
)

var (
flagSet = flag.NewFlagSet("nsqadmin", flag.ExitOnError)
func nsqadminFlagSet(opts *nsqadmin.Options) *flag.FlagSet {
flagSet := flag.NewFlagSet("nsqadmin", flag.ExitOnError)

basePath = flagSet.String("base-path", "/", "URL base path")
config = flagSet.String("config", "", "path to config file")
showVersion = flagSet.Bool("version", false, "print version string")
flagSet.String("config", "", "path to config file")
flagSet.Bool("version", false, "print version string")

logLevel = flagSet.String("log-level", "info", "set log verbosity: debug, info, warn, error, or fatal")
logPrefix = flagSet.String("log-prefix", "[nsqadmin] ", "log message prefix")
verbose = flagSet.Bool("verbose", false, "deprecated in favor of log-level")
logLevel := opts.LogLevel
flagSet.Var(&logLevel, "log-level", "set log verbosity: debug, info, warn, error, or fatal")
flagSet.String("log-prefix", "[nsqadmin] ", "log message prefix")
flagSet.Bool("verbose", false, "[deprecated] has no effect, use --log-level")

httpAddress = flagSet.String("http-address", "0.0.0.0:4171", "<addr>:<port> to listen on for HTTP clients")
flagSet.String("http-address", opts.HTTPAddress, "<addr>:<port> to listen on for HTTP clients")
flagSet.String("base-path", opts.BasePath, "URL base path")

graphiteURL = flagSet.String("graphite-url", "", "graphite HTTP address")
proxyGraphite = flagSet.Bool("proxy-graphite", false, "proxy HTTP requests to graphite")
flagSet.String("graphite-url", opts.GraphiteURL, "graphite HTTP address")
flagSet.Bool("proxy-graphite", false, "proxy HTTP requests to graphite")

statsdCounterFormat = flagSet.String("statsd-counter-format", "stats.counters.%s.count", "The counter stats key formatting applied by the implementation of statsd. If no formatting is desired, set this to an empty string.")
statsdGaugeFormat = flagSet.String("statsd-gauge-format", "stats.gauges.%s", "The gauge stats key formatting applied by the implementation of statsd. If no formatting is desired, set this to an empty string.")
statsdPrefix = flagSet.String("statsd-prefix", "nsq.%s", "prefix used for keys sent to statsd (%s for host replacement, must match nsqd)")
statsdInterval = flagSet.Duration("statsd-interval", 60*time.Second, "time interval nsqd is configured to push to statsd (must match nsqd)")
flagSet.String("statsd-counter-format", opts.StatsdCounterFormat, "The counter stats key formatting applied by the implementation of statsd. If no formatting is desired, set this to an empty string.")
flagSet.String("statsd-gauge-format", opts.StatsdGaugeFormat, "The gauge stats key formatting applied by the implementation of statsd. If no formatting is desired, set this to an empty string.")
flagSet.String("statsd-prefix", opts.StatsdPrefix, "prefix used for keys sent to statsd (%s for host replacement, must match nsqd)")
flagSet.Duration("statsd-interval", opts.StatsdInterval, "time interval nsqd is configured to push to statsd (must match nsqd)")

notificationHTTPEndpoint = flagSet.String("notification-http-endpoint", "", "HTTP endpoint (fully qualified) to which POST notifications of admin actions will be sent")
flagSet.String("notification-http-endpoint", "", "HTTP endpoint (fully qualified) to which POST notifications of admin actions will be sent")

httpConnectTimeout = flagSet.Duration("http-client-connect-timeout", 2*time.Second, "timeout for HTTP connect")
httpRequestTimeout = flagSet.Duration("http-client-request-timeout", 5*time.Second, "timeout for HTTP request")
flagSet.Duration("http-client-connect-timeout", opts.HTTPClientConnectTimeout, "timeout for HTTP connect")
flagSet.Duration("http-client-request-timeout", opts.HTTPClientRequestTimeout, "timeout for HTTP request")

httpClientTLSInsecureSkipVerify = flagSet.Bool("http-client-tls-insecure-skip-verify", false, "configure the HTTP client to skip verification of TLS certificates")
httpClientTLSRootCAFile = flagSet.String("http-client-tls-root-ca-file", "", "path to CA file for the HTTP client")
httpClientTLSCert = flagSet.String("http-client-tls-cert", "", "path to certificate file for the HTTP client")
httpClientTLSKey = flagSet.String("http-client-tls-key", "", "path to key file for the HTTP client")
flagSet.Bool("http-client-tls-insecure-skip-verify", false, "configure the HTTP client to skip verification of TLS certificates")
flagSet.String("http-client-tls-root-ca-file", "", "path to CA file for the HTTP client")
flagSet.String("http-client-tls-cert", "", "path to certificate file for the HTTP client")
flagSet.String("http-client-tls-key", "", "path to key file for the HTTP client")

allowConfigFromCIDR = flagSet.String("allow-config-from-cidr", "127.0.0.1/8", "A CIDR from which to allow HTTP requests to the /config endpoint")
aclHttpHeader = flagSet.String("acl-http-header", "X-Forwarded-User", "HTTP header to check for authenticated admin users")
flagSet.String("allow-config-from-cidr", opts.AllowConfigFromCIDR, "A CIDR from which to allow HTTP requests to the /config endpoint")
flagSet.String("acl-http-header", opts.AclHttpHeader, "HTTP header to check for authenticated admin users")

adminUsers = app.StringArray{}
nsqlookupdHTTPAddresses = app.StringArray{}
nsqdHTTPAddresses = app.StringArray{}
)

func init() {
nsqlookupdHTTPAddresses := app.StringArray{}
flagSet.Var(&nsqlookupdHTTPAddresses, "lookupd-http-address", "lookupd HTTP address (may be given multiple times)")
nsqdHTTPAddresses := app.StringArray{}
flagSet.Var(&nsqdHTTPAddresses, "nsqd-http-address", "nsqd HTTP address (may be given multiple times)")
adminUsers := app.StringArray{}
flagSet.Var(&adminUsers, "admin-user", "admin user (may be given multiple times; if specified, only these users will be able to perform privileged actions; acl-http-header is used to determine the authenticated user)")

return flagSet
}

type program struct {
once sync.Once
nsqadmin *nsqadmin.NSQAdmin
}

func main() {
prg := &program{}
if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil {
logFatal("%s", err)
}
}

func (p *program) Init(env svc.Environment) error {
if env.IsWindowsService() {
dir := filepath.Dir(os.Args[0])
return os.Chdir(dir)
}
return nil
}

func (p *program) Start() error {
opts := nsqadmin.NewOptions()

flagSet := nsqadminFlagSet(opts)
flagSet.Parse(os.Args[1:])

if *showVersion {
if flagSet.Lookup("version").Value.(flag.Getter).Get().(bool) {
fmt.Println(version.String("nsqadmin"))
return
os.Exit(0)
}

exitChan := make(chan int)
signalChan := make(chan os.Signal, 1)
go func() {
<-signalChan
exitChan <- 1
}()
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)

var cfg map[string]interface{}
if *config != "" {
_, err := toml.DecodeFile(*config, &cfg)
configFile := flagSet.Lookup("config").Value.String()
if configFile != "" {
_, err := toml.DecodeFile(configFile, &cfg)
if err != nil {
log.Fatalf("ERROR: failed to load config file %s - %s", *config, err)
logFatal("failed to load config file %s - %s", configFile, err)
}
}

opts := nsqadmin.NewOptions()
options.Resolve(opts, flagSet, cfg)
nsqadmin := nsqadmin.New(opts)
nsqadmin, err := nsqadmin.New(opts)
if err != nil {
logFatal("failed to instantiate nsqadmin - %s", err)
}
p.nsqadmin = nsqadmin

go func() {
err := p.nsqadmin.Main()
if err != nil {
p.Stop()
os.Exit(1)
}
}()

return nil
}

func (p *program) Stop() error {
p.once.Do(func() {
p.nsqadmin.Exit()
})
return nil
}

nsqadmin.Main()
<-exitChan
nsqadmin.Exit()
func logFatal(f string, args ...interface{}) {
lg.LogFatal("[nsqadmin] ", f, args...)
}
100 changes: 100 additions & 0 deletions apps/nsqd/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package main

import (
"flag"
"fmt"
"math/rand"
"os"
"path/filepath"
"sync"
"syscall"
"time"

"github.com/BurntSushi/toml"
"github.com/judwhite/go-svc/svc"
"github.com/mreiferson/go-options"
"github.com/nsqio/nsq/internal/lg"
"github.com/nsqio/nsq/internal/version"
"github.com/nsqio/nsq/nsqd"
)

type program struct {
once sync.Once
nsqd *nsqd.NSQD
}

func main() {
prg := &program{}
if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil {
logFatal("%s", err)
}
}

func (p *program) Init(env svc.Environment) error {
if env.IsWindowsService() {
dir := filepath.Dir(os.Args[0])
return os.Chdir(dir)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mreiferson It doesn't hurt anything to have this, but the package does this bit for you now. I generally consider it impolite to os.Chdir but in this case I thought it was warranted. https://github.com/judwhite/go-svc/blame/d83e900e4a688aad49f41263bebf8793f0bf6add/svc/svc_windows.go#L59-L66

return nil
}

func (p *program) Start() error {
opts := nsqd.NewOptions()

flagSet := nsqdFlagSet(opts)
flagSet.Parse(os.Args[1:])

rand.Seed(time.Now().UTC().UnixNano())

if flagSet.Lookup("version").Value.(flag.Getter).Get().(bool) {
fmt.Println(version.String("nsqd"))
os.Exit(0)
}

var cfg config
configFile := flagSet.Lookup("config").Value.String()
if configFile != "" {
_, err := toml.DecodeFile(configFile, &cfg)
if err != nil {
logFatal("failed to load config file %s - %s", configFile, err)
}
}
cfg.Validate()

options.Resolve(opts, flagSet, cfg)
nsqd, err := nsqd.New(opts)
if err != nil {
logFatal("failed to instantiate nsqd - %s", err)
}
p.nsqd = nsqd

err = p.nsqd.LoadMetadata()
if err != nil {
logFatal("failed to load metadata - %s", err)
}
err = p.nsqd.PersistMetadata()
if err != nil {
logFatal("failed to persist metadata - %s", err)
}

go func() {
err := p.nsqd.Main()
if err != nil {
p.Stop()
os.Exit(1)
}
}()

return nil
}

func (p *program) Stop() error {
p.once.Do(func() {
p.nsqd.Exit()
})
return nil
}

func logFatal(f string, args ...interface{}) {
lg.LogFatal("[nsqd] ", f, args...)
}
2 changes: 2 additions & 0 deletions apps/nsqd/nsqd_test.go → apps/nsqd/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ import (

"github.com/BurntSushi/toml"
"github.com/mreiferson/go-options"
"github.com/nsqio/nsq/internal/test"
"github.com/nsqio/nsq/nsqd"
)

func TestConfigFlagParsing(t *testing.T) {
opts := nsqd.NewOptions()
opts.Logger = test.NewTestLogger(t)

flagSet := nsqdFlagSet(opts)
flagSet.Parse([]string{})
Expand Down
Loading