Skip to content

Commit

Permalink
nsqd: return error if pause state is not changed
Browse files Browse the repository at this point in the history
so the caller of pause or unpause can know if the topic or channel
was already paused or unpaused

also avoids a little bit of work if no change
  • Loading branch information
ploxiln committed Jul 14, 2018
1 parent 6ab87aa commit 31e0aab
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 9 deletions.
13 changes: 10 additions & 3 deletions nsqd/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,12 +262,19 @@ func (c *Channel) UnPause() error {
}

func (c *Channel) doPause(pause bool) error {
setv := int32(0)
if pause {
atomic.StoreInt32(&c.paused, 1)
} else {
atomic.StoreInt32(&c.paused, 0)
setv = int32(1)
}
prev := atomic.SwapInt32(&c.paused, setv)

if prev == setv {
if pause {
return ErrAlreadyPaused
} else {
return ErrAlreadyUnPaused
}
}
c.RLock()
for _, client := range c.clients {
if pause {
Expand Down
10 changes: 8 additions & 2 deletions nsqd/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,11 +381,14 @@ func (s *httpServer) doPauseTopic(w http.ResponseWriter, req *http.Request, ps h
return nil, http_api.Err{404, "TOPIC_NOT_FOUND"}
}

if strings.Contains(req.URL.Path, "unpause") {
if strings.HasSuffix(req.URL.Path, "/unpause") {
err = topic.UnPause()
} else {
err = topic.Pause()
}
if err == ErrAlreadyPaused || err == ErrAlreadyUnPaused {
return nil, http_api.Err{400, err.Error()}
}
if err != nil {
s.ctx.nsqd.logf(LOG_ERROR, "failure in %s - %s", req.URL.Path, err)
return nil, http_api.Err{500, "INTERNAL_ERROR"}
Expand Down Expand Up @@ -452,11 +455,14 @@ func (s *httpServer) doPauseChannel(w http.ResponseWriter, req *http.Request, ps
return nil, http_api.Err{404, "CHANNEL_NOT_FOUND"}
}

if strings.Contains(req.URL.Path, "unpause") {
if strings.HasSuffix(req.URL.Path, "/unpause") {
err = channel.UnPause()
} else {
err = channel.Pause()
}
if err == ErrAlreadyPaused || err == ErrAlreadyUnPaused {
return nil, http_api.Err{400, err.Error()}
}
if err != nil {
s.ctx.nsqd.logf(LOG_ERROR, "failure in %s - %s", req.URL.Path, err)
return nil, http_api.Err{500, "INTERNAL_ERROR"}
Expand Down
17 changes: 13 additions & 4 deletions nsqd/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ import (
"github.com/nsqio/nsq/internal/util"
)

var ErrAlreadyPaused = errors.New("ALREADY_PAUSED")
var ErrAlreadyUnPaused = errors.New("ALREADY_UNPAUSED")

type Topic struct {
// 64bit atomic vars need to be first for proper alignment on 32bit platforms
messageCount uint64
Expand Down Expand Up @@ -452,17 +455,23 @@ func (t *Topic) UnPause() error {
}

func (t *Topic) doPause(pause bool) error {
setv := int32(0)
if pause {
atomic.StoreInt32(&t.paused, 1)
} else {
atomic.StoreInt32(&t.paused, 0)
setv = int32(1)
}
prev := atomic.SwapInt32(&t.paused, setv)

if prev == setv {
if pause {
return ErrAlreadyPaused
} else {
return ErrAlreadyUnPaused
}
}
select {
case t.pauseChan <- 1:
case <-t.exitChan:
}

return nil
}

Expand Down

0 comments on commit 31e0aab

Please sign in to comment.