From 788c613f014e477b971ed1770ca1b61265abfbd6 Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Fri, 11 Nov 2022 14:40:13 +0100 Subject: [PATCH] server,netutil: clean up the connManager logic Prior to this patch, we were using the same netutil.Server object to manage both `net.Conn` created to serve HTTP connections, and `net.Conn` created to serve SQL connections. This was confusing, because a lot of the complexity specific for HTTP connections (integration with the HTTP2 query handling, etc) is not required for raw TCP connections as used by pgwire. This commit clarifies this by separating the two roles. This also cleans up the server initialization. Release note: None --- pkg/server/server.go | 16 +------- pkg/server/server_http.go | 11 ++--- pkg/server/server_sql.go | 19 ++++----- pkg/server/tenant.go | 16 +------- pkg/util/netutil/net.go | 84 +++++++++++++++++++++++++-------------- 5 files changed, 70 insertions(+), 76 deletions(-) diff --git a/pkg/server/server.go b/pkg/server/server.go index 999d6beddf95..10151cececbd 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -13,7 +13,6 @@ package server import ( "context" "fmt" - "net/http" "os" "path/filepath" "reflect" @@ -88,7 +87,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/mon" - "github.com/cockroachdb/cockroach/pkg/util/netutil" "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/schedulerlatency" "github.com/cockroachdb/cockroach/pkg/util/stop" @@ -1121,20 +1119,10 @@ func (s *Server) PreStart(ctx context.Context) error { return err } - // connManager tracks incoming connections accepted via listeners - // and automatically closes them when the stopper indicates a - // shutdown. - // This handles both: - // - HTTP connections for the admin UI with an optional TLS handshake over HTTP. 
- // - SQL client connections with a TLS handshake over TCP. - // (gRPC connections are handled separately via s.grpc and perform - // their TLS handshake on their own) - connManager := netutil.MakeServer(workersCtx, s.stopper, uiTLSConfig, http.HandlerFunc(s.http.baseHandler)) - // Start the admin UI server. This opens the HTTP listen socket, // optionally sets up TLS, and dispatches the server worker for the // web UI. - if err := s.http.start(ctx, workersCtx, connManager, uiTLSConfig, s.stopper); err != nil { + if err := s.http.start(ctx, workersCtx, uiTLSConfig, s.stopper); err != nil { return err } @@ -1718,7 +1706,6 @@ func (s *Server) PreStart(ctx context.Context) error { workersCtx, s.stopper, s.cfg.TestingKnobs, - connManager, pgL, orphanedLeasesTimeThresholdNanos, ); err != nil { @@ -1794,7 +1781,6 @@ func (s *Server) AcceptClients(ctx context.Context) error { if err := s.sqlServer.startServeSQL( workersCtx, s.stopper, - s.sqlServer.connManager, s.sqlServer.pgL, &s.cfg.SocketFile, ); err != nil { diff --git a/pkg/server/server_http.go b/pkg/server/server_http.go index 1faba919e336..7fdc8ba024ce 100644 --- a/pkg/server/server_http.go +++ b/pkg/server/server_http.go @@ -200,10 +200,7 @@ func makeAdminAuthzCheckHandler( // start starts the network listener for the HTTP interface // and also starts accepting incoming HTTP connections. 
func (s *httpServer) start( - ctx, workersCtx context.Context, - connManager netutil.Server, - uiTLSConfig *tls.Config, - stopper *stop.Stopper, + ctx, workersCtx context.Context, uiTLSConfig *tls.Config, stopper *stop.Stopper, ) error { httpLn, err := ListenAndUpdateAddrs(ctx, &s.cfg.HTTPAddr, &s.cfg.HTTPAdvertiseAddr, "http") if err != nil { @@ -254,7 +251,7 @@ func (s *httpServer) start( }) mux.Handle(healthPath, http.HandlerFunc(s.baseHandler)) - plainRedirectServer := netutil.MakeServer(workersCtx, stopper, uiTLSConfig, mux) + plainRedirectServer := netutil.MakeHTTPServer(workersCtx, stopper, nil /* tlsConfig */, mux) netutil.FatalIfUnexpected(plainRedirectServer.Serve(clearL)) }); err != nil { @@ -264,6 +261,10 @@ func (s *httpServer) start( httpLn = tls.NewListener(tlsL, uiTLSConfig) } + // The connManager is responsible for tearing down the net.Conn + // objects when the stopper tells us to shut down. + connManager := netutil.MakeHTTPServer(workersCtx, stopper, uiTLSConfig, http.HandlerFunc(s.baseHandler)) + // Serve the HTTP endpoint. This will be the original httpLn // listening on --http-addr without TLS if uiTLSConfig was // nil, or overridden above if uiTLSConfig was not nil to come from diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index 7aa89c6cf830..5c28204eae56 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -180,9 +180,6 @@ type SQLServer struct { // pgL is the shared RPC/SQL listener, opened when RPC was initialized. pgL net.Listener - // connManager is the connection manager to use to set up additional - // SQL listeners in AcceptClients(). - connManager netutil.Server // isReady is the health status of the node. When true, the node is healthy; // load balancers and connection management tools treat the node as "ready". 
@@ -1293,7 +1290,6 @@ func (s *SQLServer) preStart( ctx context.Context, stopper *stop.Stopper, knobs base.TestingKnobs, - connManager netutil.Server, pgL net.Listener, orphanedLeasesTimeThresholdNanos int64, ) error { @@ -1359,7 +1355,6 @@ func (s *SQLServer) preStart( } } - s.connManager = connManager s.pgL = pgL s.execCfg.GCJobNotifier.Start(ctx) s.temporaryObjectCleaner.Start(ctx, stopper) @@ -1527,11 +1522,7 @@ func (s *SQLServer) AnnotateCtx(ctx context.Context) context.Context { // startServeSQL starts accepting incoming SQL connections over TCP. // It also starts listening on the Unix socket, if that was configured. func (s *SQLServer) startServeSQL( - ctx context.Context, - stopper *stop.Stopper, - connManager netutil.Server, - pgL net.Listener, - socketFileCfg *string, + ctx context.Context, stopper *stop.Stopper, pgL net.Listener, socketFileCfg *string, ) error { log.Ops.Info(ctx, "serving sql connections") // Start servicing SQL connections. @@ -1539,10 +1530,14 @@ func (s *SQLServer) startServeSQL( pgCtx := s.pgServer.AmbientCtx.AnnotateCtx(context.Background()) tcpKeepAlive := makeTCPKeepAliveManager() + // The connManager is responsible for tearing down the net.Conn + // objects when the stopper tells us to shut down. 
+ connManager := netutil.MakeTCPServer(ctx, stopper) + _ = stopper.RunAsyncTaskEx(pgCtx, stop.TaskOpts{TaskName: "pgwire-listener", SpanOpt: stop.SterileRootSpan}, func(ctx context.Context) { - err := connManager.ServeWith(ctx, stopper, pgL, func(ctx context.Context, conn net.Conn) { + err := connManager.ServeWith(ctx, pgL, func(ctx context.Context, conn net.Conn) { connCtx := s.pgServer.AnnotateCtxForIncomingConn(ctx, conn) tcpKeepAlive.configure(connCtx, conn) @@ -1594,7 +1589,7 @@ func (s *SQLServer) startServeSQL( if err := stopper.RunAsyncTaskEx(pgCtx, stop.TaskOpts{TaskName: "unix-listener", SpanOpt: stop.SterileRootSpan}, func(ctx context.Context) { - err := connManager.ServeWith(ctx, stopper, unixLn, func(ctx context.Context, conn net.Conn) { + err := connManager.ServeWith(ctx, unixLn, func(ctx context.Context, conn net.Conn) { connCtx := s.pgServer.AnnotateCtxForIncomingConn(ctx, conn) if err := s.pgServer.ServeConn(connCtx, conn, pgwire.SocketUnix); err != nil { log.Ops.Errorf(connCtx, "%v", err) diff --git a/pkg/server/tenant.go b/pkg/server/tenant.go index 6dacbbd1c0df..a183bc720406 100644 --- a/pkg/server/tenant.go +++ b/pkg/server/tenant.go @@ -13,7 +13,6 @@ package server import ( "context" "fmt" - "net/http" "os" "path/filepath" "strconv" @@ -57,7 +56,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metric" - "github.com/cockroachdb/cockroach/pkg/util/netutil" "github.com/cockroachdb/cockroach/pkg/util/schedulerlatency" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -333,20 +331,10 @@ func (s *SQLServerWrapper) PreStart(ctx context.Context) error { return err } - // connManager tracks incoming connections accepted via listeners - // and automatically closes them when the stopper indicates a - // shutdown. 
- // This handles both: - // - HTTP connections for the admin UI with an optional TLS handshake over HTTP. - // - SQL client connections with a TLS handshake over TCP. - // (gRPC connections are handled separately via s.grpc and perform - // their TLS handshake on their own) - connManager := netutil.MakeServer(workersCtx, s.stopper, uiTLSConfig, http.HandlerFunc(s.http.baseHandler)) - // Start the admin UI server. This opens the HTTP listen socket, // optionally sets up TLS, and dispatches the server worker for the // web UI. - if err := s.http.start(ctx, workersCtx, connManager, uiTLSConfig, s.stopper); err != nil { + if err := s.http.start(ctx, workersCtx, uiTLSConfig, s.stopper); err != nil { return err } @@ -585,7 +573,6 @@ func (s *SQLServerWrapper) PreStart(ctx context.Context) error { workersCtx, s.stopper, s.sqlServer.cfg.TestingKnobs, - connManager, pgL, orphanedLeasesTimeThresholdNanos, ); err != nil { @@ -654,7 +641,6 @@ func (s *SQLServerWrapper) AcceptClients(ctx context.Context) error { if err := s.sqlServer.startServeSQL( workersCtx, s.stopper, - s.sqlServer.connManager, s.sqlServer.pgL, &s.sqlServer.cfg.SocketFile, ); err != nil { diff --git a/pkg/util/netutil/net.go b/pkg/util/netutil/net.go index 30aeefff29b1..76cad5b855c3 100644 --- a/pkg/util/netutil/net.go +++ b/pkg/util/netutil/net.go @@ -62,42 +62,31 @@ func ListenAndServeGRPC( var httpLogger = log.NewStdLogger(severity.ERROR, "net/http") -// Server is a thin wrapper around http.Server. See MakeServer for more detail. -type Server struct { +// HTTPServer is a thin wrapper around http.Server. See MakeHTTPServer for more detail. +type HTTPServer struct { *http.Server } -// MakeServer constructs a Server that tracks active connections, +// MakeHTTPServer constructs a http.Server that tracks active connections, // closing them when signaled by stopper. -// -// It can serve two different purposes simultaneously: -// -// - to serve as actual HTTP server, using the .Serve(net.Listener) method. 
-// - to serve as plain TCP server, using the .ServeWith(...) method. -// -// The latter is used e.g. to accept SQL client connections. -// -// When the HTTP facility is not used, the Go HTTP server object is -// still used internally to maintain/register the connections via the -// ConnState() method, for convenience. -func MakeServer( +func MakeHTTPServer( ctx context.Context, stopper *stop.Stopper, tlsConfig *tls.Config, handler http.Handler, -) Server { +) HTTPServer { var mu syncutil.Mutex activeConns := make(map[net.Conn]struct{}) - server := Server{ + server := HTTPServer{ Server: &http.Server{ Handler: handler, TLSConfig: tlsConfig, ConnState: func(conn net.Conn, state http.ConnState) { mu.Lock() + defer mu.Unlock() switch state { case http.StateNew: activeConns[conn] = struct{}{} case http.StateClosed: delete(activeConns, conn) } - mu.Unlock() }, ErrorLog: httpLogger, }, @@ -113,10 +102,10 @@ func MakeServer( <-stopper.ShouldQuiesce() mu.Lock() + defer mu.Unlock() for conn := range activeConns { conn.Close() } - mu.Unlock() } if err := stopper.RunAsyncTask(ctx, "http2-wait-quiesce", waitQuiesce); err != nil { waitQuiesce(ctx) @@ -125,12 +114,51 @@ func MakeServer( return server } +// MakeTCPServer constructs a connection server that tracks active connections, +// closing them when signaled by stopper. +func MakeTCPServer(ctx context.Context, stopper *stop.Stopper) *TCPServer { + server := &TCPServer{ + stopper: stopper, + activeConns: make(map[net.Conn]struct{}), + } + + waitQuiesce := func(context.Context) { + <-stopper.ShouldQuiesce() + + server.mu.Lock() + defer server.mu.Unlock() + for conn := range server.activeConns { + conn.Close() + } + } + if err := stopper.RunAsyncTask(ctx, "tcp-wait-quiesce", waitQuiesce); err != nil { + waitQuiesce(ctx) + } + return server +} + +// TCPServer is a wrapper around a map of active connections. 
+type TCPServer struct { + mu syncutil.Mutex + stopper *stop.Stopper + activeConns map[net.Conn]struct{} +} + +func (s *TCPServer) addConn(n net.Conn) { + s.mu.Lock() + defer s.mu.Unlock() + s.activeConns[n] = struct{}{} +} + +func (s *TCPServer) rmConn(n net.Conn) { + s.mu.Lock() + defer s.mu.Unlock() + delete(s.activeConns, n) +} + // ServeWith accepts connections on ln and serves them using serveConn. -func (s *Server) ServeWith( - ctx context.Context, - stopper *stop.Stopper, - l net.Listener, - serveConn func(context.Context, net.Conn), +func (s *TCPServer) ServeWith( + ctx context.Context, l net.Listener, serveConn func(context.Context, net.Conn), ) error { // Inspired by net/http.(*Server).Serve var tempDelay time.Duration // how long to sleep on accept failure @@ -154,11 +182,9 @@ func (s *Server) ServeWith( return e } tempDelay = 0 - err := stopper.RunAsyncTask(ctx, "pgwire-serve", func(ctx context.Context) { - // NB: ConnState is used to manage the list of active connections that - // need draining; see MakeServer(). - s.Server.ConnState(rw, http.StateNew) // before Serve can return - defer s.Server.ConnState(rw, http.StateClosed) + err := s.stopper.RunAsyncTask(ctx, "tcp-serve", func(ctx context.Context) { + s.addConn(rw) + defer s.rmConn(rw) serveConn(ctx, rw) }) if err != nil {