Skip to content

Commit

Permalink
nsqlookupd: close all connections on exit to avoid t.Log races
Browse files Browse the repository at this point in the history
  • Loading branch information
benjsto committed Sep 19, 2019
1 parent 9faeb4a commit 61de19e
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 5 deletions.
13 changes: 12 additions & 1 deletion internal/protocol/tcp_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"net"
"runtime"
"strings"
"sync"

"github.com/nsqio/nsq/internal/lg"
)
Expand All @@ -16,6 +17,8 @@ type TCPHandler interface {
func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) error {
logf(lg.INFO, "TCP: listening on %s", listener.Addr())

var wg sync.WaitGroup

for {
clientConn, err := listener.Accept()
if err != nil {
Expand All @@ -30,9 +33,17 @@ func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) er
}
break
}
go handler.Handle(clientConn)

wg.Add(1)
go func() {
handler.Handle(clientConn)
wg.Done()
}()
}

// wait to return until all handler goroutines complete
wg.Wait()

logf(lg.INFO, "TCP: closing %s", listener.Addr())

return nil
Expand Down
2 changes: 2 additions & 0 deletions nsqlookupd/lookup_protocol_v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ func testIOLoopReturnsClientErr(t *testing.T, fakeConn test.FakeNetConn) {
test.Nil(t, err)
prot := &LookupProtocolV1{ctx: &Context{nsqlookupd: nsqlookupd}}

nsqlookupd.tcpServer = &tcpServer{ctx: prot.ctx}

errChan := make(chan error)
testIOLoop := func() {
errChan <- prot.IOLoop(fakeConn)
Expand Down
9 changes: 7 additions & 2 deletions nsqlookupd/nsqlookupd.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type NSQLookupd struct {
opts *Options
tcpListener net.Listener
httpListener net.Listener
tcpServer *tcpServer
waitGroup util.WaitGroupWrapper
DB *RegistrationDB
}
Expand Down Expand Up @@ -63,9 +64,9 @@ func (l *NSQLookupd) Main() error {
})
}

tcpServer := &tcpServer{ctx: ctx}
l.tcpServer = &tcpServer{ctx: ctx}
l.waitGroup.Wrap(func() {
exitFunc(protocol.TCPServer(l.tcpListener, tcpServer, l.logf))
exitFunc(protocol.TCPServer(l.tcpListener, l.tcpServer, l.logf))
})
httpServer := newHTTPServer(ctx)
l.waitGroup.Wrap(func() {
Expand All @@ -89,6 +90,10 @@ func (l *NSQLookupd) Exit() {
l.tcpListener.Close()
}

if l.tcpServer != nil {
l.tcpServer.CloseAll()
}

if l.httpListener != nil {
l.httpListener.Close()
}
Expand Down
16 changes: 14 additions & 2 deletions nsqlookupd/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ package nsqlookupd
import (
"io"
"net"
"sync"

"github.com/nsqio/nsq/internal/protocol"
)

type tcpServer struct {
ctx *Context
ctx *Context
conns sync.Map
}

func (p *tcpServer) Handle(clientConn net.Conn) {
Expand Down Expand Up @@ -41,9 +43,19 @@ func (p *tcpServer) Handle(clientConn net.Conn) {
return
}

p.conns.Store(clientConn.RemoteAddr(), clientConn)

err = prot.IOLoop(clientConn)
if err != nil {
p.ctx.nsqlookupd.logf(LOG_ERROR, "client(%s) - %s", clientConn.RemoteAddr(), err)
return
}

p.conns.Delete(clientConn.RemoteAddr())
}

func (p *tcpServer) CloseAll() {
p.conns.Range(func(k, v interface{}) bool {
v.(net.Conn).Close()
return true
})
}

0 comments on commit 61de19e

Please sign in to comment.