77442: sqlproxyccl: Add codeUnavailable to the list of error codes r=alyshanjahani-crl a=alyshanjahani-crl

Release justification: fixes for high-priority bug in existing functionality

Previously, if a tenant cluster had maxPods set to 0, the error returned by
directory.EnsureTenantAddr was treated as retryable, so the proxy kept
retrying instead of failing fast.

The tenant directory implementation used in CockroachCloud now identifies
this situation and returns a status error with codes.FailedPrecondition and
a descriptive message.

This patch adds a check for the FailedPrecondition error in
connector.lookupAddr.
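
As a self-contained illustration of the classification (not the proxy's actual code; the real change to `connector.lookupAddr` is in the diff below, and the `classify` helper and its messages are invented for this example):

```go
package main

import (
	"fmt"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// classify mirrors the new lookupAddr behavior: FailedPrecondition becomes a
// non-retryable "unavailable" error, NotFound falls back to the old
// resolution rule, and any other directory error remains retryable.
func classify(err error) string {
	switch {
	case err == nil:
		return "use the returned address"
	case status.Code(err) == codes.FailedPrecondition:
		return "non-retryable (codeUnavailable): " + status.Convert(err).Message()
	case status.Code(err) != codes.NotFound:
		return "retryable"
	default:
		return "fall back to the routing rule"
	}
}

func main() {
	fmt.Println(classify(status.Error(codes.FailedPrecondition, "tenant has maxPods set to 0")))
	fmt.Println(classify(status.Error(codes.Unavailable, "transient directory hiccup")))
	fmt.Println(classify(status.Error(codes.NotFound, "tenant not found")))
}
```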

Release note: None

77626: sql: re-enable txnidcache by default r=maryliag,ajwerner a=Azhng

Previously, the TxnID cache was disabled by default due to performance
issues. Those issues have since been resolved in #77220, so this commit
re-enables the TxnID cache by default.
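
Since the setting's description says "set to 0 to disable", operators can still opt out of the new default. A sketch, with an assumed driver and connection string:

```go
package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq" // driver choice is illustrative
)

func main() {
	// The DSN is illustrative; point it at your own cluster.
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Per the setting's description, '0' disables the cache even though it
	// now defaults to 64 MiB.
	if _, err := db.Exec(
		"SET CLUSTER SETTING sql.contention.txn_id_cache.max_size = '0B'",
	); err != nil {
		log.Fatal(err)
	}
}
```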

Release justification: low-risk, high-reward change.
Release note: None

77675: cmd/roachtest: tweaks to sstable corruption test r=itsbilal a=nicktrav

A few minor changes to the existing roachtest:

- Reduce the warehouse count on ingest back to 100 - this should be ample,
  and it matches the warehouse count on the read path in a later part of
  the test.
- Change the stop options to wait until the process has been terminated
  before continuing.
- Re-work the command to search for SSTables to corrupt - use tables from
  the most recent manifest (if multiple are present); consider SSTables
  with either a start or end key representing a user table; shuffle tables
  to distribute corruption over the LSM; perform filtering in the bash
  command rather than in the test runner itself.

Release justification: Roachtest-only change.

Release note: None.

Touches #77321.

77700: ccl/sqlproxyccl: add connection migration-related metrics r=JeffSwenson a=jaylim-crl

#### ccl/sqlproxyccl: add metric that measures connection migration latency

Previously, we added support for connection migration in #76805. This commit
adds a new `proxy.conn_migration.attempted.latency` metric that tracks latency
for attempted connection migrations. This metric is useful because it tells
us how long users were blocked during connection migrations.
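
The mechanics are visible in the `conn_migration.go` diff below: a start time is captured before the transfer and the elapsed nanoseconds are recorded in a deferred block, so both success and failure paths are measured. A standalone sketch, with a print standing in for the histogram:

```go
package main

import (
	"fmt"
	"time"
)

// transferWithLatency mirrors the pattern in runTransfer: capture a start
// time, then record the elapsed duration in a deferred block that runs on
// both the success and failure paths.
func transferWithLatency(do func() error) (retErr error) {
	tBegin := time.Now()
	defer func() {
		latencyDur := time.Since(tBegin)
		// In the proxy this is:
		//   f.metrics.ConnMigrationAttemptedLatency.RecordValue(latencyDur.Nanoseconds())
		fmt.Printf("attempted migration: latency=%v err=%v\n", latencyDur, retErr)
	}()
	return do()
}

func main() {
	_ = transferWithLatency(func() error {
		time.Sleep(10 * time.Millisecond) // stand-in for the actual transfer
		return nil
	})
}
```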

Release justification: Low-risk metric-only change within sqlproxy.

Release note: None

#### ccl/sqlproxyccl: add metric that records the transfer response message size

Informs #76000, and follow up to #76805.

This commit adds a new `proxy.conn_migration.transfer_response.message_size`
metric that tracks the distribution of transfer response message sizes. This
will be used to tune a value for the SQL cluster setting
`sql.session_transfer.max_session_size`.
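
The size is obtained by peeking the pgwire message header before the DataRow is consumed (see `expectDataRow` in the diff below). A standalone sketch of that framing arithmetic, assuming the recorded size counts the type byte plus the 4-byte length field's value:

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// peekMsgSize decodes a pgwire header: one message-type byte followed by a
// 4-byte big-endian length that includes the length field itself, so the
// total wire size is 1 + length.
func peekMsgSize(header []byte) (typ byte, size int) {
	return header[0], 1 + int(binary.BigEndian.Uint32(header[1:5]))
}

func main() {
	// Build a DataRow ('D') with a 4-byte body.
	var buf bytes.Buffer
	buf.WriteByte('D')
	_ = binary.Write(&buf, binary.BigEndian, uint32(4+4)) // length field + body
	buf.Write([]byte{0, 1, 0, 0})

	typ, size := peekMsgSize(buf.Bytes())
	// size is the kind of value recorded into
	// proxy.conn_migration.transfer_response.message_size.
	fmt.Printf("type=%c size=%d totalBytes=%d\n", typ, size, buf.Len())
}
```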

Release justification: Low-risk metric-only change within sqlproxy.

Release note: None

Co-authored-by: Alyshan Jahani <alyshan@cockroachlabs.com>
Co-authored-by: Azhng <archer.xn@gmail.com>
Co-authored-by: Nick Travers <travers@cockroachlabs.com>
Co-authored-by: Jay <jay@cockroachlabs.com>
5 people committed Mar 14, 2022
5 parents 1dd7ed3 + 5bd1ae2 + b1f6722 + 436026f + 454b896 commit 51cbdce
Showing 15 changed files with 192 additions and 70 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
@@ -87,7 +87,7 @@ server.web_session_timeout duration 168h0m0s the duration that a newly created w
 sql.auth.resolve_membership_single_scan.enabled boolean true determines whether to populate the role membership cache with a single scan
 sql.contention.event_store.capacity byte size 64 MiB the in-memory storage capacity per-node of contention event store
 sql.contention.event_store.duration_threshold duration 0s minimum contention duration to cause the contention events to be collected into crdb_internal.transaction_contention_events
-sql.contention.txn_id_cache.max_size byte size 0 B the maximum byte size TxnID cache will use (set to 0 to disable)
+sql.contention.txn_id_cache.max_size byte size 64 MiB the maximum byte size TxnID cache will use (set to 0 to disable)
 sql.cross_db_fks.enabled boolean false if true, creating foreign key references across databases is allowed
 sql.cross_db_sequence_owners.enabled boolean false if true, creating sequences owned by tables from other databases is allowed
 sql.cross_db_sequence_references.enabled boolean false if true, sequences referenced by tables from other databases are allowed
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
@@ -101,7 +101,7 @@
 <tr><td><code>sql.auth.resolve_membership_single_scan.enabled</code></td><td>boolean</td><td><code>true</code></td><td>determines whether to populate the role membership cache with a single scan</td></tr>
 <tr><td><code>sql.contention.event_store.capacity</code></td><td>byte size</td><td><code>64 MiB</code></td><td>the in-memory storage capacity per-node of contention event store</td></tr>
 <tr><td><code>sql.contention.event_store.duration_threshold</code></td><td>duration</td><td><code>0s</code></td><td>minimum contention duration to cause the contention events to be collected into crdb_internal.transaction_contention_events</td></tr>
-<tr><td><code>sql.contention.txn_id_cache.max_size</code></td><td>byte size</td><td><code>0 B</code></td><td>the maximum byte size TxnID cache will use (set to 0 to disable)</td></tr>
+<tr><td><code>sql.contention.txn_id_cache.max_size</code></td><td>byte size</td><td><code>64 MiB</code></td><td>the maximum byte size TxnID cache will use (set to 0 to disable)</td></tr>
 <tr><td><code>sql.cross_db_fks.enabled</code></td><td>boolean</td><td><code>false</code></td><td>if true, creating foreign key references across databases is allowed</td></tr>
 <tr><td><code>sql.cross_db_sequence_owners.enabled</code></td><td>boolean</td><td><code>false</code></td><td>if true, creating sequences owned by tables from other databases is allowed</td></tr>
 <tr><td><code>sql.cross_db_sequence_references.enabled</code></td><td>boolean</td><td><code>false</code></td><td>if true, sequences referenced by tables from other databases are allowed</td></tr>
9 changes: 0 additions & 9 deletions pkg/ccl/serverccl/statusccl/tenant_status_test.go
@@ -57,15 +57,6 @@ func TestTenantStatusAPI(t *testing.T) {
 	testHelper := newTestTenantHelper(t, 3 /* tenantClusterSize */, knobs)
 	defer testHelper.cleanup(ctx, t)
 
-	for _, conn := range []*sqlutils.SQLRunner{
-		sqlutils.MakeSQLRunner(testHelper.hostCluster.ServerConn(0)),
-		testHelper.testCluster().tenantConn(0),
-	} {
-		conn.Exec(
-			t, "SET CLUSTER SETTING sql.contention.txn_id_cache.max_size = '10MB'",
-		)
-	}
-
 	t.Run("reset_sql_stats", func(t *testing.T) {
 		testResetSQLStatsRPCForTenant(ctx, t, testHelper)
 	})
1 change: 1 addition & 0 deletions pkg/ccl/sqlproxyccl/BUILD.bazel
@@ -20,6 +20,7 @@ go_library(
     importpath = "github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl",
     visibility = ["//visibility:public"],
     deps = [
+        "//pkg/base",
         "//pkg/ccl/sqlproxyccl/denylist",
         "//pkg/ccl/sqlproxyccl/idle",
         "//pkg/ccl/sqlproxyccl/interceptor",
41 changes: 31 additions & 10 deletions pkg/ccl/sqlproxyccl/conn_migration.go
@@ -20,6 +20,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgwirebase"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
+	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/cockroach/pkg/util/uuid"
 	"github.com/cockroachdb/errors"
 	"github.com/cockroachdb/logtags"
@@ -133,18 +134,22 @@ func (f *forwarder) runTransfer() (retErr error) {
 	// the context (with the span) gets cleaned up. Some ideas to fix this:
 	// (1) errgroup (?), (2) use the stopper instead of the go keyword - that
 	// should fork a new span, and avoid this issue.
+	tBegin := timeutil.Now()
 	logCtx := logtags.WithTags(context.Background(), logtags.FromContext(f.ctx))
 	defer func() {
+		latencyDur := timeutil.Since(tBegin)
+		f.metrics.ConnMigrationAttemptedLatency.RecordValue(latencyDur.Nanoseconds())
+
 		if !ctx.isRecoverable() {
-			log.Infof(logCtx, "transfer failed: connection closed, err=%v", retErr)
+			log.Infof(logCtx, "transfer failed: connection closed, latency=%v, err=%v", latencyDur, retErr)
 			f.metrics.ConnMigrationErrorFatalCount.Inc(1)
 		} else {
 			// Transfer was successful.
 			if retErr == nil {
-				log.Infof(logCtx, "transfer successful")
+				log.Infof(logCtx, "transfer successful, latency=%v", latencyDur)
 				f.metrics.ConnMigrationSuccessCount.Inc(1)
 			} else {
-				log.Infof(logCtx, "transfer failed: connection recovered, err=%v", retErr)
+				log.Infof(logCtx, "transfer failed: connection recovered, latency=%v, err=%v", latencyDur, retErr)
 				f.metrics.ConnMigrationErrorRecoverableCount.Inc(1)
 			}
 			if err := f.resumeProcessors(); err != nil {
@@ -165,7 +170,7 @@
 
 	// Transfer the connection.
 	clientConn, serverConn := f.getConns()
-	newServerConn, err := transferConnection(ctx, f.connector, clientConn, serverConn)
+	newServerConn, err := transferConnection(ctx, f.connector, f.metrics, clientConn, serverConn)
 	if err != nil {
 		return errors.Wrap(err, "transferring connection")
 	}
@@ -179,7 +184,10 @@
 // connection, and returns the a new connection to the server that the
 // connection got transferred to.
 func transferConnection(
-	ctx *transferContext, connector *connector, clientConn, serverConn *interceptor.PGConn,
+	ctx *transferContext,
+	connector *connector,
+	metrics *metrics,
+	clientConn, serverConn *interceptor.PGConn,
 ) (_ *interceptor.PGConn, retErr error) {
 	ctx.markRecoverable(true)
 
@@ -198,7 +206,7 @@ func transferConnection(
 	}
 
 	transferErr, state, revivalToken, err := waitForShowTransferState(
-		ctx, serverConn.ToFrontendConn(), clientConn, transferKey)
+		ctx, serverConn.ToFrontendConn(), clientConn, transferKey, metrics)
 	if err != nil {
 		return nil, errors.Wrap(err, "waiting for transfer state")
 	}
@@ -306,6 +314,9 @@ var runShowTransferState = func(w io.Writer, transferKey string) error {
 // Since ReadyForQuery may be for a previous pipelined query, this handles the
 // forwarding of messages back to the client in case we don't see our state yet.
 //
+// metrics is optional, and if not nil, it will be used to record the transfer
+// response message size in ConnMigrationTransferResponseMessageSize.
+//
 // WARNING: When using this, we assume that no other goroutines are using both
 // serverConn and clientConn. In the context of a transfer, the response
 // processor must be blocked to avoid concurrent reads from serverConn.
@@ -314,6 +325,7 @@ var waitForShowTransferState = func(
 	serverConn *interceptor.FrontendConn,
 	clientConn io.Writer,
 	transferKey string,
+	metrics *metrics,
 ) (transferErr string, state string, revivalToken string, retErr error) {
 	// Wait for a response that looks like the following:
 	//
@@ -358,7 +370,7 @@
 	}
 
 	// 2. Read DataRow.
-	if err := expectDataRow(ctx, serverConn, func(msg *pgproto3.DataRow) bool {
+	if err := expectDataRow(ctx, serverConn, func(msg *pgproto3.DataRow, size int) bool {
 		// This has to be 4 since we validated RowDescription earlier.
 		if len(msg.Values) != 4 {
 			return false
@@ -376,6 +388,11 @@
 		// referenced in msg will no longer be valid once we read the next pgwire
 		// message.
 		transferErr, state, revivalToken = string(msg.Values[0]), string(msg.Values[1]), string(msg.Values[2])
+
+		// Since the DataRow is valid, record response message size.
+		if metrics != nil {
+			metrics.ConnMigrationTransferResponseMessageSize.RecordValue(int64(size))
+		}
 		return true
 	}); err != nil {
 		return "", "", "", errors.Wrap(err, "expecting DataRow")
@@ -443,7 +460,7 @@ var runAndWaitForDeserializeSession = func(
 	}
 
 	// 2. Read DataRow.
-	if err := expectDataRow(ctx, serverConn, func(msg *pgproto3.DataRow) bool {
+	if err := expectDataRow(ctx, serverConn, func(msg *pgproto3.DataRow, _ int) bool {
 		return len(msg.Values) == 1 && string(msg.Values[0]) == "t"
 	}); err != nil {
 		return errors.Wrap(err, "expecting DataRow")
@@ -560,11 +577,15 @@ func waitForSmallRowDescription(
 func expectDataRow(
 	ctx context.Context,
 	serverConn *interceptor.FrontendConn,
-	validateFn func(*pgproto3.DataRow) bool,
+	validateFn func(*pgproto3.DataRow, int) bool,
 ) error {
 	if ctx.Err() != nil {
 		return ctx.Err()
 	}
+	_, size, err := serverConn.PeekMsg()
+	if err != nil {
+		return errors.Wrap(err, "peeking message")
+	}
 	msg, err := serverConn.ReadMsg()
 	if err != nil {
 		return errors.Wrap(err, "reading message")
@@ -573,7 +594,7 @@
 	if !ok {
 		return errors.Newf("unexpected message: %v", jsonOrRaw(msg))
 	}
-	if !validateFn(pgMsg) {
+	if !validateFn(pgMsg, size) {
 		return errors.Newf("validation failed for message: %v", jsonOrRaw(msg))
 	}
 	return nil
29 changes: 22 additions & 7 deletions pkg/ccl/sqlproxyccl/conn_migration_test.go
@@ -110,7 +110,7 @@ func TestTransferConnection(t *testing.T) {
 		ctx, cancel := newTransferContext(context.Background())
 		cancel()
 
-		conn, err := transferConnection(ctx, nil, nil, nil)
+		conn, err := transferConnection(ctx, nil, nil, nil, nil)
 		require.EqualError(t, err, context.Canceled.Error())
 		require.Nil(t, conn)
 		require.True(t, ctx.isRecoverable())
@@ -131,6 +131,7 @@
 		conn, err := transferConnection(
 			ctx,
 			nil,
+			nil,
 			interceptor.NewPGConn(p1),
 			interceptor.NewPGConn(p2),
 		)
@@ -157,6 +158,7 @@
 			serverConn *interceptor.FrontendConn,
 			clientConn io.Writer,
 			transferKey string,
+			_ *metrics,
 		) (string, string, string, error) {
 			require.Equal(t, ctx, tCtx)
 			require.NotNil(t, serverConn)
@@ -169,6 +171,7 @@
 		conn, err := transferConnection(
 			ctx,
 			nil,
+			nil,
 			interceptor.NewPGConn(p1),
 			interceptor.NewPGConn(p2),
 		)
@@ -195,6 +198,7 @@
 			serverConn *interceptor.FrontendConn,
 			clientConn io.Writer,
 			transferKey string,
+			_ *metrics,
 		) (string, string, string, error) {
 			require.Equal(t, ctx, tCtx)
 			require.NotNil(t, serverConn)
@@ -207,6 +211,7 @@
 		conn, err := transferConnection(
 			ctx,
 			nil,
+			nil,
 			interceptor.NewPGConn(p1),
 			interceptor.NewPGConn(p2),
 		)
@@ -233,6 +238,7 @@
 			serverConn *interceptor.FrontendConn,
 			clientConn io.Writer,
 			transferKey string,
+			_ *metrics,
 		) (string, string, string, error) {
 			require.Equal(t, ctx, tCtx)
 			require.NotNil(t, serverConn)
@@ -256,6 +262,7 @@
 		conn, err := transferConnection(
 			ctx,
 			&connector{},
+			nil,
 			interceptor.NewPGConn(p1),
 			interceptor.NewPGConn(p2),
 		)
@@ -282,6 +289,7 @@
 			serverConn *interceptor.FrontendConn,
 			clientConn io.Writer,
 			transferKey string,
+			_ *metrics,
 		) (string, string, string, error) {
 			require.Equal(t, ctx, tCtx)
 			require.NotNil(t, serverConn)
@@ -321,6 +329,7 @@
 		conn, err := transferConnection(
 			ctx,
 			&connector{},
+			nil,
 			interceptor.NewPGConn(p1),
 			interceptor.NewPGConn(p2),
 		)
@@ -351,6 +360,7 @@
 			serverConn *interceptor.FrontendConn,
 			clientConn io.Writer,
 			transferKey string,
+			_ *metrics,
 		) (string, string, string, error) {
 			require.Equal(t, ctx, tCtx)
 			require.NotNil(t, serverConn)
@@ -390,6 +400,7 @@
 		conn, err := transferConnection(
 			ctx,
 			&connector{},
+			nil,
 			interceptor.NewPGConn(p1),
 			interceptor.NewPGConn(p2),
 		)
@@ -476,7 +487,7 @@ func TestWaitForShowTransferState(t *testing.T) {
 		tCtx, cancel := context.WithCancel(ctx)
 		cancel()
 
-		transferErr, state, token, err := waitForShowTransferState(tCtx, nil, nil, "")
+		transferErr, state, token, err := waitForShowTransferState(tCtx, nil, nil, "", nil)
 		require.True(t, errors.Is(err, context.Canceled))
 		require.Equal(t, "", transferErr)
 		require.Equal(t, "", state)
@@ -753,6 +764,7 @@
 				interceptor.NewFrontendConn(serverProxy),
 				clientProxy,
 				"foo-transfer-key",
+				nil,
 			)
 			if tc.err == "" {
 				require.NoError(t, err)
@@ -1069,7 +1081,7 @@ func TestExpectDataRow(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	ctx := context.Background()
 
-	falseValidateFn := func(m *pgproto3.DataRow) bool { return false }
+	falseValidateFn := func(m *pgproto3.DataRow, s int) bool { return false }
 
 	t.Run("context_cancelled", func(t *testing.T) {
 		tCtx, cancel := context.WithCancel(ctx)
@@ -1085,7 +1097,7 @@
 		w.Close()
 
 		err := expectDataRow(ctx, interceptor.NewFrontendConn(r), falseValidateFn)
-		require.Regexp(t, "reading message", err)
+		require.Regexp(t, "peeking message", err)
 	})
 
 	t.Run("type_mismatch", func(t *testing.T) {
@@ -1119,15 +1131,18 @@
 		defer r.Close()
 		defer w.Close()
 
+		msg := &pgproto3.DataRow{Values: [][]byte{[]byte("foo")}}
 		go func() {
-			writeServerMsg(w, &pgproto3.DataRow{Values: [][]byte{[]byte("foo")}})
+			writeServerMsg(w, msg)
 		}()
 
 		err := expectDataRow(
 			ctx,
 			interceptor.NewFrontendConn(r),
-			func(m *pgproto3.DataRow) bool {
-				return len(m.Values) == 1 && string(m.Values[0]) == "foo"
+			func(m *pgproto3.DataRow, size int) bool {
+				return len(m.Values) == 1 &&
+					string(m.Values[0]) == "foo" &&
+					len(msg.Encode(nil)) == size
			},
		)
 		require.Nil(t, err)
15 changes: 10 additions & 5 deletions pkg/ccl/sqlproxyccl/connector.go
@@ -318,13 +318,18 @@ func (c *connector) lookupAddr(ctx context.Context) (string, error) {
 	// First try to lookup tenant in the directory (if available).
 	if c.Directory != nil {
 		addr, err := c.Directory.EnsureTenantAddr(ctx, c.TenantID, c.ClusterName)
-		if err != nil {
-			if status.Code(err) != codes.NotFound {
-				return "", markAsRetriableConnectorError(err)
+		switch {
+		case err == nil:
+			return addr, nil
+		case status.Code(err) == codes.FailedPrecondition:
+			if st, ok := status.FromError(err); ok {
+				return "", newErrorf(codeUnavailable, "%v", st.Message())
 			}
+			return "", newErrorf(codeUnavailable, "unavailable")
+		case status.Code(err) != codes.NotFound:
+			return "", markAsRetriableConnectorError(err)
+		default:
 			// Fallback to old resolution rule.
-		} else {
-			return addr, nil
 		}
 	}
 
23 changes: 23 additions & 0 deletions pkg/ccl/sqlproxyccl/connector_test.go
@@ -605,6 +605,29 @@ func TestConnector_lookupAddr(t *testing.T) {
 		require.Equal(t, "", addr)
 		require.Equal(t, 1, ensureTenantAddrFnCount)
 	})
+
+	t.Run("directory with FailedPrecondition error", func(t *testing.T) {
+		var ensureTenantAddrFnCount int
+		c := &connector{
+			ClusterName: "my-foo",
+			TenantID:    roachpb.MakeTenantID(10),
+			RoutingRule: "foo.bar",
+		}
+		c.Directory = &testTenantResolver{
+			ensureTenantAddrFn: func(fnCtx context.Context, tenantID roachpb.TenantID, clusterName string) (string, error) {
+				ensureTenantAddrFnCount++
+				require.Equal(t, ctx, fnCtx)
+				require.Equal(t, c.TenantID, tenantID)
+				require.Equal(t, c.ClusterName, clusterName)
+				return "", status.Errorf(codes.FailedPrecondition, "foo")
+			},
+		}
+
+		addr, err := c.lookupAddr(ctx)
+		require.EqualError(t, err, "codeUnavailable: foo")
+		require.Equal(t, "", addr)
+		require.Equal(t, 1, ensureTenantAddrFnCount)
+	})
 }
 
 func TestConnector_dialSQLServer(t *testing.T) {
4 changes: 4 additions & 0 deletions pkg/ccl/sqlproxyccl/error.go
@@ -80,6 +80,10 @@ const (
 	// codeIdleDisconnect indicates that the connection was disconnected for
 	// being idle for longer than the specified timeout.
 	codeIdleDisconnect
+
+	// codeUnavailable indicates that the backend SQL server exists but is not
+	// accepting connections. For example, a tenant cluster that has maxPods set to 0.
+	codeUnavailable
 )
 
 // codeError is combines an error with one of the above codes to ease