Skip to content

Commit

Permalink
pgwire: split the pre-conn reserved memory account
Browse files Browse the repository at this point in the history
This commit extracts the memory reservation into
a tenant-independent memory pool.

Release note: None
  • Loading branch information
knz committed Dec 6, 2022
1 parent 194103b commit 78cace3
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 22 deletions.
8 changes: 4 additions & 4 deletions pkg/sql/pgwire/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/jackc/pgconn"
"github.com/jackc/pgproto3/v2"
"github.com/jackc/pgx/v4"
pgproto3 "github.com/jackc/pgproto3/v2"
pgx "github.com/jackc/pgx/v4"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)
Expand Down Expand Up @@ -1993,7 +1993,7 @@ func TestConnCloseReleasesReservedMem(t *testing.T) {
ctx := context.Background()
defer s.Stopper().Stop(ctx)

before := s.PGServer().(*Server).connMonitor.AllocBytes()
before := s.PGServer().(*Server).tenantSpecificConnMonitor.AllocBytes()

pgURL, cleanupFunc := sqlutils.PGUrl(
t, s.ServingSQLAddr(), "testConnClose" /* prefix */, url.User(username.RootUser),
Expand All @@ -2012,6 +2012,6 @@ func TestConnCloseReleasesReservedMem(t *testing.T) {
require.Regexp(t, "pq: option .* is invalid", err.Error())

// Check that no accounted-for memory is leaked, after the connection attempt fails.
after := s.PGServer().(*Server).connMonitor.AllocBytes()
after := s.PGServer().(*Server).tenantSpecificConnMonitor.AllocBytes()
require.Equal(t, before, after)
}
51 changes: 45 additions & 6 deletions pkg/sql/pgwire/pre_serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@ import (
"context"
"crypto/tls"
"net"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/mon"
)

// Fully-qualified names for metrics.
Expand Down Expand Up @@ -47,6 +49,18 @@ var (
Measurement: "Connections",
Unit: metric.Unit_COUNT,
}
MetaPreServeMaxBytes = metric.Metadata{
Name: "sql.pre_serve.mem.max",
Help: "Memory usage for SQL connections prior to routing the connection to a specific tenant",
Measurement: "Memory",
Unit: metric.Unit_BYTES,
}
MetaPreServeCurBytes = metric.Metadata{
Name: "sql.pre_serve.mem.cur",
Help: "Current memory usage for SQL connections prior to routing the connection to a specific tenant",
Measurement: "Memory",
Unit: metric.Unit_BYTES,
}
)

// PreServeConnHandler implements the early initialization of an incoming
Expand All @@ -59,23 +73,44 @@ type PreServeConnHandler struct {
tenantIndependentMetrics tenantIndependentMetrics

getTLSConfig func() (*tls.Config, error)

// tenantIndependentConnMonitor is the pool where the
// memory usage for the initial connection overhead
// is accounted for. After the connection is attributed
// to a specific tenant, the account for the initial
// connection overhead is transferred to the per-tenant
// monitor.
tenantIndependentConnMonitor *mon.BytesMonitor
}

// MakePreServeConnHandler creates a PreServeConnHandler.
// sv refers to the setting values "outside" of the current tenant - i.e. from the storage cluster.
func MakePreServeConnHandler(
cfg *base.Config, sv *settings.Values, getTLSConfig func() (*tls.Config, error),
ctx context.Context,
cfg *base.Config,
st *cluster.Settings,
getTLSConfig func() (*tls.Config, error),
histogramWindow time.Duration,
parentMemoryMonitor *mon.BytesMonitor,
) PreServeConnHandler {
metrics := makeTenantIndependentMetrics()
return PreServeConnHandler{
metrics := makeTenantIndependentMetrics(histogramWindow)
s := PreServeConnHandler{
errWriter: errWriter{
sv: sv,
sv: &st.SV,
msgBuilder: newWriteBuffer(metrics.PreServeBytesOutCount),
},
cfg: cfg,
tenantIndependentMetrics: metrics,
getTLSConfig: getTLSConfig,

tenantIndependentConnMonitor: mon.NewMonitor("pre-conn",
mon.MemoryResource,
metrics.PreServeCurBytes,
metrics.PreServeMaxBytes,
int64(connReservationBatchSize)*baseSQLMemoryBudget, noteworthyConnMemoryUsageBytes, st),
}
s.tenantIndependentConnMonitor.StartNoReserved(ctx, parentMemoryMonitor)
return s
}

// tenantIndependentMetrics is the set of metrics for the
Expand All @@ -86,14 +121,18 @@ type tenantIndependentMetrics struct {
PreServeBytesOutCount *metric.Counter
PreServeConnFailures *metric.Counter
PreServeNewConns *metric.Counter
PreServeMaxBytes *metric.Histogram
PreServeCurBytes *metric.Gauge
}

func makeTenantIndependentMetrics() tenantIndependentMetrics {
func makeTenantIndependentMetrics(histogramWindow time.Duration) tenantIndependentMetrics {
return tenantIndependentMetrics{
PreServeBytesInCount: metric.NewCounter(MetaPreServeBytesIn),
PreServeBytesOutCount: metric.NewCounter(MetaPreServeBytesOut),
PreServeNewConns: metric.NewCounter(MetaPreServeNewConns),
PreServeConnFailures: metric.NewCounter(MetaPreServeConnFailures),
PreServeMaxBytes: metric.NewHistogram(MetaPreServeMaxBytes, histogramWindow, metric.MemoryUsage64MBBuckets),
PreServeCurBytes: metric.NewGauge(MetaPreServeCurBytes),
}
}

Expand Down
39 changes: 27 additions & 12 deletions pkg/sql/pgwire/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,13 @@ type Server struct {
identityMap *identmap.Conf
}

// sqlMemoryPool is the parent memory pool for all SQL memory allocations
// for this tenant, including SQL query execution, etc.
sqlMemoryPool *mon.BytesMonitor
connMonitor *mon.BytesMonitor

// tenantSpecificConnMonitor is the pool where the memory usage for the
// initial connection overhead is accounted for.
tenantSpecificConnMonitor *mon.BytesMonitor

// testing{Conn,Auth}LogEnabled is used in unit tests in this
// package to force-enable conn/auth logging without dancing around
Expand Down Expand Up @@ -336,6 +341,7 @@ func MakeServer(
histogramWindow time.Duration,
executorConfig *sql.ExecutorConfig,
) *Server {
ctx := ambientCtx.AnnotateCtx(context.Background())
server := &Server{
AmbientCtx: ambientCtx,
cfg: cfg,
Expand All @@ -346,8 +352,12 @@ func MakeServer(
// We are initializing preServeHandler here until the following issue is resolved:
// https://github.com/cockroachdb/cockroach/issues/84585
preServeHandler: MakePreServeConnHandler(
cfg, &st.SV,
executorConfig.RPCContext.GetServerTLSConfig),
ctx,
cfg, st,
executorConfig.RPCContext.GetServerTLSConfig,
histogramWindow,
parentMemoryMonitor,
),
}
server.sqlMemoryPool = mon.NewMonitor("sql",
mon.MemoryResource,
Expand All @@ -360,30 +370,28 @@ func MakeServer(
nil, /* curCount */
nil, /* maxHist */
0, noteworthySQLMemoryUsageBytes, st)
server.sqlMemoryPool.StartNoReserved(context.Background(), parentMemoryMonitor)
server.sqlMemoryPool.StartNoReserved(ctx, parentMemoryMonitor)
server.SQLServer = sql.NewServer(executorConfig, server.sqlMemoryPool)

// TODO(knz,ben): Use a cluster setting for this.
server.trustClientProvidedRemoteAddr.Set(trustClientProvidedRemoteAddrOverride)

server.connMonitor = mon.NewMonitor("conn",
server.tenantSpecificConnMonitor = mon.NewMonitor("conn",
mon.MemoryResource,
server.tenantMetrics.ConnMemMetrics.CurBytesCount,
server.tenantMetrics.ConnMemMetrics.MaxBytesHist,
int64(connReservationBatchSize)*baseSQLMemoryBudget, noteworthyConnMemoryUsageBytes, st)
server.connMonitor.StartNoReserved(context.Background(), server.sqlMemoryPool)
server.tenantSpecificConnMonitor.StartNoReserved(ctx, server.sqlMemoryPool)

server.mu.Lock()
server.mu.connCancelMap = make(cancelChanMap)
server.mu.Unlock()

connAuthConf.SetOnChange(&st.SV, func(ctx context.Context) {
loadLocalHBAConfigUponRemoteSettingChange(
ambientCtx.AnnotateCtx(context.Background()), server, st)
loadLocalHBAConfigUponRemoteSettingChange(ctx, server, st)
})
ConnIdentityMapConf.SetOnChange(&st.SV, func(ctx context.Context) {
loadLocalIdentityMapUponRemoteSettingChange(
ambientCtx.AnnotateCtx(context.Background()), server, st)
loadLocalIdentityMapUponRemoteSettingChange(ctx, server, st)
})

return server
Expand Down Expand Up @@ -861,7 +869,7 @@ func (s *Server) ServeConn(ctx context.Context, conn net.Conn, socketType Socket
// reduces pressure on the shared pool because the server monitor allocates in
// chunks from the shared pool and these chunks should be larger than
// baseSQLMemoryBudget.
reserved := s.connMonitor.MakeBoundAccount()
reserved := s.preServeHandler.tenantIndependentConnMonitor.MakeBoundAccount()
if err := reserved.Grow(ctx, baseSQLMemoryBudget); err != nil {
return errors.Wrapf(err, "unable to pre-allocate %d bytes for this connection",
baseSQLMemoryBudget)
Expand All @@ -880,6 +888,13 @@ func (s *Server) ServeConn(ctx context.Context, conn net.Conn, socketType Socket
return s.sendErr(ctx, conn, err)
}

// Transfer the memory account into this tenant.
tenantReserved, err := s.tenantSpecificConnMonitor.TransferAccount(ctx, &reserved)
if err != nil {
reserved.Close(ctx)
return s.sendErr(ctx, conn, err)
}

// Populate the client address field in the context tags and the
// shared struct for structured logging.
// Only now do we know the remote client address for sure (it may have
Expand All @@ -900,7 +915,7 @@ func (s *Server) ServeConn(ctx context.Context, conn net.Conn, socketType Socket
// This includes authentication.
s.serveConn(
ctx, conn, sArgs,
&reserved,
&tenantReserved,
connStart,
authOptions{
connType: connType,
Expand Down
18 changes: 18 additions & 0 deletions pkg/util/mon/bytes_usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,24 @@ func (mm *BytesMonitor) MakeBoundAccount() BoundAccount {
return BoundAccount{mon: mm}
}

// TransferAccount creates a new account with the budget
// allocated in the given origAccount.
// The new account is owned by this monitor.
//
// If the operation succeeds, origAccount is released.
// If an error occurs, origAccount remains open and the caller
// remains responsible for closing / shrinking it.
func (mm *BytesMonitor) TransferAccount(
ctx context.Context, origAccount *BoundAccount,
) (newAccount BoundAccount, err error) {
b := mm.MakeBoundAccount()
if err = b.Grow(ctx, origAccount.used); err != nil {
return newAccount, err
}
origAccount.Close(ctx)
return b, nil
}

// Init initializes a BoundAccount, connecting it to the given monitor. It is
// similar to MakeBoundAccount, but allows the caller to save a BoundAccount
// allocation.
Expand Down

0 comments on commit 78cace3

Please sign in to comment.