From b4437f154be64f3f3b9752d04b82d1919602c8f8 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Mon, 16 Sep 2019 14:59:34 -0600 Subject: [PATCH] [FIXED] Data race between processMsg() and Stats() Resolves #520 Signed-off-by: Ivan Kozlovic --- .travis.yml | 4 ++-- nats.go | 14 ++++++-------- nats_test.go | 36 ++++++++++++++++++++++++++++++++++++ scripts/cov.sh | 8 ++++---- 4 files changed, 48 insertions(+), 14 deletions(-) diff --git a/.travis.yml b/.travis.yml index 9e73bb2dd..2594b74ea 100644 --- a/.travis.yml +++ b/.travis.yml @@ -16,8 +16,8 @@ install: before_script: - $(exit $(go fmt ./... | wc -l)) - go vet ./... -- misspell -error -locale US . +- find . -type f -name "*.go" | xargs misspell -error -locale US - staticcheck ./... script: - go test -i -race ./... -- if [[ "$TRAVIS_GO_VERSION" =~ 1.12 ]]; then ./scripts/cov.sh TRAVIS; else go test -race ./...; fi +- if [[ "$TRAVIS_GO_VERSION" =~ 1.12 ]]; then ./scripts/cov.sh TRAVIS; else go test -race -v -p=1 ./... --failfast; fi diff --git a/nats.go b/nats.go index b3a87449a..2ab010b5d 100644 --- a/nats.go +++ b/nats.go @@ -2164,8 +2164,8 @@ func (nc *Conn) processMsg(data []byte) { nc.subsMu.RLock() // Stats - nc.InMsgs++ - nc.InBytes += uint64(len(data)) + atomic.AddUint64(&nc.InMsgs, 1) + atomic.AddUint64(&nc.InBytes, uint64(len(data))) sub := nc.subs[nc.ps.ma.sid] if sub == nil { @@ -3881,18 +3881,16 @@ func (nc *Conn) isDrainingPubs() bool { // Stats will return a race safe copy of the Statistics section for the connection. func (nc *Conn) Stats() Statistics { - // Stats are updated either under connection's mu or subsMu mutexes. - // Lock both to safely get them. + // Stats are updated either under connection's mu or with atomic operations + // for inbound stats in processMsg(). nc.mu.Lock() - nc.subsMu.RLock() stats := Statistics{ - InMsgs: nc.InMsgs, - InBytes: nc.InBytes, + InMsgs: atomic.LoadUint64(&nc.InMsgs), + InBytes: atomic.LoadUint64(&nc.InBytes), OutMsgs: nc.OutMsgs, OutBytes: nc.OutBytes, Reconnects: nc.Reconnects, } - nc.subsMu.RUnlock() nc.mu.Unlock() return stats } diff --git a/nats_test.go b/nats_test.go index f62f16f2c..0eb649e30 100644 --- a/nats_test.go +++ b/nats_test.go @@ -2181,3 +2181,39 @@ func TestAuthErrorOnReconnect(t *testing.T) { t.Fatalf("Wrong status: %d\n", nc.Status()) } } + +func TestStatsRace(t *testing.T) { + o := natsserver.DefaultTestOptions + o.Port = -1 + s := RunServerWithOptions(&o) + defer s.Shutdown() + + nc, err := Connect(fmt.Sprintf("nats://%s:%d", o.Host, o.Port)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nc.Close() + + wg := sync.WaitGroup{} + wg.Add(1) + ch := make(chan bool) + go func() { + defer wg.Done() + for { + select { + case <-ch: + return + default: + nc.Stats() + } + } + }() + + nc.Subscribe("foo", func(_ *Msg) {}) + for i := 0; i < 1000; i++ { + nc.Publish("foo", []byte("hello")) + } + + close(ch) + wg.Wait() +} diff --git a/scripts/cov.sh b/scripts/cov.sh index 398b4657d..760e42f06 100755 --- a/scripts/cov.sh +++ b/scripts/cov.sh @@ -3,10 +3,10 @@ rm -rf ./cov mkdir cov -go test -v -race -covermode=atomic -coverprofile=./cov/nats.out -go test -v -race -covermode=atomic -coverprofile=./cov/test.out -coverpkg=github.com/nats-io/nats.go ./test -go test -v -race -covermode=atomic -coverprofile=./cov/builtin.out -coverpkg=github.com/nats-io/nats.go/encoders/builtin ./test -run EncBuiltin -go test -v -race -covermode=atomic -coverprofile=./cov/protobuf.out -coverpkg=github.com/nats-io/nats.go/encoders/protobuf ./test -run EncProto +go test --failfast -v -race -covermode=atomic -coverprofile=./cov/nats.out +go test --failfast -v -race -covermode=atomic -coverprofile=./cov/test.out -coverpkg=github.com/nats-io/nats.go ./test +go test --failfast -v -race -covermode=atomic -coverprofile=./cov/builtin.out -coverpkg=github.com/nats-io/nats.go/encoders/builtin ./test -run EncBuiltin +go test --failfast -v -race -covermode=atomic -coverprofile=./cov/protobuf.out -coverpkg=github.com/nats-io/nats.go/encoders/protobuf ./test -run EncProto gocovmerge ./cov/*.out > acc.out rm -rf ./cov