Skip to content

Commit

Permalink
server,netutil: clean up the connManager logic
Browse files Browse the repository at this point in the history
Prior to this patch, we were using the same netutil.Server object to
manage both `net.Conn` created to serve HTTP connections, and
`net.Conn` created to serve SQL connections.

This was confusing, because a lot of the complexity specific for HTTP
connections (integration with the HTTP2 query handling, etc) is not
required for raw TCP connections as used by pgwire.

This commit clarifies this by separating the two roles.

This also cleans up the server initialization.

Release note: None
  • Loading branch information
knz committed Nov 14, 2022
1 parent 281aff2 commit 788c613
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 76 deletions.
16 changes: 1 addition & 15 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ package server
import (
"context"
"fmt"
"net/http"
"os"
"path/filepath"
"reflect"
Expand Down Expand Up @@ -88,7 +87,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/netutil"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/schedulerlatency"
"github.com/cockroachdb/cockroach/pkg/util/stop"
Expand Down Expand Up @@ -1121,20 +1119,10 @@ func (s *Server) PreStart(ctx context.Context) error {
return err
}

// connManager tracks incoming connections accepted via listeners
// and automatically closes them when the stopper indicates a
// shutdown.
// This handles both:
// - HTTP connections for the admin UI with an optional TLS handshake over HTTP.
// - SQL client connections with a TLS handshake over TCP.
// (gRPC connections are handled separately via s.grpc and perform
// their TLS handshake on their own)
connManager := netutil.MakeServer(workersCtx, s.stopper, uiTLSConfig, http.HandlerFunc(s.http.baseHandler))

// Start the admin UI server. This opens the HTTP listen socket,
// optionally sets up TLS, and dispatches the server worker for the
// web UI.
if err := s.http.start(ctx, workersCtx, connManager, uiTLSConfig, s.stopper); err != nil {
if err := s.http.start(ctx, workersCtx, uiTLSConfig, s.stopper); err != nil {
return err
}

Expand Down Expand Up @@ -1718,7 +1706,6 @@ func (s *Server) PreStart(ctx context.Context) error {
workersCtx,
s.stopper,
s.cfg.TestingKnobs,
connManager,
pgL,
orphanedLeasesTimeThresholdNanos,
); err != nil {
Expand Down Expand Up @@ -1794,7 +1781,6 @@ func (s *Server) AcceptClients(ctx context.Context) error {
if err := s.sqlServer.startServeSQL(
workersCtx,
s.stopper,
s.sqlServer.connManager,
s.sqlServer.pgL,
&s.cfg.SocketFile,
); err != nil {
Expand Down
11 changes: 6 additions & 5 deletions pkg/server/server_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,10 +200,7 @@ func makeAdminAuthzCheckHandler(
// start starts the network listener for the HTTP interface
// and also starts accepting incoming HTTP connections.
func (s *httpServer) start(
ctx, workersCtx context.Context,
connManager netutil.Server,
uiTLSConfig *tls.Config,
stopper *stop.Stopper,
ctx, workersCtx context.Context, uiTLSConfig *tls.Config, stopper *stop.Stopper,
) error {
httpLn, err := ListenAndUpdateAddrs(ctx, &s.cfg.HTTPAddr, &s.cfg.HTTPAdvertiseAddr, "http")
if err != nil {
Expand Down Expand Up @@ -254,7 +251,7 @@ func (s *httpServer) start(
})
mux.Handle(healthPath, http.HandlerFunc(s.baseHandler))

plainRedirectServer := netutil.MakeServer(workersCtx, stopper, uiTLSConfig, mux)
plainRedirectServer := netutil.MakeHTTPServer(workersCtx, stopper, nil /* tlsConfig */, mux)

netutil.FatalIfUnexpected(plainRedirectServer.Serve(clearL))
}); err != nil {
Expand All @@ -264,6 +261,10 @@ func (s *httpServer) start(
httpLn = tls.NewListener(tlsL, uiTLSConfig)
}

// The connManager is responsible for tearing down the net.Conn
// objects when the stopper tells us to shut down.
connManager := netutil.MakeHTTPServer(workersCtx, stopper, uiTLSConfig, http.HandlerFunc(s.baseHandler))

// Serve the HTTP endpoint. This will be the original httpLn
// listening on --http-addr without TLS if uiTLSConfig was
// nil, or overridden above if uiTLSConfig was not nil to come from
Expand Down
19 changes: 7 additions & 12 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,6 @@ type SQLServer struct {

// pgL is the shared RPC/SQL listener, opened when RPC was initialized.
pgL net.Listener
// connManager is the connection manager to use to set up additional
// SQL listeners in AcceptClients().
connManager netutil.Server

// isReady is the health status of the node. When true, the node is healthy;
// load balancers and connection management tools treat the node as "ready".
Expand Down Expand Up @@ -1293,7 +1290,6 @@ func (s *SQLServer) preStart(
ctx context.Context,
stopper *stop.Stopper,
knobs base.TestingKnobs,
connManager netutil.Server,
pgL net.Listener,
orphanedLeasesTimeThresholdNanos int64,
) error {
Expand Down Expand Up @@ -1359,7 +1355,6 @@ func (s *SQLServer) preStart(
}
}

s.connManager = connManager
s.pgL = pgL
s.execCfg.GCJobNotifier.Start(ctx)
s.temporaryObjectCleaner.Start(ctx, stopper)
Expand Down Expand Up @@ -1527,22 +1522,22 @@ func (s *SQLServer) AnnotateCtx(ctx context.Context) context.Context {
// startServeSQL starts accepting incoming SQL connections over TCP.
// It also starts listening on the Unix socket, if that was configured.
func (s *SQLServer) startServeSQL(
ctx context.Context,
stopper *stop.Stopper,
connManager netutil.Server,
pgL net.Listener,
socketFileCfg *string,
ctx context.Context, stopper *stop.Stopper, pgL net.Listener, socketFileCfg *string,
) error {
log.Ops.Info(ctx, "serving sql connections")
// Start servicing SQL connections.

pgCtx := s.pgServer.AmbientCtx.AnnotateCtx(context.Background())
tcpKeepAlive := makeTCPKeepAliveManager()

// The connManager is responsible for tearing down the net.Conn
// objects when the stopper tells us to shut down.
connManager := netutil.MakeTCPServer(ctx, stopper)

_ = stopper.RunAsyncTaskEx(pgCtx,
stop.TaskOpts{TaskName: "pgwire-listener", SpanOpt: stop.SterileRootSpan},
func(ctx context.Context) {
err := connManager.ServeWith(ctx, stopper, pgL, func(ctx context.Context, conn net.Conn) {
err := connManager.ServeWith(ctx, pgL, func(ctx context.Context, conn net.Conn) {
connCtx := s.pgServer.AnnotateCtxForIncomingConn(ctx, conn)
tcpKeepAlive.configure(connCtx, conn)

Expand Down Expand Up @@ -1594,7 +1589,7 @@ func (s *SQLServer) startServeSQL(
if err := stopper.RunAsyncTaskEx(pgCtx,
stop.TaskOpts{TaskName: "unix-listener", SpanOpt: stop.SterileRootSpan},
func(ctx context.Context) {
err := connManager.ServeWith(ctx, stopper, unixLn, func(ctx context.Context, conn net.Conn) {
err := connManager.ServeWith(ctx, unixLn, func(ctx context.Context, conn net.Conn) {
connCtx := s.pgServer.AnnotateCtxForIncomingConn(ctx, conn)
if err := s.pgServer.ServeConn(connCtx, conn, pgwire.SocketUnix); err != nil {
log.Ops.Errorf(connCtx, "%v", err)
Expand Down
16 changes: 1 addition & 15 deletions pkg/server/tenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ package server
import (
"context"
"fmt"
"net/http"
"os"
"path/filepath"
"strconv"
Expand Down Expand Up @@ -57,7 +56,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/netutil"
"github.com/cockroachdb/cockroach/pkg/util/schedulerlatency"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
Expand Down Expand Up @@ -333,20 +331,10 @@ func (s *SQLServerWrapper) PreStart(ctx context.Context) error {
return err
}

// connManager tracks incoming connections accepted via listeners
// and automatically closes them when the stopper indicates a
// shutdown.
// This handles both:
// - HTTP connections for the admin UI with an optional TLS handshake over HTTP.
// - SQL client connections with a TLS handshake over TCP.
// (gRPC connections are handled separately via s.grpc and perform
// their TLS handshake on their own)
connManager := netutil.MakeServer(workersCtx, s.stopper, uiTLSConfig, http.HandlerFunc(s.http.baseHandler))

// Start the admin UI server. This opens the HTTP listen socket,
// optionally sets up TLS, and dispatches the server worker for the
// web UI.
if err := s.http.start(ctx, workersCtx, connManager, uiTLSConfig, s.stopper); err != nil {
if err := s.http.start(ctx, workersCtx, uiTLSConfig, s.stopper); err != nil {
return err
}

Expand Down Expand Up @@ -585,7 +573,6 @@ func (s *SQLServerWrapper) PreStart(ctx context.Context) error {
workersCtx,
s.stopper,
s.sqlServer.cfg.TestingKnobs,
connManager,
pgL,
orphanedLeasesTimeThresholdNanos,
); err != nil {
Expand Down Expand Up @@ -654,7 +641,6 @@ func (s *SQLServerWrapper) AcceptClients(ctx context.Context) error {
if err := s.sqlServer.startServeSQL(
workersCtx,
s.stopper,
s.sqlServer.connManager,
s.sqlServer.pgL,
&s.sqlServer.cfg.SocketFile,
); err != nil {
Expand Down
84 changes: 55 additions & 29 deletions pkg/util/netutil/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,42 +62,31 @@ func ListenAndServeGRPC(

var httpLogger = log.NewStdLogger(severity.ERROR, "net/http")

// Server is a thin wrapper around http.Server. See MakeServer for more detail.
type Server struct {
// HTTPServer is a thin wrapper around http.Server. See MakeHTTPServer for more detail.
type HTTPServer struct {
*http.Server
}

// MakeServer constructs a Server that tracks active connections,
// MakeHTTPServer constructs a http.Server that tracks active connections,
// closing them when signaled by stopper.
//
// It can serve two different purposes simultaneously:
//
// - to serve as actual HTTP server, using the .Serve(net.Listener) method.
// - to serve as plain TCP server, using the .ServeWith(...) method.
//
// The latter is used e.g. to accept SQL client connections.
//
// When the HTTP facility is not used, the Go HTTP server object is
// still used internally to maintain/register the connections via the
// ConnState() method, for convenience.
func MakeServer(
func MakeHTTPServer(
ctx context.Context, stopper *stop.Stopper, tlsConfig *tls.Config, handler http.Handler,
) Server {
) HTTPServer {
var mu syncutil.Mutex
activeConns := make(map[net.Conn]struct{})
server := Server{
server := HTTPServer{
Server: &http.Server{
Handler: handler,
TLSConfig: tlsConfig,
ConnState: func(conn net.Conn, state http.ConnState) {
mu.Lock()
defer mu.Unlock()
switch state {
case http.StateNew:
activeConns[conn] = struct{}{}
case http.StateClosed:
delete(activeConns, conn)
}
mu.Unlock()
},
ErrorLog: httpLogger,
},
Expand All @@ -113,10 +102,10 @@ func MakeServer(
<-stopper.ShouldQuiesce()

mu.Lock()
defer mu.Unlock()
for conn := range activeConns {
conn.Close()
}
mu.Unlock()
}
if err := stopper.RunAsyncTask(ctx, "http2-wait-quiesce", waitQuiesce); err != nil {
waitQuiesce(ctx)
Expand All @@ -125,12 +114,51 @@ func MakeServer(
return server
}

// MakeTCPServer constructs a connection server that tracks active connections,
// closing them when signaled by stopper.
func MakeTCPServer(ctx context.Context, stopper *stop.Stopper) *TCPServer {
server := &TCPServer{
stopper: stopper,
activeConns: make(map[net.Conn]struct{}),
}

waitQuiesce := func(context.Context) {
<-stopper.ShouldQuiesce()

server.mu.Lock()
defer server.mu.Unlock()
for conn := range server.activeConns {
conn.Close()
}
}
if err := stopper.RunAsyncTask(ctx, "tcp-wait-quiesce", waitQuiesce); err != nil {
waitQuiesce(ctx)
}
return server
}

// TCPServer is wrapper around a map of active connections.
type TCPServer struct {
mu syncutil.Mutex
stopper *stop.Stopper
activeConns map[net.Conn]struct{}
}

func (s *TCPServer) addConn(n net.Conn) {
s.mu.Lock()
defer s.mu.Unlock()
s.activeConns[n] = struct{}{}
}

func (s *TCPServer) rmConn(n net.Conn) {
s.mu.Lock()
defer s.mu.Unlock()
delete(s.activeConns, n)
}

// ServeWith accepts connections on ln and serves them using serveConn.
func (s *Server) ServeWith(
ctx context.Context,
stopper *stop.Stopper,
l net.Listener,
serveConn func(context.Context, net.Conn),
func (s *TCPServer) ServeWith(
ctx context.Context, l net.Listener, serveConn func(context.Context, net.Conn),
) error {
// Inspired by net/http.(*Server).Serve
var tempDelay time.Duration // how long to sleep on accept failure
Expand All @@ -154,11 +182,9 @@ func (s *Server) ServeWith(
return e
}
tempDelay = 0
err := stopper.RunAsyncTask(ctx, "pgwire-serve", func(ctx context.Context) {
// NB: ConnState is used to manage the list of active connections that
// need draining; see MakeServer().
s.Server.ConnState(rw, http.StateNew) // before Serve can return
defer s.Server.ConnState(rw, http.StateClosed)
err := s.stopper.RunAsyncTask(ctx, "tcp-serve", func(ctx context.Context) {
s.addConn(rw)
defer s.rmConn(rw)
serveConn(ctx, rw)
})
if err != nil {
Expand Down

0 comments on commit 788c613

Please sign in to comment.