-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
connector.go
415 lines (375 loc) · 15 KB
/
connector.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
// Copyright 2022 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt
package sqlproxyccl
import (
"context"
"crypto/tls"
"fmt"
"net"
"strings"
"time"
"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/throttler"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/netutil/addr"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/errors"
pgproto3 "github.com/jackc/pgproto3/v2"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// Names of crdb-prefixed parameters that may appear in a startup message.
const (
	// sessionRevivalTokenStartupParam indicates the name of the parameter
	// that will activate token-based authentication if present in the
	// startup message.
	sessionRevivalTokenStartupParam = "crdb:session_revival_token_base64"

	// remoteAddrStartupParam is the name of the parameter that contains the
	// remote address of the original client.
	remoteAddrStartupParam = "crdb:remote_addr"
)
// TenantResolver is an interface for the tenant directory. Currently only
// tenant.Directory implements it.
//
// TODO(jaylim-crl): Rename this to Directory, and the current tenant.Directory
// to tenant.directory. This needs to be moved into the tenant package as well.
// This is added here to aid testing.
type TenantResolver interface {
	// EnsureTenantAddr returns an IP address of one of the given tenant's SQL
	// processes based on the tenantID and clusterName fields. This should block
	// until the process associated with the IP is ready.
	//
	// If no matching pods are found (e.g. cluster name mismatch, or tenant was
	// deleted), this will return a GRPC NotFound error.
	EnsureTenantAddr(
		ctx context.Context,
		tenantID roachpb.TenantID,
		clusterName string,
	) (string, error)
	// LookupTenantAddrs returns the IP addresses for all available SQL
	// processes for the given tenant. It returns a GRPC NotFound error if the
	// tenant does not exist.
	//
	// Unlike EnsureTenantAddr, which blocks until there is an associated
	// process, LookupTenantAddrs will just return an empty set if no processes
	// are available for the tenant.
	LookupTenantAddrs(ctx context.Context, tenantID roachpb.TenantID) ([]string, error)
	// ReportFailure is used to indicate to the resolver that an attempt to
	// connect to a particular SQL tenant pod has failed.
	ReportFailure(ctx context.Context, tenantID roachpb.TenantID, addr string) error
}
// connector is a per-session tenant-associated component that can be used to
// obtain a connection to the tenant cluster. This will also handle the
// authentication phase. All connections returned by the connector should
// already be ready to accept regular pgwire messages (e.g. SQL queries).
type connector struct {
	// ClusterName and TenantID correspond to the tenant identifiers associated
	// with this connector.
	//
	// NOTE: These fields are required.
	ClusterName string
	TenantID roachpb.TenantID
	// Directory corresponds to the tenant directory, which will be used to
	// resolve tenants to their corresponding IP addresses. If this isn't set,
	// we will fall back to using RoutingRule.
	//
	// TODO(jaylim-crl): Replace this with a Directory interface, and remove
	// the RoutingRule field. RoutingRule should not be in here.
	//
	// NOTE: This field is optional.
	Directory TenantResolver
	// RoutingRule refers to the static rule that will be used when resolving
	// tenants. This will be used directly whenever the Directory field isn't
	// specified, or as a fallback if one was specified.
	//
	// The literal "{{clusterName}}" will be replaced with
	// "<ClusterName>-<TenantID>" within the RoutingRule string.
	//
	// NOTE: This field is optional if Directory is set.
	RoutingRule string
	// StartupMsg represents the startup message associated with the client.
	// This will be used when establishing a pgwire connection with the SQL pod.
	//
	// NOTE: This field is required.
	StartupMsg *pgproto3.StartupMessage
	// TLSConfig represents the client TLS config used by the connector when
	// connecting with the SQL pod. The config is cloned for every dial, and
	// the clone's ServerName field is always overridden with the host of the
	// SQL pod during connection establishment. Set to nil if we are
	// connecting to an insecure cluster.
	//
	// NOTE: This field is optional.
	TLSConfig *tls.Config
	// IdleMonitorWrapperFn is used to wrap the connection to the SQL pod with
	// an idle monitor. If not specified, the raw connection to the SQL pod
	// will be returned.
	//
	// In the case of connecting with an authentication phase, the connection
	// will be wrapped before starting the authentication.
	//
	// NOTE: This field is optional.
	IdleMonitorWrapperFn func(serverConn net.Conn) net.Conn
	// Testing knobs for internal connector calls. If specified, these will
	// be called instead of the actual logic.
	testingKnobs struct {
		// dialTenantCluster, if set, replaces the connector's
		// dialTenantCluster method.
		dialTenantCluster func(ctx context.Context) (net.Conn, error)
		// lookupAddr, if set, replaces the connector's lookupAddr method.
		lookupAddr func(ctx context.Context) (string, error)
		// dialSQLServer, if set, replaces the connector's dialSQLServer
		// method.
		dialSQLServer func(serverAddr string) (net.Conn, error)
	}
}
// OpenTenantConnWithToken dials the tenant cluster and authenticates with the
// given session revival token. It is meant to be used during connection
// migration.
//
// The token is placed in the startup message for the duration of the call
// only; it is removed again before this method returns. On error, the
// connection that was dialed (if any) is closed and a nil connection is
// returned.
func (c *connector) OpenTenantConnWithToken(
	ctx context.Context, token string,
) (retServerConn net.Conn, retErr error) {
	// Inject the token for this attempt only, and make sure that it does not
	// linger in the startup message once we are done.
	c.StartupMsg.Parameters[sessionRevivalTokenStartupParam] = token
	defer func() {
		delete(c.StartupMsg.Parameters, sessionRevivalTokenStartupParam)
	}()

	conn, dialErr := c.dialTenantCluster(ctx)
	if dialErr != nil {
		return nil, dialErr
	}

	// Any failure past this point must not leak the connection. The closure
	// reads conn at return time, so a wrapped connection is closed through its
	// wrapper.
	defer func() {
		if retErr == nil {
			return
		}
		conn.Close()
	}()

	if wrap := c.IdleMonitorWrapperFn; wrap != nil {
		conn = wrap(conn)
	}

	// Token-based authentication still produces the initial connection data
	// messages (e.g. ParameterStatus and BackendKeyData). Since this method
	// is only used during connection migration (i.e. the proxy is the one
	// connecting to the SQL pod), those messages are discarded, and we only
	// return once a ReadyForQuery message has been seen.
	//
	// NOTE: This will need to be updated when we implement query cancellation.
	if authErr := readTokenAuthResult(conn); authErr != nil {
		return nil, authErr
	}

	log.Infof(ctx, "connected to %s through token-based auth", conn.RemoteAddr())
	return conn, nil
}
// OpenTenantConnWithAuth dials the tenant cluster and runs the normal
// authentication flow (e.g. password, etc.) between clientConn and the newly
// established connection to one of the tenant's SQL pods. The call blocks
// until authentication succeeds or an error is returned.
//
// sentToClient is true only when the error came from the authentication
// phase, since such errors would have already been sent to the client. On
// error, the connection that was dialed (if any) is closed and a nil
// connection is returned.
func (c *connector) OpenTenantConnWithAuth(
	ctx context.Context, clientConn net.Conn, throttleHook func(throttler.AttemptStatus) error,
) (retServerConn net.Conn, sentToClient bool, retErr error) {
	// Safety check only: the frontend admitter is expected to block this
	// startup param. The only case where the param would actually be present
	// is if OpenTenantConnWithToken was called previously, which does not
	// happen with the current proxy logic.
	delete(c.StartupMsg.Parameters, sessionRevivalTokenStartupParam)

	conn, dialErr := c.dialTenantCluster(ctx)
	if dialErr != nil {
		return nil, false, dialErr
	}

	// Any failure past this point must not leak the connection. The closure
	// reads conn at return time, so a wrapped connection is closed through its
	// wrapper.
	defer func() {
		if retErr == nil {
			return
		}
		conn.Close()
	}()

	if wrap := c.IdleMonitorWrapperFn; wrap != nil {
		conn = wrap(conn)
	}

	// Authenticate the user for non-token-based auth methods. This blocks
	// until the server has authenticated the client.
	if authErr := authenticate(clientConn, conn, throttleHook); authErr != nil {
		return nil, true, authErr
	}

	log.Infof(ctx, "connected to %s through normal auth", conn.RemoteAddr())
	return conn, false, nil
}
// dialTenantCluster returns a connection to the tenant cluster associated
// with the connector. Once a connection has been established, the pgwire
// startup message will be relayed to the server.
//
// Address lookup and dialing are retried with backoff for as long as they
// fail with a retriable connector error (see isRetriableConnectorError). A
// non-retriable error is returned right away. If ctx is done before a
// connection could be established, the returned error is non-nil and is (or
// is marked with) ctx.Err(); this method never returns a nil connection
// together with a nil error.
func (c *connector) dialTenantCluster(ctx context.Context) (net.Conn, error) {
	if c.testingKnobs.dialTenantCluster != nil {
		return c.testingKnobs.dialTenantCluster(ctx)
	}

	// Repeatedly try to make a connection until context is canceled, or until
	// we get a non-retriable error. This is preferable to terminating client
	// connections, because in most cases those connections will simply be
	// retried, further increasing load on the system.
	retryOpts := retry.Options{
		InitialBackoff: 10 * time.Millisecond,
		MaxBackoff:     5 * time.Second,
	}

	// Rate-limit the log output of each failure category, and keep track of
	// how many failures were seen since the last logged message.
	lookupAddrErr := log.Every(time.Minute)
	dialSQLServerErr := log.Every(time.Minute)
	reportFailureErr := log.Every(time.Minute)
	var lookupAddrErrs, dialSQLServerErrs, reportFailureErrs int

	var crdbConn net.Conn
	var serverAddr string
	// err holds the error of the most recent lookup or dial attempt. It must
	// not be overwritten by anything else, since it is what gets returned
	// once ctx is done.
	var err error
	for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); {
		// Retrieve a SQL pod address to connect to.
		serverAddr, err = c.lookupAddr(ctx)
		if err != nil {
			if !isRetriableConnectorError(err) {
				return nil, err
			}
			lookupAddrErrs++
			if lookupAddrErr.ShouldLog() {
				log.Ops.Errorf(ctx, "lookup address (%d errors skipped): %v",
					lookupAddrErrs, err)
				lookupAddrErrs = 0
			}
			continue
		}

		// Make a connection to the SQL pod.
		crdbConn, err = c.dialSQLServer(serverAddr)
		if err != nil {
			if !isRetriableConnectorError(err) {
				return nil, err
			}
			dialSQLServerErrs++
			if dialSQLServerErr.ShouldLog() {
				log.Ops.Errorf(ctx, "dial SQL server (%d errors skipped): %v",
					dialSQLServerErrs, err)
				dialSQLServerErrs = 0
			}

			// Report the failure to the directory so that it can refresh
			// any stale information that may have caused the problem.
			//
			// The result is kept in its own variable: assigning it to err
			// would replace the dial error (with nil whenever the report
			// succeeds), and the caller would then get a nil connection
			// with a nil error once ctx is done.
			if c.Directory != nil {
				if reportErr := reportFailureToDirectory(
					ctx, c.TenantID, serverAddr, c.Directory,
				); reportErr != nil {
					reportFailureErrs++
					if reportFailureErr.ShouldLog() {
						log.Ops.Errorf(ctx,
							"report failure (%d errors skipped): %v",
							reportFailureErrs,
							reportErr,
						)
						reportFailureErrs = 0
					}
				}
			}
			continue
		}
		return crdbConn, nil
	}

	// No retry limit is configured, so the loop is only expected to end
	// because ctx's Done channel is closed (which implies that
	// ctx.Err() != nil).
	ctxErr := ctx.Err()

	// err is nil only if the loop ended before any attempt was made (e.g. ctx
	// was already done). Marking a nil error yields nil, so return the
	// context error directly.
	if err == nil {
		return nil, ctxErr
	}

	// If the error is already marked, just return that.
	if errors.IsAny(err, context.Canceled, context.DeadlineExceeded) {
		return nil, err
	}

	// Otherwise, mark the error, and return that.
	return nil, errors.Mark(err, ctxErr)
}
// resolveTCPAddr is an indirection over net.ResolveTCPAddr to allow test
// hooks. lookupAddr calls it to check that the address derived from
// RoutingRule resolves.
var resolveTCPAddr = net.ResolveTCPAddr
// lookupAddr returns an address (that must include both host and port)
// pointing to one of the SQL pods for the tenant associated with this
// connector.
//
// If a Directory is set, it is consulted first:
//   - on success, the address returned by the directory is used;
//   - a GRPC FailedPrecondition error is turned into a codeUnavailable error
//     carrying the status message;
//   - a GRPC NotFound error falls through to the RoutingRule resolution below;
//   - any other error is treated as transient.
//
// Without a Directory (or on fallback), the address is derived from
// RoutingRule and checked with resolveTCPAddr. A resolution failure is logged
// and returned as a codeParamsRoutingFailed error.
//
// This will be called within an infinite backoff loop. If an error is
// transient, this will return an error that has been marked with
// errRetryConnectorSentinel (i.e. markAsRetriableConnectorError).
func (c *connector) lookupAddr(ctx context.Context) (string, error) {
	if c.testingKnobs.lookupAddr != nil {
		return c.testingKnobs.lookupAddr(ctx)
	}

	// First try to lookup tenant in the directory (if available).
	if c.Directory != nil {
		podAddr, err := c.Directory.EnsureTenantAddr(ctx, c.TenantID, c.ClusterName)
		if err == nil {
			return podAddr, nil
		}
		switch status.Code(err) {
		case codes.FailedPrecondition:
			if st, ok := status.FromError(err); ok {
				// The message comes from the directory server, so it must
				// be passed as an argument and never as the format string;
				// otherwise any '%' in it would be interpreted as a
				// formatting verb.
				return "", newErrorf(codeUnavailable, "%v", st.Message())
			}
		case codes.NotFound:
			// Fallback to old resolution rule.
		default:
			return "", markAsRetriableConnectorError(err)
		}
	}

	// Derive DNS address and then try to resolve it. If it does not exist, then
	// report that the cluster could not be found.
	//
	// TODO(jaylim-crl): This code is temporary. Remove this once we have fully
	// replaced this with a Directory interface. This fallback does not need
	// to exist.
	routedAddr := strings.ReplaceAll(
		c.RoutingRule, "{{clusterName}}",
		fmt.Sprintf("%s-%d", c.ClusterName, c.TenantID.ToUint64()),
	)
	if _, err := resolveTCPAddr("tcp", routedAddr); err != nil {
		log.Errorf(ctx, "could not retrieve SQL server address: %v", err.Error())
		return "", newErrorf(codeParamsRoutingFailed,
			"cluster %s-%d not found", c.ClusterName, c.TenantID.ToUint64())
	}
	return routedAddr, nil
}
// dialSQLServer connects to the SQL pod at serverAddr through BackendDial,
// handing it the connector's startup message. When the connector has a TLS
// config, a clone of it with ServerName set to the host part of serverAddr is
// passed along as well.
//
// This will be called within an infinite backoff loop. A dial error that
// carries the codeBackendDown code is considered transient, and is returned
// marked with errRetryConnectorSentinel (i.e. markAsRetriableConnectorError).
// All other errors are returned as-is.
func (c *connector) dialSQLServer(serverAddr string) (net.Conn, error) {
	if knob := c.testingKnobs.dialSQLServer; knob != nil {
		return knob(serverAddr)
	}

	// Clone returns nil for a nil TLSConfig, in which case no TLS config is
	// handed to BackendDial.
	tlsConf := c.TLSConfig.Clone()
	if tlsConf != nil {
		// serverAddr will always have a port. Only the host is of interest
		// here, so an empty string is used as the default port.
		host, _, splitErr := addr.SplitHostPort(serverAddr, "" /* defaultPort */)
		if splitErr != nil {
			return nil, splitErr
		}
		// Always set ServerName. If InsecureSkipVerify is true, this will
		// be ignored.
		tlsConf.ServerName = host
	}

	conn, dialErr := BackendDial(c.StartupMsg, serverAddr, tlsConf)
	if dialErr != nil {
		var codeErr *codeError
		backendDown := errors.As(dialErr, &codeErr) && codeErr.code == codeBackendDown
		if !backendDown {
			return nil, dialErr
		}
		return nil, markAsRetriableConnectorError(dialErr)
	}
	return conn, nil
}
// errRetryConnectorSentinel exists to allow more robust detection of retry
// errors even if they are wrapped. Errors are marked with it through
// markAsRetriableConnectorError, and checked against it through
// isRetriableConnectorError.
var errRetryConnectorSentinel = errors.New("retry connector error")
// markAsRetriableConnectorError returns err marked with
// errRetryConnectorSentinel, so that isRetriableConnectorError reports it as
// retriable and the connector retries when it sees such an error.
func markAsRetriableConnectorError(err error) error {
	marked := errors.Mark(err, errRetryConnectorSentinel)
	return marked
}
// isRetriableConnectorError reports whether err matches
// errRetryConnectorSentinel, i.e. whether it has been flagged as a transient
// error that the connector should retry on.
func isRetriableConnectorError(err error) bool {
	retriable := errors.Is(err, errRetryConnectorSentinel)
	return retriable
}
// reportFailureToDirectory forwards a failed connection attempt against addr
// for the given tenant to the directory's ReportFailure method, and returns
// its result. It is a variable so that it can be hooked.
var reportFailureToDirectory = func(
	ctx context.Context, tenantID roachpb.TenantID, addr string, directory TenantResolver,
) error {
	reportErr := directory.ReportFailure(ctx, tenantID, addr)
	return reportErr
}