Skip to content

Commit

Permalink
nsqlookupd: synchronize goroutines to avoid t.Log races
Browse files Browse the repository at this point in the history
  • Loading branch information
benjsto committed Sep 19, 2019
1 parent 9faeb4a commit 1837898
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 10 deletions.
13 changes: 12 additions & 1 deletion internal/protocol/tcp_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"net"
"runtime"
"strings"
"sync"

"github.com/nsqio/nsq/internal/lg"
)
Expand All @@ -16,6 +17,8 @@ type TCPHandler interface {
func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) error {
logf(lg.INFO, "TCP: listening on %s", listener.Addr())

var wg sync.WaitGroup

for {
clientConn, err := listener.Accept()
if err != nil {
Expand All @@ -30,9 +33,17 @@ func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) er
}
break
}
go handler.Handle(clientConn)

wg.Add(1)
go func() {
handler.Handle(clientConn)
wg.Done()
}()
}

// wait to return until all handler goroutines complete
wg.Wait()

logf(lg.INFO, "TCP: closing %s", listener.Addr())

return nil
Expand Down
28 changes: 25 additions & 3 deletions nsqlookupd/lookup_protocol_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ import (
)

type LookupProtocolV1 struct {
ctx *Context
ctx *Context
exitChan chan struct{}
}

func (p *LookupProtocolV1) IOLoop(conn net.Conn) error {
Expand All @@ -27,9 +28,30 @@ func (p *LookupProtocolV1) IOLoop(conn net.Conn) error {

client := NewClientV1(conn)
reader := bufio.NewReader(client)
readChan := make(chan string)
errChan := make(chan error)

for {
line, err = reader.ReadString('\n')
if err != nil {
exitLoop := false

// do this read in a goroutine so we can exit this loop if needed
go func() {
line, e := reader.ReadString('\n')
readChan <- line
errChan <- e
}()

// now wait until we either get an exit signal or a readline completes
select {
case <-p.exitChan:
exitLoop = true
break
case line = <-readChan:
err = <-errChan
break
}

if exitLoop || err != nil {
break
}

Expand Down
2 changes: 1 addition & 1 deletion nsqlookupd/lookup_protocol_v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func testIOLoopReturnsClientErr(t *testing.T, fakeConn test.FakeNetConn) {

nsqlookupd, err := New(opts)
test.Nil(t, err)
prot := &LookupProtocolV1{ctx: &Context{nsqlookupd: nsqlookupd}}
prot := &LookupProtocolV1{ctx: &Context{nsqlookupd: nsqlookupd}, exitChan: make(chan struct{})}

errChan := make(chan error)
testIOLoop := func() {
Expand Down
9 changes: 6 additions & 3 deletions nsqlookupd/nsqlookupd.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type NSQLookupd struct {
httpListener net.Listener
waitGroup util.WaitGroupWrapper
DB *RegistrationDB
exitChan chan struct{}
}

func New(opts *Options) (*NSQLookupd, error) {
Expand All @@ -29,8 +30,9 @@ func New(opts *Options) (*NSQLookupd, error) {
opts.Logger = log.New(os.Stderr, opts.LogPrefix, log.Ldate|log.Ltime|log.Lmicroseconds)
}
l := &NSQLookupd{
opts: opts,
DB: NewRegistrationDB(),
opts: opts,
DB: NewRegistrationDB(),
exitChan: make(chan struct{}),
}

l.logf(LOG_INFO, version.String("nsqlookupd"))
Expand Down Expand Up @@ -63,7 +65,7 @@ func (l *NSQLookupd) Main() error {
})
}

tcpServer := &tcpServer{ctx: ctx}
tcpServer := &tcpServer{ctx: ctx, exitChan: l.exitChan}
l.waitGroup.Wrap(func() {
exitFunc(protocol.TCPServer(l.tcpListener, tcpServer, l.logf))
})
Expand All @@ -85,6 +87,7 @@ func (l *NSQLookupd) RealHTTPAddr() *net.TCPAddr {
}

func (l *NSQLookupd) Exit() {
close(l.exitChan)
if l.tcpListener != nil {
l.tcpListener.Close()
}
Expand Down
5 changes: 3 additions & 2 deletions nsqlookupd/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ import (
)

type tcpServer struct {
ctx *Context
ctx *Context
exitChan chan struct{}
}

func (p *tcpServer) Handle(clientConn net.Conn) {
Expand All @@ -32,7 +33,7 @@ func (p *tcpServer) Handle(clientConn net.Conn) {
var prot protocol.Protocol
switch protocolMagic {
case " V1":
prot = &LookupProtocolV1{ctx: p.ctx}
prot = &LookupProtocolV1{ctx: p.ctx, exitChan: p.exitChan}
default:
protocol.SendResponse(clientConn, []byte("E_BAD_PROTOCOL"))
clientConn.Close()
Expand Down

0 comments on commit 1837898

Please sign in to comment.