Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ccl/sqlproxyccl: add connection migration-related metrics #77700

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/ccl/sqlproxyccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl",
visibility = ["//visibility:public"],
deps = [
"//pkg/base",
"//pkg/ccl/sqlproxyccl/denylist",
"//pkg/ccl/sqlproxyccl/idle",
"//pkg/ccl/sqlproxyccl/interceptor",
Expand Down
41 changes: 31 additions & 10 deletions pkg/ccl/sqlproxyccl/conn_migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgwirebase"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
Expand Down Expand Up @@ -133,18 +134,22 @@ func (f *forwarder) runTransfer() (retErr error) {
// the context (with the span) gets cleaned up. Some ideas to fix this:
// (1) errgroup (?), (2) use the stopper instead of the go keyword - that
// should fork a new span, and avoid this issue.
tBegin := timeutil.Now()
logCtx := logtags.WithTags(context.Background(), logtags.FromContext(f.ctx))
defer func() {
latencyDur := timeutil.Since(tBegin)
f.metrics.ConnMigrationAttemptedLatency.RecordValue(latencyDur.Nanoseconds())

if !ctx.isRecoverable() {
log.Infof(logCtx, "transfer failed: connection closed, err=%v", retErr)
log.Infof(logCtx, "transfer failed: connection closed, latency=%v, err=%v", latencyDur, retErr)
f.metrics.ConnMigrationErrorFatalCount.Inc(1)
} else {
// Transfer was successful.
if retErr == nil {
log.Infof(logCtx, "transfer successful")
log.Infof(logCtx, "transfer successful, latency=%v", latencyDur)
f.metrics.ConnMigrationSuccessCount.Inc(1)
} else {
log.Infof(logCtx, "transfer failed: connection recovered, err=%v", retErr)
log.Infof(logCtx, "transfer failed: connection recovered, latency=%v, err=%v", latencyDur, retErr)
f.metrics.ConnMigrationErrorRecoverableCount.Inc(1)
}
if err := f.resumeProcessors(); err != nil {
Expand All @@ -165,7 +170,7 @@ func (f *forwarder) runTransfer() (retErr error) {

// Transfer the connection.
clientConn, serverConn := f.getConns()
newServerConn, err := transferConnection(ctx, f.connector, clientConn, serverConn)
newServerConn, err := transferConnection(ctx, f.connector, f.metrics, clientConn, serverConn)
if err != nil {
return errors.Wrap(err, "transferring connection")
}
Expand All @@ -179,7 +184,10 @@ func (f *forwarder) runTransfer() (retErr error) {
// connection, and returns the a new connection to the server that the
// connection got transferred to.
func transferConnection(
ctx *transferContext, connector *connector, clientConn, serverConn *interceptor.PGConn,
ctx *transferContext,
connector *connector,
metrics *metrics,
clientConn, serverConn *interceptor.PGConn,
) (_ *interceptor.PGConn, retErr error) {
ctx.markRecoverable(true)

Expand All @@ -198,7 +206,7 @@ func transferConnection(
}

transferErr, state, revivalToken, err := waitForShowTransferState(
ctx, serverConn.ToFrontendConn(), clientConn, transferKey)
ctx, serverConn.ToFrontendConn(), clientConn, transferKey, metrics)
if err != nil {
return nil, errors.Wrap(err, "waiting for transfer state")
}
Expand Down Expand Up @@ -306,6 +314,9 @@ var runShowTransferState = func(w io.Writer, transferKey string) error {
// Since ReadyForQuery may be for a previous pipelined query, this handles the
// forwarding of messages back to the client in case we don't see our state yet.
//
// metrics is optional, and if not nil, it will be used to record the transfer
// response message size in ConnMigrationTransferResponseMessageSize.
//
// WARNING: When using this, we assume that no other goroutines are using both
// serverConn and clientConn. In the context of a transfer, the response
// processor must be blocked to avoid concurrent reads from serverConn.
Expand All @@ -314,6 +325,7 @@ var waitForShowTransferState = func(
serverConn *interceptor.FrontendConn,
clientConn io.Writer,
transferKey string,
metrics *metrics,
) (transferErr string, state string, revivalToken string, retErr error) {
// Wait for a response that looks like the following:
//
Expand Down Expand Up @@ -358,7 +370,7 @@ var waitForShowTransferState = func(
}

// 2. Read DataRow.
if err := expectDataRow(ctx, serverConn, func(msg *pgproto3.DataRow) bool {
if err := expectDataRow(ctx, serverConn, func(msg *pgproto3.DataRow, size int) bool {
// This has to be 4 since we validated RowDescription earlier.
if len(msg.Values) != 4 {
return false
Expand All @@ -376,6 +388,11 @@ var waitForShowTransferState = func(
// referenced in msg will no longer be valid once we read the next pgwire
// message.
transferErr, state, revivalToken = string(msg.Values[0]), string(msg.Values[1]), string(msg.Values[2])

// Since the DataRow is valid, record response message size.
if metrics != nil {
metrics.ConnMigrationTransferResponseMessageSize.RecordValue(int64(size))
}
return true
}); err != nil {
return "", "", "", errors.Wrap(err, "expecting DataRow")
Expand Down Expand Up @@ -443,7 +460,7 @@ var runAndWaitForDeserializeSession = func(
}

// 2. Read DataRow.
if err := expectDataRow(ctx, serverConn, func(msg *pgproto3.DataRow) bool {
if err := expectDataRow(ctx, serverConn, func(msg *pgproto3.DataRow, _ int) bool {
return len(msg.Values) == 1 && string(msg.Values[0]) == "t"
}); err != nil {
return errors.Wrap(err, "expecting DataRow")
Expand Down Expand Up @@ -560,11 +577,15 @@ func waitForSmallRowDescription(
func expectDataRow(
ctx context.Context,
serverConn *interceptor.FrontendConn,
validateFn func(*pgproto3.DataRow) bool,
validateFn func(*pgproto3.DataRow, int) bool,
) error {
if ctx.Err() != nil {
return ctx.Err()
}
_, size, err := serverConn.PeekMsg()
if err != nil {
return errors.Wrap(err, "peeking message")
}
msg, err := serverConn.ReadMsg()
if err != nil {
return errors.Wrap(err, "reading message")
Expand All @@ -573,7 +594,7 @@ func expectDataRow(
if !ok {
return errors.Newf("unexpected message: %v", jsonOrRaw(msg))
}
if !validateFn(pgMsg) {
if !validateFn(pgMsg, size) {
return errors.Newf("validation failed for message: %v", jsonOrRaw(msg))
}
return nil
Expand Down
29 changes: 22 additions & 7 deletions pkg/ccl/sqlproxyccl/conn_migration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func TestTransferConnection(t *testing.T) {
ctx, cancel := newTransferContext(context.Background())
cancel()

conn, err := transferConnection(ctx, nil, nil, nil)
conn, err := transferConnection(ctx, nil, nil, nil, nil)
require.EqualError(t, err, context.Canceled.Error())
require.Nil(t, conn)
require.True(t, ctx.isRecoverable())
Expand All @@ -131,6 +131,7 @@ func TestTransferConnection(t *testing.T) {
conn, err := transferConnection(
ctx,
nil,
nil,
interceptor.NewPGConn(p1),
interceptor.NewPGConn(p2),
)
Expand All @@ -157,6 +158,7 @@ func TestTransferConnection(t *testing.T) {
serverConn *interceptor.FrontendConn,
clientConn io.Writer,
transferKey string,
_ *metrics,
) (string, string, string, error) {
require.Equal(t, ctx, tCtx)
require.NotNil(t, serverConn)
Expand All @@ -169,6 +171,7 @@ func TestTransferConnection(t *testing.T) {
conn, err := transferConnection(
ctx,
nil,
nil,
interceptor.NewPGConn(p1),
interceptor.NewPGConn(p2),
)
Expand All @@ -195,6 +198,7 @@ func TestTransferConnection(t *testing.T) {
serverConn *interceptor.FrontendConn,
clientConn io.Writer,
transferKey string,
_ *metrics,
) (string, string, string, error) {
require.Equal(t, ctx, tCtx)
require.NotNil(t, serverConn)
Expand All @@ -207,6 +211,7 @@ func TestTransferConnection(t *testing.T) {
conn, err := transferConnection(
ctx,
nil,
nil,
interceptor.NewPGConn(p1),
interceptor.NewPGConn(p2),
)
Expand All @@ -233,6 +238,7 @@ func TestTransferConnection(t *testing.T) {
serverConn *interceptor.FrontendConn,
clientConn io.Writer,
transferKey string,
_ *metrics,
) (string, string, string, error) {
require.Equal(t, ctx, tCtx)
require.NotNil(t, serverConn)
Expand All @@ -256,6 +262,7 @@ func TestTransferConnection(t *testing.T) {
conn, err := transferConnection(
ctx,
&connector{},
nil,
interceptor.NewPGConn(p1),
interceptor.NewPGConn(p2),
)
Expand All @@ -282,6 +289,7 @@ func TestTransferConnection(t *testing.T) {
serverConn *interceptor.FrontendConn,
clientConn io.Writer,
transferKey string,
_ *metrics,
) (string, string, string, error) {
require.Equal(t, ctx, tCtx)
require.NotNil(t, serverConn)
Expand Down Expand Up @@ -321,6 +329,7 @@ func TestTransferConnection(t *testing.T) {
conn, err := transferConnection(
ctx,
&connector{},
nil,
interceptor.NewPGConn(p1),
interceptor.NewPGConn(p2),
)
Expand Down Expand Up @@ -351,6 +360,7 @@ func TestTransferConnection(t *testing.T) {
serverConn *interceptor.FrontendConn,
clientConn io.Writer,
transferKey string,
_ *metrics,
) (string, string, string, error) {
require.Equal(t, ctx, tCtx)
require.NotNil(t, serverConn)
Expand Down Expand Up @@ -390,6 +400,7 @@ func TestTransferConnection(t *testing.T) {
conn, err := transferConnection(
ctx,
&connector{},
nil,
interceptor.NewPGConn(p1),
interceptor.NewPGConn(p2),
)
Expand Down Expand Up @@ -476,7 +487,7 @@ func TestWaitForShowTransferState(t *testing.T) {
tCtx, cancel := context.WithCancel(ctx)
cancel()

transferErr, state, token, err := waitForShowTransferState(tCtx, nil, nil, "")
transferErr, state, token, err := waitForShowTransferState(tCtx, nil, nil, "", nil)
require.True(t, errors.Is(err, context.Canceled))
require.Equal(t, "", transferErr)
require.Equal(t, "", state)
Expand Down Expand Up @@ -753,6 +764,7 @@ func TestWaitForShowTransferState(t *testing.T) {
interceptor.NewFrontendConn(serverProxy),
clientProxy,
"foo-transfer-key",
nil,
)
if tc.err == "" {
require.NoError(t, err)
Expand Down Expand Up @@ -1069,7 +1081,7 @@ func TestExpectDataRow(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()

falseValidateFn := func(m *pgproto3.DataRow) bool { return false }
falseValidateFn := func(m *pgproto3.DataRow, s int) bool { return false }

t.Run("context_cancelled", func(t *testing.T) {
tCtx, cancel := context.WithCancel(ctx)
Expand All @@ -1085,7 +1097,7 @@ func TestExpectDataRow(t *testing.T) {
w.Close()

err := expectDataRow(ctx, interceptor.NewFrontendConn(r), falseValidateFn)
require.Regexp(t, "reading message", err)
require.Regexp(t, "peeking message", err)
})

t.Run("type_mismatch", func(t *testing.T) {
Expand Down Expand Up @@ -1119,15 +1131,18 @@ func TestExpectDataRow(t *testing.T) {
defer r.Close()
defer w.Close()

msg := &pgproto3.DataRow{Values: [][]byte{[]byte("foo")}}
go func() {
writeServerMsg(w, &pgproto3.DataRow{Values: [][]byte{[]byte("foo")}})
writeServerMsg(w, msg)
}()

err := expectDataRow(
ctx,
interceptor.NewFrontendConn(r),
func(m *pgproto3.DataRow) bool {
return len(m.Values) == 1 && string(m.Values[0]) == "foo"
func(m *pgproto3.DataRow, size int) bool {
return len(m.Values) == 1 &&
string(m.Values[0]) == "foo" &&
len(msg.Encode(nil)) == size
},
)
require.Nil(t, err)
Expand Down
Loading