Skip to content

Commit

Permalink
Merge #92574
Browse files Browse the repository at this point in the history
92574: pgwire: unexport ServerMetrics, rename to tenantSpecificMetrics r=stevendanna a=knz

Parent PRs:
- [x] #84608
- [x] #91739
- [x] #91744

Epic: CRDB-14537

Co-authored-by: Raphael 'kena' Poss <knz@thaumogen.net>
  • Loading branch information
craig[bot] and knz committed Nov 29, 2022
2 parents 1af116f + f6b1af7 commit b5cab45
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 35 deletions.
6 changes: 3 additions & 3 deletions pkg/sql/pgwire/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type conn struct {
conn net.Conn

sessionArgs sql.SessionArgs
metrics *ServerMetrics
metrics *tenantSpecificMetrics

// startTime is the time when the connection attempt was first received
// by the server.
Expand Down Expand Up @@ -164,7 +164,7 @@ func (s *Server) serveConn(
log.Infof(ctx, "new connection with options: %+v", sArgs)
}

c := newConn(netConn, sArgs, &s.metrics, connStart, &s.execCfg.Settings.SV)
c := newConn(netConn, sArgs, &s.tenantMetrics, connStart, &s.execCfg.Settings.SV)
c.alwaysLogAuthActivity = alwaysLogAuthActivity || atomic.LoadInt32(&s.testingAuthLogEnabled) > 0
if s.execCfg.PGWireTestingKnobs != nil {
c.afterReadMsgTestingKnob = s.execCfg.PGWireTestingKnobs.AfterReadMsgTestingKnob
Expand All @@ -184,7 +184,7 @@ var alwaysLogAuthActivity = envutil.EnvOrDefaultBool("COCKROACH_ALWAYS_LOG_AUTHN
func newConn(
netConn net.Conn,
sArgs sql.SessionArgs,
metrics *ServerMetrics,
metrics *tenantSpecificMetrics,
connStart time.Time,
sv *settings.Values,
) *conn {
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/pgwire/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,7 @@ func waitForClientConn(ln net.Listener) (*conn, error) {
return nil, err
}

metrics := makeServerMetrics(sql.MemoryMetrics{} /* sqlMemMetrics */, metric.TestSampleInterval)
metrics := makeTenantSpecificMetrics(sql.MemoryMetrics{} /* sqlMemMetrics */, metric.TestSampleInterval)
pgwireConn := newConn(conn, sql.SessionArgs{ConnResultsBufferSize: 16 << 10}, &metrics, timeutil.Now(), nil)
return pgwireConn, nil
}
Expand Down Expand Up @@ -1070,7 +1070,7 @@ func TestMaliciousInputs(t *testing.T) {
}(tc)

sqlMetrics := sql.MakeMemMetrics("test" /* endpoint */, time.Second /* histogramWindow */)
metrics := makeServerMetrics(sqlMetrics, time.Second /* histogramWindow */)
metrics := makeTenantSpecificMetrics(sqlMetrics, time.Second /* histogramWindow */)

conn := newConn(
r,
Expand Down
62 changes: 32 additions & 30 deletions pkg/sql/pgwire/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,25 +119,25 @@ const (
var (
MetaConns = metric.Metadata{
Name: "sql.conns",
Help: "Number of active sql connections",
Help: "Number of active SQL connections",
Measurement: "Connections",
Unit: metric.Unit_COUNT,
}
MetaNewConns = metric.Metadata{
Name: "sql.new_conns",
Help: "Counter of the number of sql connections created",
Help: "Counter of the number of SQL connections created",
Measurement: "Connections",
Unit: metric.Unit_COUNT,
}
MetaBytesIn = metric.Metadata{
Name: "sql.bytesin",
Help: "Number of sql bytes received",
Help: "Number of SQL bytes received",
Measurement: "SQL Bytes",
Unit: metric.Unit_BYTES,
}
MetaBytesOut = metric.Metadata{
Name: "sql.bytesout",
Help: "Number of sql bytes sent",
Help: "Number of SQL bytes sent",
Measurement: "SQL Bytes",
Unit: metric.Unit_BYTES,
}
Expand All @@ -149,7 +149,7 @@ var (
}
MetaConnFailures = metric.Metadata{
Name: "sql.conn.failures",
Help: "Number of sql conection failures",
Help: "Number of SQL connection failures",
Measurement: "Connections",
Unit: metric.Unit_COUNT,
}
Expand Down Expand Up @@ -211,14 +211,15 @@ var (
// cancellation function has been called and the cancellation has taken place.
type cancelChanMap map[chan struct{}]context.CancelFunc

// Server implements the server side of the PostgreSQL wire protocol.
// Server implements the server side of the PostgreSQL wire protocol for one
// specific tenant (i.e. its configuration is specific to one tenant).
type Server struct {
AmbientCtx log.AmbientContext
cfg *base.Config
SQLServer *sql.Server
execCfg *sql.ExecutorConfig

metrics ServerMetrics
tenantMetrics tenantSpecificMetrics

mu struct {
syncutil.Mutex
Expand Down Expand Up @@ -273,8 +274,9 @@ type Server struct {
trustClientProvidedRemoteAddr syncutil.AtomicBool
}

// ServerMetrics is the set of metrics for the pgwire server.
type ServerMetrics struct {
// tenantSpecificMetrics is the set of metrics for a pgwire server
// bound to a specific tenant.
type tenantSpecificMetrics struct {
BytesInCount *metric.Counter
BytesOutCount *metric.Counter
Conns *metric.Gauge
Expand All @@ -288,10 +290,10 @@ type ServerMetrics struct {
SQLMemMetrics sql.MemoryMetrics
}

func makeServerMetrics(
func makeTenantSpecificMetrics(
sqlMemMetrics sql.MemoryMetrics, histogramWindow time.Duration,
) ServerMetrics {
return ServerMetrics{
) tenantSpecificMetrics {
return tenantSpecificMetrics{
BytesInCount: metric.NewCounter(MetaBytesIn),
BytesOutCount: metric.NewCounter(MetaBytesOut),
Conns: metric.NewGauge(MetaConns),
Expand Down Expand Up @@ -331,10 +333,10 @@ func MakeServer(
executorConfig *sql.ExecutorConfig,
) *Server {
server := &Server{
AmbientCtx: ambientCtx,
cfg: cfg,
execCfg: executorConfig,
metrics: makeServerMetrics(sqlMemMetrics, histogramWindow),
AmbientCtx: ambientCtx,
cfg: cfg,
execCfg: executorConfig,
tenantMetrics: makeTenantSpecificMetrics(sqlMemMetrics, histogramWindow),
}
server.sqlMemoryPool = mon.NewMonitor("sql",
mon.MemoryResource,
Expand All @@ -355,8 +357,8 @@ func MakeServer(

server.connMonitor = mon.NewMonitor("conn",
mon.MemoryResource,
server.metrics.ConnMemMetrics.CurBytesCount,
server.metrics.ConnMemMetrics.MaxBytesHist,
server.tenantMetrics.ConnMemMetrics.CurBytesCount,
server.tenantMetrics.ConnMemMetrics.MaxBytesHist,
int64(connReservationBatchSize)*baseSQLMemoryBudget, noteworthyConnMemoryUsageBytes, st)
server.connMonitor.StartNoReserved(context.Background(), server.sqlMemoryPool)

Expand All @@ -378,7 +380,7 @@ func MakeServer(

// BytesOut returns the total number of bytes transmitted from this server.
func (s *Server) BytesOut() uint64 {
return uint64(s.metrics.BytesOutCount.Count())
return uint64(s.tenantMetrics.BytesOutCount.Count())
}

// AnnotateCtxForIncomingConn annotates the provided context with a
Expand Down Expand Up @@ -425,7 +427,7 @@ func (s *Server) IsDraining() bool {
// Metrics returns the set of metrics structs.
func (s *Server) Metrics() (res []interface{}) {
return []interface{}{
&s.metrics,
&s.tenantMetrics,
&s.SQLServer.Metrics.StartedStatementCounters,
&s.SQLServer.Metrics.ExecutedStatementCounters,
&s.SQLServer.Metrics.EngineMetrics,
Expand Down Expand Up @@ -725,7 +727,7 @@ func (s *Server) TestingEnableAuthLogging() {
func (s *Server) ServeConn(ctx context.Context, conn net.Conn, socketType SocketType) (err error) {
defer func() {
if err != nil {
s.metrics.ConnFailures.Inc(1)
s.tenantMetrics.ConnFailures.Inc(1)
}
}()

Expand Down Expand Up @@ -903,7 +905,7 @@ func (s *Server) ServeConn(ctx context.Context, conn net.Conn, socketType Socket
// logged so that an operator can be aware of any possibly malicious requests.
func (s *Server) handleCancel(ctx context.Context, conn net.Conn, buf *pgwirebase.ReadBuffer) {
telemetry.Inc(sqltelemetry.CancelRequestCounter)
s.metrics.PGWireCancelTotalCount.Inc(1)
s.tenantMetrics.PGWireCancelTotalCount.Inc(1)

resp, err := func() (*serverpb.CancelQueryByKeyResponse, error) {
backendKeyDataBits, err := buf.GetUint64()
Expand All @@ -929,10 +931,10 @@ func (s *Server) handleCancel(ctx context.Context, conn net.Conn, buf *pgwirebas
}()

if resp != nil && resp.Canceled {
s.metrics.PGWireCancelSuccessfulCount.Inc(1)
s.tenantMetrics.PGWireCancelSuccessfulCount.Inc(1)
} else if err != nil {
if respStatus := status.Convert(err); respStatus.Code() == codes.ResourceExhausted {
s.metrics.PGWireCancelIgnoredCount.Inc(1)
s.tenantMetrics.PGWireCancelIgnoredCount.Inc(1)
}
log.Sessions.Warningf(ctx, "unexpected while handling pgwire cancellation request: %v", err)
}
Expand Down Expand Up @@ -1309,7 +1311,7 @@ func (s *Server) maybeUpgradeToSecureConn(
newConn = tls.Server(conn, tlsConfig)
newConnType = hba.ConnHostSSL
}
s.metrics.BytesOutCount.Inc(int64(n))
s.tenantMetrics.BytesOutCount.Inc(int64(n))

// Finally, re-read the version/command from the client.
newVersion, *buf, serverErr = s.readVersion(newConn)
Expand Down Expand Up @@ -1350,10 +1352,10 @@ func (s *Server) registerConn(
// since DrainClient() waits for that number to drop to zero,
// so we don't want it to oscillate unnecessarily.
if !rejectNewConnections {
s.metrics.NewConns.Inc(1)
s.metrics.Conns.Inc(1)
s.tenantMetrics.NewConns.Inc(1)
s.tenantMetrics.Conns.Inc(1)
prevOnCloseFn := onCloseFn
onCloseFn = func() { prevOnCloseFn(); s.metrics.Conns.Dec(1) }
onCloseFn = func() { prevOnCloseFn(); s.tenantMetrics.Conns.Dec(1) }
}
return
}
Expand All @@ -1376,15 +1378,15 @@ func (s *Server) readVersion(
if err != nil {
return
}
s.metrics.BytesInCount.Inc(int64(n))
s.tenantMetrics.BytesInCount.Inc(int64(n))
return
}

// sendErr sends errors to the client during the connection startup
// sequence. Later error sends during/after authentication are handled
// in conn.go.
func (s *Server) sendErr(ctx context.Context, conn net.Conn, err error) error {
msgBuilder := newWriteBuffer(s.metrics.BytesOutCount)
msgBuilder := newWriteBuffer(s.tenantMetrics.BytesOutCount)
// We could, but do not, report server-side network errors while
// trying to send the client error. This is because clients that
// receive error payload are highly correlated with clients
Expand Down

0 comments on commit b5cab45

Please sign in to comment.