diff --git a/pkg/sql/pgwire/conn.go b/pkg/sql/pgwire/conn.go index 016131aea48b..9e4904fb671c 100644 --- a/pkg/sql/pgwire/conn.go +++ b/pkg/sql/pgwire/conn.go @@ -64,7 +64,7 @@ type conn struct { conn net.Conn sessionArgs sql.SessionArgs - metrics *ServerMetrics + metrics *tenantSpecificMetrics // startTime is the time when the connection attempt was first received // by the server. @@ -164,7 +164,7 @@ func (s *Server) serveConn( log.Infof(ctx, "new connection with options: %+v", sArgs) } - c := newConn(netConn, sArgs, &s.metrics, connStart, &s.execCfg.Settings.SV) + c := newConn(netConn, sArgs, &s.tenantMetrics, connStart, &s.execCfg.Settings.SV) c.alwaysLogAuthActivity = alwaysLogAuthActivity || atomic.LoadInt32(&s.testingAuthLogEnabled) > 0 if s.execCfg.PGWireTestingKnobs != nil { c.afterReadMsgTestingKnob = s.execCfg.PGWireTestingKnobs.AfterReadMsgTestingKnob @@ -184,7 +184,7 @@ var alwaysLogAuthActivity = envutil.EnvOrDefaultBool("COCKROACH_ALWAYS_LOG_AUTHN func newConn( netConn net.Conn, sArgs sql.SessionArgs, - metrics *ServerMetrics, + metrics *tenantSpecificMetrics, connStart time.Time, sv *settings.Values, ) *conn { diff --git a/pkg/sql/pgwire/conn_test.go b/pkg/sql/pgwire/conn_test.go index 70f1df1847d4..cad5bf65d5c1 100644 --- a/pkg/sql/pgwire/conn_test.go +++ b/pkg/sql/pgwire/conn_test.go @@ -541,7 +541,7 @@ func waitForClientConn(ln net.Listener) (*conn, error) { return nil, err } - metrics := makeServerMetrics(sql.MemoryMetrics{} /* sqlMemMetrics */, metric.TestSampleInterval) + metrics := makeTenantSpecificMetrics(sql.MemoryMetrics{} /* sqlMemMetrics */, metric.TestSampleInterval) pgwireConn := newConn(conn, sql.SessionArgs{ConnResultsBufferSize: 16 << 10}, &metrics, timeutil.Now(), nil) return pgwireConn, nil } @@ -1070,7 +1070,7 @@ func TestMaliciousInputs(t *testing.T) { }(tc) sqlMetrics := sql.MakeMemMetrics("test" /* endpoint */, time.Second /* histogramWindow */) - metrics := makeServerMetrics(sqlMetrics, time.Second /* histogramWindow */) + metrics := makeTenantSpecificMetrics(sqlMetrics, time.Second /* histogramWindow */) conn := newConn( r, diff --git a/pkg/sql/pgwire/server.go b/pkg/sql/pgwire/server.go index 735b9027a221..9b4111d6568c 100644 --- a/pkg/sql/pgwire/server.go +++ b/pkg/sql/pgwire/server.go @@ -119,25 +119,25 @@ const ( var ( MetaConns = metric.Metadata{ Name: "sql.conns", - Help: "Number of active sql connections", + Help: "Number of active SQL connections", Measurement: "Connections", Unit: metric.Unit_COUNT, } MetaNewConns = metric.Metadata{ Name: "sql.new_conns", - Help: "Counter of the number of sql connections created", + Help: "Counter of the number of SQL connections created", Measurement: "Connections", Unit: metric.Unit_COUNT, } MetaBytesIn = metric.Metadata{ Name: "sql.bytesin", - Help: "Number of sql bytes received", + Help: "Number of SQL bytes received", Measurement: "SQL Bytes", Unit: metric.Unit_BYTES, } MetaBytesOut = metric.Metadata{ Name: "sql.bytesout", - Help: "Number of sql bytes sent", + Help: "Number of SQL bytes sent", Measurement: "SQL Bytes", Unit: metric.Unit_BYTES, } @@ -149,7 +149,7 @@ var ( } MetaConnFailures = metric.Metadata{ Name: "sql.conn.failures", - Help: "Number of sql conection failures", + Help: "Number of SQL connection failures", Measurement: "Connections", Unit: metric.Unit_COUNT, } @@ -211,14 +211,15 @@ var ( // cancellation function has been called and the cancellation has taken place. type cancelChanMap map[chan struct{}]context.CancelFunc -// Server implements the server side of the PostgreSQL wire protocol. +// Server implements the server side of the PostgreSQL wire protocol for one +// specific tenant (i.e. its configuration is specific to one tenant). type Server struct { AmbientCtx log.AmbientContext cfg *base.Config SQLServer *sql.Server execCfg *sql.ExecutorConfig - metrics ServerMetrics + tenantMetrics tenantSpecificMetrics mu struct { syncutil.Mutex @@ -273,8 +274,9 @@ type Server struct { trustClientProvidedRemoteAddr syncutil.AtomicBool } -// ServerMetrics is the set of metrics for the pgwire server. -type ServerMetrics struct { +// tenantSpecificMetrics is the set of metrics for a pgwire server +// bound to a specific tenant. +type tenantSpecificMetrics struct { BytesInCount *metric.Counter BytesOutCount *metric.Counter Conns *metric.Gauge @@ -288,10 +290,10 @@ type ServerMetrics struct { SQLMemMetrics sql.MemoryMetrics } -func makeServerMetrics( +func makeTenantSpecificMetrics( sqlMemMetrics sql.MemoryMetrics, histogramWindow time.Duration, -) ServerMetrics { - return ServerMetrics{ +) tenantSpecificMetrics { + return tenantSpecificMetrics{ BytesInCount: metric.NewCounter(MetaBytesIn), BytesOutCount: metric.NewCounter(MetaBytesOut), Conns: metric.NewGauge(MetaConns), @@ -331,10 +333,10 @@ func MakeServer( executorConfig *sql.ExecutorConfig, ) *Server { server := &Server{ - AmbientCtx: ambientCtx, - cfg: cfg, - execCfg: executorConfig, - metrics: makeServerMetrics(sqlMemMetrics, histogramWindow), + AmbientCtx: ambientCtx, + cfg: cfg, + execCfg: executorConfig, + tenantMetrics: makeTenantSpecificMetrics(sqlMemMetrics, histogramWindow), } server.sqlMemoryPool = mon.NewMonitor("sql", mon.MemoryResource, @@ -355,8 +357,8 @@ func MakeServer( server.connMonitor = mon.NewMonitor("conn", mon.MemoryResource, - server.metrics.ConnMemMetrics.CurBytesCount, - server.metrics.ConnMemMetrics.MaxBytesHist, + server.tenantMetrics.ConnMemMetrics.CurBytesCount, + server.tenantMetrics.ConnMemMetrics.MaxBytesHist, int64(connReservationBatchSize)*baseSQLMemoryBudget, noteworthyConnMemoryUsageBytes, st) server.connMonitor.StartNoReserved(context.Background(), server.sqlMemoryPool) @@ -378,7 +380,7 @@ func MakeServer( // BytesOut returns the total number of bytes transmitted from this server. func (s *Server) BytesOut() uint64 { - return uint64(s.metrics.BytesOutCount.Count()) + return uint64(s.tenantMetrics.BytesOutCount.Count()) } // AnnotateCtxForIncomingConn annotates the provided context with a @@ -425,7 +427,7 @@ func (s *Server) IsDraining() bool { // Metrics returns the set of metrics structs. func (s *Server) Metrics() (res []interface{}) { return []interface{}{ - &s.metrics, + &s.tenantMetrics, &s.SQLServer.Metrics.StartedStatementCounters, &s.SQLServer.Metrics.ExecutedStatementCounters, &s.SQLServer.Metrics.EngineMetrics, @@ -725,7 +727,7 @@ func (s *Server) TestingEnableAuthLogging() { func (s *Server) ServeConn(ctx context.Context, conn net.Conn, socketType SocketType) (err error) { defer func() { if err != nil { - s.metrics.ConnFailures.Inc(1) + s.tenantMetrics.ConnFailures.Inc(1) } }() @@ -903,7 +905,7 @@ func (s *Server) ServeConn(ctx context.Context, conn net.Conn, socketType Socket // logged so that an operator can be aware of any possibly malicious requests. func (s *Server) handleCancel(ctx context.Context, conn net.Conn, buf *pgwirebase.ReadBuffer) { telemetry.Inc(sqltelemetry.CancelRequestCounter) - s.metrics.PGWireCancelTotalCount.Inc(1) + s.tenantMetrics.PGWireCancelTotalCount.Inc(1) resp, err := func() (*serverpb.CancelQueryByKeyResponse, error) { backendKeyDataBits, err := buf.GetUint64() @@ -929,10 +931,10 @@ func (s *Server) handleCancel(ctx context.Context, conn net.Conn, buf *pgwirebas }() if resp != nil && resp.Canceled { - s.metrics.PGWireCancelSuccessfulCount.Inc(1) + s.tenantMetrics.PGWireCancelSuccessfulCount.Inc(1) } else if err != nil { if respStatus := status.Convert(err); respStatus.Code() == codes.ResourceExhausted { - s.metrics.PGWireCancelIgnoredCount.Inc(1) + s.tenantMetrics.PGWireCancelIgnoredCount.Inc(1) } log.Sessions.Warningf(ctx, "unexpected while handling pgwire cancellation request: %v", err) } @@ -1309,7 +1311,7 @@ func (s *Server) maybeUpgradeToSecureConn( newConn = tls.Server(conn, tlsConfig) newConnType = hba.ConnHostSSL } - s.metrics.BytesOutCount.Inc(int64(n)) + s.tenantMetrics.BytesOutCount.Inc(int64(n)) // Finally, re-read the version/command from the client. newVersion, *buf, serverErr = s.readVersion(newConn) @@ -1350,10 +1352,10 @@ func (s *Server) registerConn( // since DrainClient() waits for that number to drop to zero, // so we don't want it to oscillate unnecessarily. if !rejectNewConnections { - s.metrics.NewConns.Inc(1) - s.metrics.Conns.Inc(1) + s.tenantMetrics.NewConns.Inc(1) + s.tenantMetrics.Conns.Inc(1) prevOnCloseFn := onCloseFn - onCloseFn = func() { prevOnCloseFn(); s.metrics.Conns.Dec(1) } + onCloseFn = func() { prevOnCloseFn(); s.tenantMetrics.Conns.Dec(1) } } return } @@ -1376,7 +1378,7 @@ func (s *Server) readVersion( if err != nil { return } - s.metrics.BytesInCount.Inc(int64(n)) + s.tenantMetrics.BytesInCount.Inc(int64(n)) return } @@ -1384,7 +1386,7 @@ func (s *Server) readVersion( // sequence. Later error sends during/after authentication are handled // in conn.go. func (s *Server) sendErr(ctx context.Context, conn net.Conn, err error) error { - msgBuilder := newWriteBuffer(s.metrics.BytesOutCount) + msgBuilder := newWriteBuffer(s.tenantMetrics.BytesOutCount) // We could, but do not, report server-side network errors while // trying to send the client error. This is because clients that // receive error payload are highly correlated with clients