Skip to content

Commit

Permalink
server: do not check decommission list for tenants
Browse files Browse the repository at this point in the history
Previously, the system tenant would return PermissionDenied if the
tenant's instance_id was equivalent to a decommissioned node's id.

Now, the system tenant does not check the decommissioned node list if
the incoming node_id belongs to a non-system tenant.

This PR feeds the request context down to the OnOutgoingPing and
OnIncomingPing callbacks. Previously the callbacks were using the
ambient context. The only use of the context was a storage.MVCCGet call
in nodeTombstoneStorage.IsDecommissioned.

Release note: None
  • Loading branch information
jeffswenson committed Feb 2, 2022
1 parent ecab739 commit d3ed364
Show file tree
Hide file tree
Showing 8 changed files with 102 additions and 11 deletions.
3 changes: 3 additions & 0 deletions pkg/base/test_server_args.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,4 +309,7 @@ type TestTenantArgs struct {

// TracingDefault controls whether the tracing will be on or off by default.
TracingDefault tracing.TracingMode

// RPCHeartbeatInterval controls how often the tenant sends Ping requests.
RPCHeartbeatInterval time.Duration
}
5 changes: 4 additions & 1 deletion pkg/ccl/serverccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@ go_library(

go_test(
name = "serverccl_test",
size = "medium",
size = "large",
srcs = [
"admin_test.go",
"main_test.go",
"role_authentication_test.go",
"server_sql_test.go",
"tenant_decommissioned_host_test.go",
"tenant_vars_test.go",
],
embed = [":serverccl"],
Expand All @@ -24,6 +25,7 @@ go_test(
"//pkg/ccl/kvccl",
"//pkg/ccl/utilccl",
"//pkg/ccl/utilccl/licenseccl",
"//pkg/kv/kvserver/liveness/livenesspb",
"//pkg/roachpb:with-mocks",
"//pkg/security",
"//pkg/security/securitytest",
Expand All @@ -33,6 +35,7 @@ go_test(
"//pkg/sql/distsql",
"//pkg/sql/tests",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util",
Expand Down
77 changes: 77 additions & 0 deletions pkg/ccl/serverccl/tenant_decommissioned_host_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright 2022 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package serverccl

import (
"context"
gosql "database/sql"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/stretchr/testify/require"
)

// TestTenantWithDecommissionedID is a regression test for a multi-tenant bug.
//
// Each tenant SQL server is assigned an InstanceID, which corresponds to the
// id column in the system.sql_instances table. The SQL process sets
// rpcContext.NodeID = InstanceID and PingRequest.NodeID = rpcContext.NodeID.
//
// When a KV node receives a ping, it checks the NodeID against a
// decommissioned node tombstone list. Until PR #75766, this caused the KV
// node to reject pings from SQL servers. The rejected pings would manifest
// as SQL connection timeouts.
func TestTenantWithDecommissionedID(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	skip.UnderStress(t, "decommissioning times out under stress")

	ctx := context.Background()
	cluster := serverutils.StartNewTestCluster(t, 4, base.TestClusterArgs{})
	defer cluster.Stopper().Stop(ctx)

	// Move the fourth node through DECOMMISSIONING to DECOMMISSIONED, issuing
	// both requests through the first node.
	kvServer := cluster.Server(0)
	decommissionedID := cluster.Server(3).NodeID()
	require.NoError(t, kvServer.Decommission(
		ctx,
		livenesspb.MembershipStatus_DECOMMISSIONING,
		[]roachpb.NodeID{decommissionedID},
	))
	require.NoError(t, kvServer.Decommission(
		ctx,
		livenesspb.MembershipStatus_DECOMMISSIONED,
		[]roachpb.NodeID{decommissionedID},
	))

	tenantID := serverutils.TestTenantID()

	// Start one SQL server per iteration. The server whose RPC NodeID equals
	// the decommissioned node's ID is kept; the DB handles of all the others
	// are closed right away.
	var (
		matchingServer serverutils.TestTenantInterface
		matchingDB     *gosql.DB
	)
	for id := 1; id <= int(decommissionedID); id++ {
		args := base.TestTenantArgs{
			TenantID: tenantID,
			// Only the first iteration starts the tenant with Existing unset.
			Existing: id > 1,
			// Use a low heartbeat interval. The first heartbeat succeeds
			// because the tenant needs to communicate with the KV node to
			// determine its instance ID.
			RPCHeartbeatInterval: 5 * time.Millisecond,
		}
		sqlServer, db := serverutils.StartTenant(t, kvServer, args)
		if sqlServer.RPCContext().NodeID.Get() != decommissionedID {
			db.Close()
			continue
		}
		matchingServer, matchingDB = sqlServer, db
	}
	require.NotNil(t, matchingServer)
	defer matchingDB.Close()

	// The statement must succeed even though the SQL server's instance ID
	// collides with the decommissioned node's ID.
	_, err := matchingDB.Exec("CREATE ROLE test_user WITH PASSWORD 'password'")
	require.NoError(t, err)
}
8 changes: 4 additions & 4 deletions pkg/rpc/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,10 +407,10 @@ type ContextOptions struct {
// preliminary checks but before recording clock offset information.
//
// It can inject an error.
OnIncomingPing func(*PingRequest) error
OnIncomingPing func(context.Context, *PingRequest) error
// OnOutgoingPing intercepts outgoing PingRequests. It may inject an
// error.
OnOutgoingPing func(*PingRequest) error
OnOutgoingPing func(context.Context, *PingRequest) error
Knobs ContextTestingKnobs

// NodeID is the node ID / SQL instance ID container shared
Expand Down Expand Up @@ -1419,7 +1419,7 @@ func (rpcCtx *Context) runHeartbeat(
ServerVersion: rpcCtx.Settings.Version.BinaryVersion(),
}

interceptor := func(*PingRequest) error { return nil }
interceptor := func(context.Context, *PingRequest) error { return nil }
if fn := rpcCtx.OnOutgoingPing; fn != nil {
interceptor = fn
}
Expand All @@ -1429,7 +1429,7 @@ func (rpcCtx *Context) runHeartbeat(
ping := func(ctx context.Context) error {
// NB: We want the request to fail-fast (the default), otherwise we won't
// be notified of transport failures.
if err := interceptor(request); err != nil {
if err := interceptor(ctx, request); err != nil {
returnErr = true
return err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/rpc/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,13 +179,13 @@ func TestPingInterceptors(t *testing.T) {
Clock: hlc.NewClock(hlc.UnixNano, 500*time.Millisecond),
Stopper: stop.NewStopper(),
Settings: cluster.MakeTestingClusterSettings(),
OnOutgoingPing: func(req *PingRequest) error {
OnOutgoingPing: func(ctx context.Context, req *PingRequest) error {
if req.TargetNodeID == blockedTargetNodeID {
return errBoomSend
}
return nil
},
OnIncomingPing: func(req *PingRequest) error {
OnIncomingPing: func(ctx context.Context, req *PingRequest) error {
if req.OriginNodeID == blockedOriginNodeID {
return errBoomRecv
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/rpc/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type HeartbeatService struct {
clusterName string
disableClusterNameVerification bool

onHandlePing func(*PingRequest) error // see ContextOptions.OnIncomingPing
onHandlePing func(context.Context, *PingRequest) error // see ContextOptions.OnIncomingPing

// TestingAllowNamedRPCToAnonymousServer, when defined (in tests),
// disables errors in case a heartbeat requests a specific node ID but
Expand Down Expand Up @@ -169,7 +169,7 @@ func (hs *HeartbeatService) Ping(ctx context.Context, args *PingRequest) (*PingR
}

if fn := hs.onHandlePing; fn != nil {
if err := fn(args); err != nil {
if err := fn(ctx, args); err != nil {
return nil, err
}
}
Expand Down
9 changes: 7 additions & 2 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,13 +210,18 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
Clock: clock,
Stopper: stopper,
Settings: cfg.Settings,
OnOutgoingPing: func(req *rpc.PingRequest) error {
OnOutgoingPing: func(ctx context.Context, req *rpc.PingRequest) error {
// Outgoing pings to a decommissioned node are blocked with
// codes.FailedPrecondition. This tells the caller that the destination
// replica is decommissioned while the caller itself is still valid, so
// other replicas can still be tried.
return checkPingFor(ctx, req.TargetNodeID, codes.FailedPrecondition)
},
OnIncomingPing: func(req *rpc.PingRequest) error {
OnIncomingPing: func(ctx context.Context, req *rpc.PingRequest) error {
// Decommission state is only tracked for the system tenant.
if tenantID, isTenant := roachpb.TenantFromContext(ctx); isTenant &&
!roachpb.IsSystemTenantID(tenantID.ToUint64()) {
return nil
}
// Incoming ping will reject requests with codes.PermissionDenied to
// signal remote node that it is not considered valid anymore and
// operations should fail immediately.
Expand Down
3 changes: 3 additions & 0 deletions pkg/server/testserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -716,6 +716,9 @@ func (ts *TestServer) StartTenant(
tenantKnobs.ClusterSettingsUpdater = st.MakeUpdater()
}
}
if params.RPCHeartbeatInterval != 0 {
baseCfg.RPCHeartbeatInterval = params.RPCHeartbeatInterval
}
sqlServer, addr, httpAddr, err := StartTenant(
ctx,
stopper,
Expand Down

0 comments on commit d3ed364

Please sign in to comment.