diff --git a/internal/dirlock/dirlock.go b/internal/dirlock/dirlock.go new file mode 100644 index 000000000..a731d7c39 --- /dev/null +++ b/internal/dirlock/dirlock.go @@ -0,0 +1,38 @@ +// +build !windows + +package dirlock + +import ( + "fmt" + "os" + "syscall" +) + +type DirLock struct { + dir string + f *os.File +} + +func New(dir string) *DirLock { + return &DirLock{ + dir: dir, + } +} + +func (l *DirLock) Lock() error { + f, err := os.Open(l.dir) + if err != nil { + return err + } + l.f = f + err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB) + if err != nil { + return fmt.Errorf("ERROR: cannot flock directory %s - %s", l.dir, err) + } + return nil +} + +func (l *DirLock) Unlock() error { + defer l.f.Close() + return syscall.Flock(int(l.f.Fd()), syscall.LOCK_UN) +} diff --git a/internal/dirlock/dirlock_windows.go b/internal/dirlock/dirlock_windows.go new file mode 100644 index 000000000..66ca7916e --- /dev/null +++ b/internal/dirlock/dirlock_windows.go @@ -0,0 +1,21 @@ +// +build windows + +package dirlock + +type DirLock struct { + dir string +} + +func New(dir string) *DirLock { + return &DirLock{ + dir: dir, + } +} + +func (l *DirLock) Lock() error { + return nil +} + +func (l *DirLock) Unlock() error { + return nil +} diff --git a/nsqd/nsqd.go b/nsqd/nsqd.go index ede97ca7e..9d8d2eb0d 100644 --- a/nsqd/nsqd.go +++ b/nsqd/nsqd.go @@ -19,6 +19,7 @@ import ( "github.com/bitly/go-simplejson" "github.com/nsqio/nsq/internal/clusterinfo" + "github.com/nsqio/nsq/internal/dirlock" "github.com/nsqio/nsq/internal/http_api" "github.com/nsqio/nsq/internal/protocol" "github.com/nsqio/nsq/internal/statsd" @@ -45,6 +46,7 @@ type NSQD struct { opts atomic.Value + dl *dirlock.DirLock flag int32 errMtx sync.RWMutex err error @@ -80,9 +82,16 @@ func New(opts *Options) *NSQD { notifyChan: make(chan interface{}), optsNotificationChan: make(chan struct{}, 1), ci: clusterinfo.New(opts.Logger, http_api.NewClient(nil)), + dl: dirlock.New(opts.DataPath), } n.swapOpts(opts) + err := n.dl.Lock() + if err != nil { + n.logf("FATAL: --data-path=%s in use", opts.DataPath) + os.Exit(1) + } + if opts.MaxDeflateLevel < 1 || opts.MaxDeflateLevel > 9 { n.logf("FATAL: --max-deflate-level must be [1,9]") os.Exit(1) @@ -435,6 +444,8 @@ func (n *NSQD) Exit() { // could potentially starve items in process and deadlock) close(n.exitChan) n.waitGroup.Wait() + + n.dl.Unlock() } // GetTopic performs a thread safe operation diff --git a/nsqd/options.go b/nsqd/options.go index 72641d51c..6ff7d7a15 100644 --- a/nsqd/options.go +++ b/nsqd/options.go @@ -92,6 +92,8 @@ func NewOptions() *Options { HTTPSAddress: "0.0.0.0:4152", BroadcastAddress: hostname, + DataPath: ".", + NSQLookupdTCPAddresses: make([]string, 0), AuthHTTPAddresses: make([]string, 0),