Skip to content

Commit

Permalink
pgwire: lift the PreServe() call to the server package
Browse files Browse the repository at this point in the history
Release note: None
  • Loading branch information
knz committed Dec 19, 2022
1 parent 09fac3c commit 4c6fd26
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 32 deletions.
42 changes: 39 additions & 3 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ type SQLServer struct {
stopper *stop.Stopper
stopTrigger *stopTrigger
sqlIDContainer *base.SQLIDContainer
pgPreServer *pgwire.PreServeConnHandler
pgServer *pgwire.Server
distSQLServer *distsql.ServerImpl
execCfg *sql.ExecutorConfig
Expand Down Expand Up @@ -1035,6 +1036,27 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
// Set up internal memory metrics for use by internal SQL executors.
// Don't add them to the registry now because it will be added as part of pgServer metrics.
sqlMemMetrics := sql.MakeMemMetrics("sql", cfg.HistogramWindowInterval())

// Initialize the pgwire pre-server, which initializes connections,
// sets up TLS and reads client status parameters.
//
// We are initializing preServeHandler here until the following issue is resolved:
// https://github.com/cockroachdb/cockroach/issues/84585
pgPreServer := pgwire.MakePreServeConnHandler(
cfg.AmbientCtx,
cfg.Config,
cfg.Settings,
cfg.rpcContext.GetServerTLSConfig,
cfg.HistogramWindowInterval(),
rootSQLMemoryMonitor,
)

for _, m := range pgPreServer.Metrics() {
cfg.registry.AddMetricStruct(m)
}

// Initialize the pgwire server which handles connections
// established via the pgPreServer.
pgServer := pgwire.MakeServer(
cfg.AmbientCtx,
cfg.Config,
Expand Down Expand Up @@ -1271,6 +1293,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
stopper: cfg.stopper,
stopTrigger: cfg.stopTrigger,
sqlIDContainer: cfg.nodeIDContainer,
pgPreServer: &pgPreServer,
pgServer: pgServer,
distSQLServer: distSQLServer,
execCfg: execCfg,
Expand Down Expand Up @@ -1566,7 +1589,13 @@ func (s *SQLServer) startServeSQL(
connCtx := s.pgServer.AnnotateCtxForIncomingConn(ctx, conn)
tcpKeepAlive.configure(connCtx, conn)

if err := s.pgServer.ServeConn(connCtx, conn, pgwire.SocketTCP); err != nil {
conn, status, err := s.pgPreServer.PreServe(connCtx, conn, pgwire.SocketTCP)
if err != nil {
log.Ops.Errorf(connCtx, "serving SQL client conn: %v", err)
return
}

if err := s.pgServer.ServeConn(connCtx, conn, status); err != nil {
log.Ops.Errorf(connCtx, "serving SQL client conn: %v", err)
}
})
Expand Down Expand Up @@ -1616,8 +1645,15 @@ func (s *SQLServer) startServeSQL(
func(ctx context.Context) {
err := connManager.ServeWith(ctx, unixLn, func(ctx context.Context, conn net.Conn) {
connCtx := s.pgServer.AnnotateCtxForIncomingConn(ctx, conn)
if err := s.pgServer.ServeConn(connCtx, conn, pgwire.SocketUnix); err != nil {
log.Ops.Errorf(connCtx, "%v", err)

conn, status, err := s.pgPreServer.PreServe(connCtx, conn, pgwire.SocketUnix)
if err != nil {
log.Ops.Errorf(connCtx, "serving SQL client conn: %v", err)
return
}

if err := s.pgServer.ServeConn(connCtx, conn, status); err != nil {
log.Ops.Errorf(connCtx, "serving SQL client conn: %v", err)
}
})
netutil.FatalIfUnexpected(err)
Expand Down
4 changes: 3 additions & 1 deletion pkg/sql/pgwire/pre_serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgwirebase"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgwirecancel"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -94,13 +95,14 @@ type PreServeConnHandler struct {
// MakePreServeConnHandler creates a PreServeConnHandler.
// sv refers to the setting values "outside" of the current tenant - i.e. from the storage cluster.
func MakePreServeConnHandler(
ctx context.Context,
ambientCtx log.AmbientContext,
cfg *base.Config,
st *cluster.Settings,
getTLSConfig func() (*tls.Config, error),
histogramWindow time.Duration,
parentMemoryMonitor *mon.BytesMonitor,
) PreServeConnHandler {
ctx := ambientCtx.AnnotateCtx(context.Background())
metrics := makeTenantIndependentMetrics(histogramWindow)
s := PreServeConnHandler{
errWriter: errWriter{
Expand Down
34 changes: 6 additions & 28 deletions pkg/sql/pgwire/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,10 +209,6 @@ type Server struct {
SQLServer *sql.Server
execCfg *sql.ExecutorConfig

// preServeHandler is under Server for now, until this issue is addressed:
// https://github.com/cockroachdb/cockroach/issues/84585
preServeHandler PreServeConnHandler

tenantMetrics tenantSpecificMetrics

mu struct {
Expand Down Expand Up @@ -338,16 +334,6 @@ func MakeServer(
execCfg: executorConfig,

tenantMetrics: makeTenantSpecificMetrics(sqlMemMetrics, histogramWindow),

// We are initializing preServeHandler here until the following issue is resolved:
// https://github.com/cockroachdb/cockroach/issues/84585
preServeHandler: MakePreServeConnHandler(
ctx,
cfg, st,
executorConfig.RPCContext.GetServerTLSConfig,
histogramWindow,
parentMemoryMonitor,
),
}
server.sqlMemoryPool = mon.NewMonitor("sql",
mon.MemoryResource,
Expand Down Expand Up @@ -434,13 +420,8 @@ func (s *Server) IsDraining() bool {
}

// Metrics returns the set of metrics structs.
func (s *Server) Metrics() (res []interface{}) {
// We declare the metrics from preServeHandler here
// until this issue is resolved:
// https://github.com/cockroachdb/cockroach/issues/84585
res = s.preServeHandler.Metrics()

return append(res, []interface{}{
func (s *Server) Metrics() []interface{} {
return []interface{}{
&s.tenantMetrics,
&s.SQLServer.Metrics.StartedStatementCounters,
&s.SQLServer.Metrics.ExecutedStatementCounters,
Expand All @@ -453,7 +434,7 @@ func (s *Server) Metrics() (res []interface{}) {
&s.SQLServer.ServerMetrics.StatsMetrics,
&s.SQLServer.ServerMetrics.ContentionSubsystemMetrics,
&s.SQLServer.ServerMetrics.InsightsMetrics,
}...)
}
}

// Drain prevents new connections from being served and waits the duration of
Expand Down Expand Up @@ -738,7 +719,9 @@ func (s *Server) TestingEnableAuthLogging() {
// compatible with postgres.
//
// An error is returned if the initial handshake of the connection fails.
func (s *Server) ServeConn(ctx context.Context, conn net.Conn, socketType SocketType) (err error) {
func (s *Server) ServeConn(
ctx context.Context, conn net.Conn, preServeStatus PreServeStatus,
) (err error) {
defer func() {
if err != nil {
s.tenantMetrics.ConnFailures.Inc(1)
Expand Down Expand Up @@ -780,11 +763,6 @@ func (s *Server) ServeConn(ctx context.Context, conn net.Conn, socketType Socket
}
}()

conn, preServeStatus, err := s.preServeHandler.PreServe(ctx, conn, socketType)
if err != nil {
return err
}

if preServeStatus.State == PreServeCancel {
s.handleCancel(ctx, preServeStatus.CancelKey)
return nil
Expand Down

0 comments on commit 4c6fd26

Please sign in to comment.