From 25052020aa91135c5bf1ad80e71e28d1b4642d15 Mon Sep 17 00:00:00 2001 From: Matt Reiferson Date: Sun, 4 Oct 2015 09:31:39 -0700 Subject: [PATCH] nsqd: flock --data-path for unix-like platforms --- internal/dirlock/dirlock.go | 38 +++++++++++++++++++++++++++++ internal/dirlock/dirlock_windows.go | 21 ++++++++++++++++ nsqd/nsqd.go | 17 +++++++++++++ 3 files changed, 76 insertions(+) create mode 100644 internal/dirlock/dirlock.go create mode 100644 internal/dirlock/dirlock_windows.go diff --git a/internal/dirlock/dirlock.go b/internal/dirlock/dirlock.go new file mode 100644 index 000000000..a731d7c39 --- /dev/null +++ b/internal/dirlock/dirlock.go @@ -0,0 +1,38 @@ +// +build !windows + +package dirlock + +import ( + "fmt" + "os" + "syscall" +) + +type DirLock struct { + dir string + f *os.File +} + +func New(dir string) *DirLock { + return &DirLock{ + dir: dir, + } +} + +func (l *DirLock) Lock() error { + f, err := os.Open(l.dir) + if err != nil { + return err + } + l.f = f + err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB) + if err != nil { + return fmt.Errorf("ERROR: cannot flock directory %s - %s", l.dir, err) + } + return nil +} + +func (l *DirLock) Unlock() error { + defer l.f.Close() + return syscall.Flock(int(l.f.Fd()), syscall.LOCK_UN) +} diff --git a/internal/dirlock/dirlock_windows.go b/internal/dirlock/dirlock_windows.go new file mode 100644 index 000000000..66ca7916e --- /dev/null +++ b/internal/dirlock/dirlock_windows.go @@ -0,0 +1,21 @@ +// +build windows + +package dirlock + +type DirLock struct { + dir string +} + +func New(dir string) *DirLock { + return &DirLock{ + dir: dir, + } +} + +func (l *DirLock) Lock() error { + return nil +} + +func (l *DirLock) Unlock() error { + return nil +} diff --git a/nsqd/nsqd.go b/nsqd/nsqd.go index ede97ca7e..cad766772 100644 --- a/nsqd/nsqd.go +++ b/nsqd/nsqd.go @@ -19,6 +19,7 @@ import ( "github.com/bitly/go-simplejson" "github.com/nsqio/nsq/internal/clusterinfo" + "github.com/nsqio/nsq/internal/dirlock" "github.com/nsqio/nsq/internal/http_api" "github.com/nsqio/nsq/internal/protocol" "github.com/nsqio/nsq/internal/statsd" @@ -45,6 +46,7 @@ type NSQD struct { opts atomic.Value + dl *dirlock.DirLock flag int32 errMtx sync.RWMutex err error @@ -71,6 +73,12 @@ type NSQD struct { } func New(opts *Options) *NSQD { + dataPath := opts.DataPath + if opts.DataPath == "" { + cwd, _ := os.Getwd() + dataPath = cwd + } + n := &NSQD{ flag: flagHealthy, startTime: time.Now(), @@ -80,9 +88,16 @@ func New(opts *Options) *NSQD { notifyChan: make(chan interface{}), optsNotificationChan: make(chan struct{}, 1), ci: clusterinfo.New(opts.Logger, http_api.NewClient(nil)), + dl: dirlock.New(dataPath), } n.swapOpts(opts) + err := n.dl.Lock() + if err != nil { + n.logf("FATAL: --data-path=%s in use", dataPath) + os.Exit(1) + } + if opts.MaxDeflateLevel < 1 || opts.MaxDeflateLevel > 9 { n.logf("FATAL: --max-deflate-level must be [1,9]") os.Exit(1) @@ -435,6 +450,8 @@ func (n *NSQD) Exit() { // could potentially starve items in process and deadlock) close(n.exitChan) n.waitGroup.Wait() + + n.dl.Unlock() } // GetTopic performs a thread safe operation