Merge #76805
76805: ccl/sqlproxyccl: complete connection migration support in the forwarder r=JeffSwenson a=jaylim-crl

#### ccl/sqlproxyccl: add last message details to the forwarder's processor

This commit adds last message details to the forwarder's processor, in
particular lastMessageType and lastMessageTransferredAt. The latter is
implemented using a simple logical clock, which will be used to determine the
ordering of events. These two fields will be used during connection migration
to determine a safe transfer point.
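
For intuition, here is a minimal, hedged sketch of what such a logical clock can look like: a shared atomic counter that each processor stamps onto the last message it handled. The type and field names below are illustrative, not the actual forwarder code.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// logicalClock is an illustrative stand-in for the forwarder's clock: every
// forwarded message gets the next tick, so comparing the ticks recorded by
// the request and response processors tells us which side saw a message
// most recently.
type logicalClock struct {
	counter uint64
}

// next atomically advances the clock and returns the new tick.
func (c *logicalClock) next() uint64 {
	return atomic.AddUint64(&c.counter, 1)
}

func main() {
	clock := &logicalClock{}
	lastRequestAt := clock.next()  // stamped when a client message is forwarded
	lastResponseAt := clock.next() // stamped when the server's reply is forwarded
	// A response tick that is >= the request tick is one precondition for a
	// safe transfer point (see isSafeTransferPoint in the diff below).
	fmt.Println(lastRequestAt <= lastResponseAt) // true
}
```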

At the same time, we plumb the connector and metrics into the processor, which
will be used in subsequent commits.

Release justification: Low risk, sqlproxy-only change.

Release note: None

#### ccl/sqlproxyccl: support waitResumed on the processor to block until resumption

Previously, there could be a race where suspend() was called right after
resuming the processors. If the processor goroutines had not started yet,
suspend() would return immediately, violating the invariant that the
processors must be suspended before proceeding. This commit adds a new
waitResumed method on the processor that allows callers to block until the
processors have been resumed.
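
A minimal sketch of the idea behind waitResumed, assuming a condition variable guarding a resumed flag; the names and structure below are illustrative rather than the actual processor implementation (the real method also watches a context so callers are not blocked forever).

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// processor is an illustrative stand-in; only the resumption signalling that
// waitResumed relies on is sketched here.
type processor struct {
	mu struct {
		sync.Mutex
		cond    *sync.Cond
		resumed bool
	}
}

func newProcessor() *processor {
	p := &processor{}
	p.mu.cond = sync.NewCond(&p.mu.Mutex)
	return p
}

// resume runs as the processor goroutine; it flips the flag and wakes up
// anyone blocked in waitResumed before doing its real work.
func (p *processor) resume() {
	p.mu.Lock()
	p.mu.resumed = true
	p.mu.cond.Broadcast()
	p.mu.Unlock()
	// ... forward messages until suspended ...
}

// waitResumed blocks the caller until the processor goroutine has started.
func (p *processor) waitResumed() {
	p.mu.Lock()
	defer p.mu.Unlock()
	for !p.mu.resumed {
		p.mu.cond.Wait()
	}
}

func main() {
	p := newProcessor()
	go func() {
		time.Sleep(10 * time.Millisecond) // simulate goroutine startup delay
		p.resume()
	}()
	p.waitResumed() // only safe to call suspend() after this returns
	fmt.Println("processor resumed")
}
```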

Release justification: sqlproxy-only change.

Release note: None

#### ccl/sqlproxyccl: complete connection migration support in the forwarder

Informs #76000.

This commit completes the connection migration feature in the forwarder
within sqlproxy. The idea is as described in the RFC.

A couple of new sqlproxy metrics have been added as well:
- proxy.conn_migration.success
- proxy.conn_migration.error_fatal
- proxy.conn_migration.error_recoverable
- proxy.conn_migration.attempted

For more details, see metrics.go in the sqlproxyccl package.
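
For illustration only, the counters could be declared roughly as below, assuming CockroachDB's pkg/util/metric Counter API; the type name, constructor, and help strings here are paraphrased placeholders, and metrics.go remains the authoritative source.

```go
package sqlproxyccl

import "github.com/cockroachdb/cockroach/pkg/util/metric"

// connMigrationMetrics is an illustrative grouping of the new counters; the
// field names mirror those used by runTransfer in the diff below.
type connMigrationMetrics struct {
	ConnMigrationAttemptedCount        *metric.Counter // proxy.conn_migration.attempted
	ConnMigrationSuccessCount          *metric.Counter // proxy.conn_migration.success
	ConnMigrationErrorFatalCount       *metric.Counter // proxy.conn_migration.error_fatal
	ConnMigrationErrorRecoverableCount *metric.Counter // proxy.conn_migration.error_recoverable
}

func newConnMigrationMetrics() *connMigrationMetrics {
	mk := func(name, help string) *metric.Counter {
		return metric.NewCounter(metric.Metadata{
			Name:        name,
			Help:        help,
			Measurement: "Connections",
			Unit:        metric.Unit_COUNT,
		})
	}
	return &connMigrationMetrics{
		ConnMigrationAttemptedCount:        mk("proxy.conn_migration.attempted", "Number of attempted connection migrations"),
		ConnMigrationSuccessCount:          mk("proxy.conn_migration.success", "Number of successful connection migrations"),
		ConnMigrationErrorFatalCount:       mk("proxy.conn_migration.error_fatal", "Number of failed migrations that closed the connection"),
		ConnMigrationErrorRecoverableCount: mk("proxy.conn_migration.error_recoverable", "Number of failed migrations where the connection was recovered"),
	}
}
```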

Release justification: This completes the first half of the connection
migration feature. This is low risk, as part of the code is guarded behind the
connection migration feature, which is currently not used in production.
In addition, CockroachCloud is the only user of sqlproxy.

Release note: None


Co-authored-by: Jay <jay@cockroachlabs.com>
craig[bot] and jaylim-crl committed Mar 13, 2022
2 parents 4531173 + 8610953 commit dc5432d
Showing 13 changed files with 1,583 additions and 120 deletions.
4 changes: 4 additions & 0 deletions pkg/ccl/sqlproxyccl/BUILD.bazel
@@ -39,6 +39,7 @@ go_library(
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_jackc_pgproto3_v2//:pgproto3",
@@ -78,6 +79,8 @@ go_test(
"//pkg/sql",
"//pkg/sql/pgwire",
"//pkg/sql/pgwire/pgerror",
"//pkg/sql/pgwire/pgwirebase",
"//pkg/sql/tests",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
@@ -90,6 +93,7 @@ go_test(
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_cockroach_go_v2//crdb",
"@com_github_cockroachdb_errors//:errors",
"@com_github_jackc_pgconn//:pgconn",
"@com_github_jackc_pgproto3_v2//:pgproto3",
270 changes: 269 additions & 1 deletion pkg/ccl/sqlproxyccl/conn_migration.go
@@ -13,13 +13,281 @@ import (
"encoding/json"
"fmt"
"io"
"net"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/interceptor"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgwirebase"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
pgproto3 "github.com/jackc/pgproto3/v2"
)

// defaultTransferTimeout corresponds to the timeout period for the connection
// migration process. If the timeout gets triggered, and we're in a
// non-recoverable state, the connection will be closed.
//
// This is a variable instead of a constant to support testing hooks.
var defaultTransferTimeout = 15 * time.Second

// Used in testing.
var transferConnectionConnectorTestHook func(context.Context, string) (net.Conn, error) = nil

type transferContext struct {
	context.Context
	mu struct {
		syncutil.Mutex
		recoverableConn bool
	}
}

func newTransferContext(backgroundCtx context.Context) (*transferContext, context.CancelFunc) {
	transferCtx, cancel := context.WithTimeout(backgroundCtx, defaultTransferTimeout) // nolint:context
	ctx := &transferContext{
		Context: transferCtx,
	}
	ctx.mu.recoverableConn = true
	return ctx, cancel
}

func (t *transferContext) markRecoverable(r bool) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.mu.recoverableConn = r
}

func (t *transferContext) isRecoverable() bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	return t.mu.recoverableConn
}

// tryBeginTransfer returns true if the transfer can be started, and false
// otherwise. If the transfer can be started, it updates the state of the
// forwarder to indicate that a transfer is in progress, and a cleanup function
// will be returned.
func (f *forwarder) tryBeginTransfer() (started bool, cleanupFn func()) {
	f.mu.Lock()
	defer f.mu.Unlock()

	// Transfer is already in progress. No concurrent transfers are allowed.
	if f.mu.isTransferring {
		return false, nil
	}

	if !isSafeTransferPoint(f.mu.request, f.mu.response) {
		return false, nil
	}

	f.mu.isTransferring = true

	return true, func() {
		f.mu.Lock()
		defer f.mu.Unlock()
		f.mu.isTransferring = false
	}
}

var errTransferCannotStart = errors.New("transfer cannot be started")

func (f *forwarder) runTransfer() (retErr error) {
	// A previous non-recoverable transfer would have closed the forwarder, so
	// return right away.
	if f.ctx.Err() != nil {
		return f.ctx.Err()
	}

	started, cleanupFn := f.tryBeginTransfer()
	if !started {
		return errTransferCannotStart
	}
	defer cleanupFn()

	f.metrics.ConnMigrationAttemptedCount.Inc(1)

	// Create a transfer context, and a timeout handler which gets triggered
	// whenever the context expires. We have to close the forwarder because
	// the transfer may be blocked on I/O, and the only way for now is to close
	// the connections. This then allows runTransfer to return and clean up.
	ctx, cancel := newTransferContext(f.ctx)
	defer cancel()

	// Use a separate handler for timeouts. This is the only way to handle
	// blocked I/Os as described above.
	go func() {
		<-ctx.Done()
		if !ctx.isRecoverable() {
			f.Close()
		}
	}()

	// Use a separate context for logging because f.ctx will be closed whenever
	// the connection is non-recoverable.
	//
	// TODO(jaylim-crl): There's a possible "use of Span after Finish" issue
	// where proxy_handler.handle returns before this function returns because
	// we're calling f.Close() in the timeout goroutine. When handle returns,
	// the context (with the span) gets cleaned up. Some ideas to fix this:
	// (1) errgroup (?), (2) use the stopper instead of the go keyword - that
	// should fork a new span, and avoid this issue.
	logCtx := logtags.WithTags(context.Background(), logtags.FromContext(f.ctx))
	defer func() {
		if !ctx.isRecoverable() {
			log.Infof(logCtx, "transfer failed: connection closed, err=%v", retErr)
			f.metrics.ConnMigrationErrorFatalCount.Inc(1)
		} else {
			// Transfer was successful.
			if retErr == nil {
				log.Infof(logCtx, "transfer successful")
				f.metrics.ConnMigrationSuccessCount.Inc(1)
			} else {
				log.Infof(logCtx, "transfer failed: connection recovered, err=%v", retErr)
				f.metrics.ConnMigrationErrorRecoverableCount.Inc(1)
			}
			if err := f.resumeProcessors(); err != nil {
				log.Infof(logCtx, "unable to resume processors: %v", err)
				f.Close()
			}
		}
	}()

	// Suspend both processors before starting the transfer.
	request, response := f.getProcessors()
	if err := request.suspend(ctx); err != nil {
		return errors.Wrap(err, "suspending request processor")
	}
	if err := response.suspend(ctx); err != nil {
		return errors.Wrap(err, "suspending response processor")
	}

	// Transfer the connection.
	clientConn, serverConn := f.getConns()
	newServerConn, err := transferConnection(ctx, f.connector, clientConn, serverConn)
	if err != nil {
		return errors.Wrap(err, "transferring connection")
	}

	// Transfer was successful.
	f.replaceServerConn(newServerConn)
	return nil
}

// transferConnection performs the transfer operation for the current server
// connection, and returns a new connection to the server that the
// connection got transferred to.
func transferConnection(
	ctx *transferContext, connector *connector, clientConn, serverConn *interceptor.PGConn,
) (_ *interceptor.PGConn, retErr error) {
	ctx.markRecoverable(true)

	// Context was cancelled.
	if ctx.Err() != nil {
		return nil, ctx.Err()
	}

	transferKey := uuid.MakeV4().String()

	// Send the SHOW TRANSFER STATE statement. At this point, the connection is
	// non-recoverable because the message has already been sent to the server.
	ctx.markRecoverable(false)
	if err := runShowTransferState(serverConn, transferKey); err != nil {
		return nil, errors.Wrap(err, "sending transfer request")
	}

	transferErr, state, revivalToken, err := waitForShowTransferState(
		ctx, serverConn.ToFrontendConn(), clientConn, transferKey)
	if err != nil {
		return nil, errors.Wrap(err, "waiting for transfer state")
	}

	// Failures after this point are recoverable, and connections should not be
	// terminated.
	ctx.markRecoverable(true)

	// If we consumed until ReadyForQuery without errors, but the transfer state
	// response returns an error, we could still resume the connection, but the
	// transfer process will need to be aborted.
	//
	// This case may happen pretty frequently (e.g. open transactions, temporary
	// tables, etc.).
	if transferErr != "" {
		return nil, errors.Newf("%s", transferErr)
	}

	// Connect to a new SQL pod.
	//
	// TODO(jaylim-crl): There is a possibility where the same pod will get
	// selected. Some ideas to solve this: pass in the remote address of
	// serverConn to avoid choosing that pod, or maybe a filter callback?
	// We can also consider adding a target pod as an argument to RequestTransfer.
	// That way a central component gets to choose where the connections go.
	connectFn := connector.OpenTenantConnWithToken
	if transferConnectionConnectorTestHook != nil {
		connectFn = transferConnectionConnectorTestHook
	}
	netConn, err := connectFn(ctx, revivalToken)
	if err != nil {
		return nil, errors.Wrap(err, "opening connection")
	}
	defer func() {
		if retErr != nil {
			netConn.Close()
		}
	}()
	newServerConn := interceptor.NewPGConn(netConn)

	// Deserialize session state within the new SQL pod.
	if err := runAndWaitForDeserializeSession(
		ctx, newServerConn.ToFrontendConn(), state,
	); err != nil {
		return nil, errors.Wrap(err, "deserializing session")
	}

	return newServerConn, nil
}

// isSafeTransferPoint returns true if we're at a point where we're safe to
// transfer, and false otherwise.
var isSafeTransferPoint = func(request *processor, response *processor) bool {
	request.mu.Lock()
	response.mu.Lock()
	defer request.mu.Unlock()
	defer response.mu.Unlock()

	// Three conditions when evaluating a safe transfer point:
	//   1. The last message sent to the SQL pod was a Sync(S) or SimpleQuery(Q),
	//      and a ReadyForQuery(Z) has been received after.
	//   2. The last message sent to the SQL pod was a CopyDone(c), and a
	//      ReadyForQuery(Z) has been received after.
	//   3. The last message sent to the SQL pod was a CopyFail(f), and a
	//      ReadyForQuery(Z) has been received after.

	// The conditions above are not possible if this is true. They cannot be
	// equal since the same logical clock is used (except during initialization).
	if request.mu.lastMessageTransferredAt > response.mu.lastMessageTransferredAt {
		return false
	}

	// We need to check zero values here to handle the initialization case
	// since we would still want to be able to transfer connections which have
	// not made any queries to the server.
	switch pgwirebase.ClientMessageType(request.mu.lastMessageType) {
	case pgwirebase.ClientMessageType(0),
		pgwirebase.ClientMsgSync,
		pgwirebase.ClientMsgSimpleQuery,
		pgwirebase.ClientMsgCopyDone,
		pgwirebase.ClientMsgCopyFail:

		serverMsg := pgwirebase.ServerMessageType(response.mu.lastMessageType)
		return serverMsg == pgwirebase.ServerMsgReady || serverMsg == pgwirebase.ServerMessageType(0)
	default:
		return false
	}
}

// runShowTransferState sends a SHOW TRANSFER STATE query with the input
// transferKey to the given writer. The transferKey will be used to uniquely
// identify the request when parsing the response messages in
@@ -28,7 +296,7 @@ import (
// Unlike runAndWaitForDeserializeSession, we split the SHOW TRANSFER STATE
// operation into `run` and `wait` since doing so allows us to send the query
// ahead of time.
func runShowTransferState(w io.Writer, transferKey string) error {
var runShowTransferState = func(w io.Writer, transferKey string) error {
	return writeQuery(w, "SHOW TRANSFER STATE WITH '%s'", transferKey)
}
