Skip to content

Commit

Permalink
Avoid t.Log data races flagged up by the race detector
Browse files Browse the repository at this point in the history
This commit picks nsqio#1190 and backports
it to this version of NSQ to avoid data races in t.Log flagged up by the
race detector
  • Loading branch information
suhailpatel committed Oct 6, 2021
1 parent 63d06df commit 1449a74
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 5 deletions.
13 changes: 12 additions & 1 deletion internal/protocol/tcp_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"net"
"runtime"
"strings"
"sync"

"github.com/nsqio/nsq/internal/lg"
)
Expand All @@ -15,6 +16,8 @@ type TCPHandler interface {
func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) {
logf(lg.INFO, "TCP: listening on %s", listener.Addr())

var wg sync.WaitGroup

for {
clientConn, err := listener.Accept()
if err != nil {
Expand All @@ -29,8 +32,16 @@ func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) {
}
break
}
go handler.Handle(clientConn)

wg.Add(1)
go func() {
handler.Handle(clientConn)
wg.Done()
}()
}

// wait to return until all handler goroutines complete
wg.Wait()

logf(lg.INFO, "TCP: closing %s", listener.Addr())
}
2 changes: 2 additions & 0 deletions nsqlookupd/lookup_protocol_v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ func testIOLoopReturnsClientErr(t *testing.T, fakeConn test.FakeNetConn) {

prot := &LookupProtocolV1{ctx: &Context{nsqlookupd: New(opts)}}

prot.ctx.nsqlookupd.tcpServer = &tcpServer{ctx: prot.ctx}

errChan := make(chan error)
testIOLoop := func() {
errChan <- prot.IOLoop(fakeConn)
Expand Down
9 changes: 7 additions & 2 deletions nsqlookupd/nsqlookupd.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type NSQLookupd struct {
opts *Options
tcpListener net.Listener
httpListener net.Listener
tcpServer *tcpServer
waitGroup util.WaitGroupWrapper
DB *RegistrationDB
}
Expand Down Expand Up @@ -53,9 +54,9 @@ func (l *NSQLookupd) Main() {
l.Lock()
l.tcpListener = tcpListener
l.Unlock()
tcpServer := &tcpServer{ctx: ctx}
l.tcpServer = &tcpServer{ctx: ctx}
l.waitGroup.Wrap(func() {
protocol.TCPServer(tcpListener, tcpServer, l.logf)
protocol.TCPServer(tcpListener, l.tcpServer, l.logf)
})

httpListener, err := net.Listen("tcp", l.opts.HTTPAddress)
Expand Down Expand Up @@ -89,6 +90,10 @@ func (l *NSQLookupd) Exit() {
l.tcpListener.Close()
}

if l.tcpServer != nil {
l.tcpServer.CloseAll()
}

if l.httpListener != nil {
l.httpListener.Close()
}
Expand Down
16 changes: 14 additions & 2 deletions nsqlookupd/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ package nsqlookupd
import (
"io"
"net"
"sync"

"github.com/nsqio/nsq/internal/protocol"
)

type tcpServer struct {
ctx *Context
ctx *Context
conns sync.Map
}

func (p *tcpServer) Handle(clientConn net.Conn) {
Expand Down Expand Up @@ -41,9 +43,19 @@ func (p *tcpServer) Handle(clientConn net.Conn) {
return
}

p.conns.Store(clientConn.RemoteAddr(), clientConn)

err = prot.IOLoop(clientConn)
if err != nil {
p.ctx.nsqlookupd.logf(LOG_ERROR, "client(%s) - %s", clientConn.RemoteAddr(), err)
return
}

p.conns.Delete(clientConn.RemoteAddr())
}

func (p *tcpServer) CloseAll() {
p.conns.Range(func(k, v interface{}) bool {
v.(net.Conn).Close()
return true
})
}

0 comments on commit 1449a74

Please sign in to comment.