diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index 9d417e84938a..cd389140c290 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -51,6 +51,7 @@ server.eventlog.enabled boolean true if set, logged notable events are also stor server.eventlog.ttl duration 2160h0m0s if nonzero, entries in system.eventlog older than this duration are deleted every 10m0s. Should not be lowered below 24 hours. server.host_based_authentication.configuration string host-based authentication configuration to use during connection authentication server.identity_map.configuration string system-identity to database-username mappings +server.max_connections_per_gateway integer -1 the maximum number of non-superuser SQL connections per gateway allowed at a given time (note: this will only limit future connection attempts and will not affect already established connections). Negative values result in unlimited number of connections. Superusers are not affected by this limit. server.oidc_authentication.autologin boolean false if true, logged-out visitors to the DB Console will be automatically redirected to the OIDC login endpoint server.oidc_authentication.button_text string Login with your OIDC provider text to show on button on DB Console login page to login with your OIDC provider (only shown if OIDC is enabled) server.oidc_authentication.claim_json_key string sets JSON key of principal to extract from payload after OIDC authentication completes (usually email or sid) @@ -179,4 +180,4 @@ trace.debug.enable boolean false if set, traces for recent requests can be seen trace.jaeger.agent string the address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as :. If no port is specified, 6381 will be used. trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as :. If no port is specified, 4317 will be used. trace.zipkin.collector string the address of a Zipkin instance to receive traces, as :. If no port is specified, 9411 will be used. -version version 21.2-68 set the active cluster version in the format '.' +version version 21.2-70 set the active cluster version in the format '.' diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index e7a5246acbee..565244253dcf 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -63,6 +63,7 @@ server.eventlog.ttlduration2160h0m0sif nonzero, entries in system.eventlog older than this duration are deleted every 10m0s. Should not be lowered below 24 hours. server.host_based_authentication.configurationstringhost-based authentication configuration to use during connection authentication server.identity_map.configurationstringsystem-identity to database-username mappings +server.max_connections_per_gatewayinteger-1the maximum number of non-superuser SQL connections per gateway allowed at a given time (note: this will only limit future connection attempts and will not affect already established connections). Negative values result in unlimited number of connections. Superusers are not affected by this limit. 
server.oidc_authentication.autologinbooleanfalseif true, logged-out visitors to the DB Console will be automatically redirected to the OIDC login endpoint server.oidc_authentication.button_textstringLogin with your OIDC providertext to show on button on DB Console login page to login with your OIDC provider (only shown if OIDC is enabled) server.oidc_authentication.claim_json_keystringsets JSON key of principal to extract from payload after OIDC authentication completes (usually email or sid) @@ -192,6 +193,6 @@ trace.jaeger.agentstringthe address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as :. If no port is specified, 6381 will be used. trace.opentelemetry.collectorstringaddress of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as :. If no port is specified, 4317 will be used. trace.zipkin.collectorstringthe address of a Zipkin instance to receive traces, as :. If no port is specified, 9411 will be used. -versionversion21.2-68set the active cluster version in the format '.' +versionversion21.2-70set the active cluster version in the format '.' diff --git a/docs/generated/sql/bnf/restore.bnf b/docs/generated/sql/bnf/restore.bnf index fc480ff4ecb8..d907009fe1a9 100644 --- a/docs/generated/sql/bnf/restore.bnf +++ b/docs/generated/sql/bnf/restore.bnf @@ -23,3 +23,15 @@ restore_stmt ::= | 'RESTORE' ( 'TABLE' table_pattern ( ( ',' table_pattern ) )* | 'DATABASE' database_name ( ( ',' database_name ) )* ) 'FROM' subdirectory 'IN' ( destination | '(' partitioned_backup_location ( ',' partitioned_backup_location )* ')' ) 'WITH' restore_options_list | 'RESTORE' ( 'TABLE' table_pattern ( ( ',' table_pattern ) )* | 'DATABASE' database_name ( ( ',' database_name ) )* ) 'FROM' subdirectory 'IN' ( destination | '(' partitioned_backup_location ( ',' partitioned_backup_location )* ')' ) 'WITH' 'OPTIONS' '(' restore_options_list ')' | 'RESTORE' ( 'TABLE' table_pattern ( ( ',' table_pattern ) )* | 'DATABASE' database_name ( ( ',' database_name ) )* ) 'FROM' subdirectory 'IN' ( destination | '(' partitioned_backup_location ( ',' partitioned_backup_location )* ')' ) + | 'RESTORE' 'SYSTEM' 'USERS' 'FROM' ( destination | '(' partitioned_backup_location ( ',' partitioned_backup_location )* ')' ) 'AS' 'OF' 'SYSTEM' 'TIME' timestamp 'WITH' restore_options_list + | 'RESTORE' 'SYSTEM' 'USERS' 'FROM' ( destination | '(' partitioned_backup_location ( ',' partitioned_backup_location )* ')' ) 'AS' 'OF' 'SYSTEM' 'TIME' timestamp 'WITH' 'OPTIONS' '(' restore_options_list ')' + | 'RESTORE' 'SYSTEM' 'USERS' 'FROM' ( destination | '(' partitioned_backup_location ( ',' partitioned_backup_location )* ')' ) 'AS' 'OF' 'SYSTEM' 'TIME' timestamp + | 'RESTORE' 'SYSTEM' 'USERS' 'FROM' ( destination | '(' partitioned_backup_location ( ',' partitioned_backup_location )* ')' ) 'WITH' restore_options_list + | 'RESTORE' 'SYSTEM' 'USERS' 'FROM' ( destination | '(' partitioned_backup_location ( ',' partitioned_backup_location )* ')' ) 'WITH' 'OPTIONS' '(' restore_options_list ')' + | 'RESTORE' 'SYSTEM' 'USERS' 'FROM' ( destination | '(' partitioned_backup_location ( ',' partitioned_backup_location )* ')' ) + | 'RESTORE' 'SYSTEM' 'USERS' 'FROM' subdirectory 'IN' ( destination | '(' partitioned_backup_location ( ',' partitioned_backup_location )* ')' ) 'AS' 'OF' 'SYSTEM' 'TIME' timestamp 'WITH' restore_options_list + | 'RESTORE' 'SYSTEM' 'USERS' 'FROM' subdirectory 'IN' ( destination | '(' partitioned_backup_location ( ',' partitioned_backup_location )* ')' ) 'AS' 'OF' 
'SYSTEM' 'TIME' timestamp 'WITH' 'OPTIONS' '(' restore_options_list ')' + | 'RESTORE' 'SYSTEM' 'USERS' 'FROM' subdirectory 'IN' ( destination | '(' partitioned_backup_location ( ',' partitioned_backup_location )* ')' ) 'AS' 'OF' 'SYSTEM' 'TIME' timestamp + | 'RESTORE' 'SYSTEM' 'USERS' 'FROM' subdirectory 'IN' ( destination | '(' partitioned_backup_location ( ',' partitioned_backup_location )* ')' ) 'WITH' restore_options_list + | 'RESTORE' 'SYSTEM' 'USERS' 'FROM' subdirectory 'IN' ( destination | '(' partitioned_backup_location ( ',' partitioned_backup_location )* ')' ) 'WITH' 'OPTIONS' '(' restore_options_list ')' + | 'RESTORE' 'SYSTEM' 'USERS' 'FROM' subdirectory 'IN' ( destination | '(' partitioned_backup_location ( ',' partitioned_backup_location )* ')' ) diff --git a/docs/generated/sql/bnf/stmt_block.bnf b/docs/generated/sql/bnf/stmt_block.bnf index be434350b262..415a43a4b6fa 100644 --- a/docs/generated/sql/bnf/stmt_block.bnf +++ b/docs/generated/sql/bnf/stmt_block.bnf @@ -188,6 +188,8 @@ restore_stmt ::= | 'RESTORE' 'FROM' string_or_placeholder 'IN' list_of_string_or_placeholder_opt_list opt_as_of_clause opt_with_restore_options | 'RESTORE' targets 'FROM' list_of_string_or_placeholder_opt_list opt_as_of_clause opt_with_restore_options | 'RESTORE' targets 'FROM' string_or_placeholder 'IN' list_of_string_or_placeholder_opt_list opt_as_of_clause opt_with_restore_options + | 'RESTORE' 'SYSTEM' 'USERS' 'FROM' list_of_string_or_placeholder_opt_list opt_as_of_clause opt_with_restore_options + | 'RESTORE' 'SYSTEM' 'USERS' 'FROM' string_or_placeholder 'IN' list_of_string_or_placeholder_opt_list opt_as_of_clause opt_with_restore_options | 'RESTORE' targets 'FROM' 'REPLICATION' 'STREAM' 'FROM' string_or_placeholder_opt_list opt_as_of_clause resume_stmt ::= diff --git a/pkg/ccl/backupccl/backup_test.go b/pkg/ccl/backupccl/backup_test.go index cb859281a5fc..3f4cb01c1e99 100644 --- a/pkg/ccl/backupccl/backup_test.go +++ b/pkg/ccl/backupccl/backup_test.go @@ -9476,3 +9476,91 @@ func TestExportRequestBelowGCThresholdOnDataExcludedFromBackup(t *testing.T) { _, err = conn.Exec(fmt.Sprintf("BACKUP TABLE foo TO $1 AS OF SYSTEM TIME '%s'", tsBefore), localFoo) require.NoError(t, err) } + +// TestBackupRestoreSystemUsers tests RESTORE SYSTEM USERS feature which allows user to +// restore users from a backup into current cluster and regrant roles. 
+func TestBackupRestoreSystemUsers(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + sqlDB, tempDir, cleanupFn := createEmptyCluster(t, singleNode) + _, sqlDBRestore, cleanupEmptyCluster := backupRestoreTestSetupEmpty(t, singleNode, tempDir, InitManualReplication, base.TestClusterArgs{}) + defer cleanupFn() + defer cleanupEmptyCluster() + + sqlDB.Exec(t, `CREATE USER app; CREATE USER test`) + sqlDB.Exec(t, `CREATE ROLE app_role; CREATE ROLE test_role`) + sqlDB.Exec(t, `GRANT app_role TO test_role;`) // 'test_role' is a member of 'app_role' + sqlDB.Exec(t, `GRANT admin, app_role TO app; GRANT test_role TO test`) + sqlDB.Exec(t, `CREATE DATABASE db; CREATE TABLE db.foo (ind INT)`) + sqlDB.Exec(t, `BACKUP TO $1`, localFoo+"/1") + sqlDB.Exec(t, `BACKUP DATABASE db TO $1`, localFoo+"/2") + sqlDB.Exec(t, `BACKUP TABLE system.users TO $1`, localFoo+"/3") + + // User 'test' exists in both clusters but 'app' only exists in the backup + sqlDBRestore.Exec(t, `CREATE USER test`) + sqlDBRestore.Exec(t, `CREATE DATABASE db`) + // Create multiple databases to make max descriptor ID be larger than max descriptor ID + // in the backup to test if we correctly generate new descriptor IDs + sqlDBRestore.Exec(t, `CREATE DATABASE db1; CREATE DATABASE db2; CREATE DATABASE db3`) + + t.Run("system users", func(t *testing.T) { + sqlDBRestore.Exec(t, "RESTORE SYSTEM USERS FROM $1", localFoo+"/1") + + // Role 'app_role' and user 'app' will be added, and 'app' is granted with 'app_role' + // User test will remain untouched with no role granted + sqlDBRestore.CheckQueryResults(t, "SELECT * FROM system.users", [][]string{ + {"admin", "", "true"}, + {"app", "NULL", "false"}, + {"app_role", "NULL", "true"}, + {"root", "", "false"}, + {"test", "NULL", "false"}, + {"test_role", "NULL", "true"}, + }) + sqlDBRestore.CheckQueryResults(t, "SELECT * FROM system.role_members", [][]string{ + {"admin", "app", "false"}, + {"admin", "root", "true"}, + {"app_role", "app", "false"}, + {"app_role", "test_role", "false"}, + }) + sqlDBRestore.CheckQueryResults(t, "SHOW USERS", [][]string{ + {"admin", "", "{}"}, + {"app", "", "{admin,app_role}"}, + {"app_role", "", "{}"}, + {"root", "", "{admin}"}, + {"test", "", "{}"}, + {"test_role", "", "{app_role}"}, + }) + }) + + t.Run("restore-from-backup-with-no-system-users", func(t *testing.T) { + sqlDBRestore.ExpectErr(t, "cannot restore system users as no system.users table in the backup", + "RESTORE SYSTEM USERS FROM $1", localFoo+"/2") + }) + + _, sqlDBRestore1, cleanupEmptyCluster1 := backupRestoreTestSetupEmpty(t, singleNode, tempDir, InitManualReplication, base.TestClusterArgs{}) + defer cleanupEmptyCluster1() + t.Run("restore-from-backup-with-no-system-role-members", func(t *testing.T) { + sqlDBRestore1.Exec(t, "RESTORE SYSTEM USERS FROM $1", localFoo+"/3") + + sqlDBRestore1.CheckQueryResults(t, "SELECT * FROM system.users", [][]string{ + {"admin", "", "true"}, + {"app", "NULL", "false"}, + {"app_role", "NULL", "true"}, + {"root", "", "false"}, + {"test", "NULL", "false"}, + {"test_role", "NULL", "true"}, + }) + sqlDBRestore1.CheckQueryResults(t, "SELECT * FROM system.role_members", [][]string{ + {"admin", "root", "true"}, + }) + sqlDBRestore1.CheckQueryResults(t, "SHOW USERS", [][]string{ + {"admin", "", "{}"}, + {"app", "", "{}"}, + {"app_role", "", "{}"}, + {"root", "", "{admin}"}, + {"test", "", "{}"}, + {"test_role", "", "{}"}, + }) + }) +} diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go index 2fece94fc279..52cd600bc935 
100644 --- a/pkg/ccl/backupccl/restore_job.go +++ b/pkg/ccl/backupccl/restore_job.go @@ -1629,6 +1629,15 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro // Reload the details as we may have updated the job. details = r.job.Details().(jobspb.RestoreDetails) + if err := r.cleanupTempSystemTables(ctx, nil /* txn */); err != nil { + return err + } + } else if details.RestoreSystemUsers { + if err := r.restoreSystemUsers(ctx, p.ExecCfg().DB, mainData.systemTables); err != nil { + return err + } + details = r.job.Details().(jobspb.RestoreDetails) + if err := r.cleanupTempSystemTables(ctx, nil /* txn */); err != nil { return err } @@ -1786,7 +1795,7 @@ func (r *restoreResumer) notifyStatsRefresherOfNewTables() { // This is the last of the IDs pre-allocated by the restore planner. // TODO(postamar): Store it directly in the details instead? This is brittle. func tempSystemDatabaseID(details jobspb.RestoreDetails) descpb.ID { - if details.DescriptorCoverage != tree.AllDescriptors { + if details.DescriptorCoverage != tree.AllDescriptors && !details.RestoreSystemUsers { return descpb.InvalidID } var maxPreAllocatedID descpb.ID @@ -2555,6 +2564,59 @@ type systemTableNameWithConfig struct { config systemBackupConfiguration } +// Restore system.users from the backup into the restoring cluster. Only recreate users +// which are in a backup of system.users but do not currently exist (ignoring those who do) +// and re-grant roles for users if the backup has system.role_members. +func (r *restoreResumer) restoreSystemUsers( + ctx context.Context, db *kv.DB, systemTables []catalog.TableDescriptor, +) error { + executor := r.execCfg.InternalExecutor + return db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + selectNonExistentUsers := "SELECT * FROM crdb_temp_system.users temp " + + "WHERE NOT EXISTS (SELECT * FROM system.users u WHERE temp.username = u.username)" + users, err := executor.QueryBuffered(ctx, "get-users", + txn, selectNonExistentUsers) + if err != nil { + return err + } + + insertUser := `INSERT INTO system.users ("username", "hashedPassword", "isRole") VALUES ($1, $2, $3)` + newUsernames := make(map[string]bool) + for _, user := range users { + newUsernames[user[0].String()] = true + if _, err = executor.Exec(ctx, "insert-non-existent-users", txn, insertUser, + user[0], user[1], user[2]); err != nil { + return err + } + } + + // We skip granting roles if the backup does not contain system.role_members. + if len(systemTables) == 1 { + return nil + } + + selectNonExistentRoleMembers := "SELECT * FROM crdb_temp_system.role_members temp_rm WHERE " + + "NOT EXISTS (SELECT * FROM system.role_members rm WHERE temp_rm.role = rm.role AND temp_rm.member = rm.member)" + roleMembers, err := executor.QueryBuffered(ctx, "get-role-members", + txn, selectNonExistentRoleMembers) + if err != nil { + return err + } + + insertRoleMember := `INSERT INTO system.role_members ("role", "member", "isAdmin") VALUES ($1, $2, $3)` + for _, roleMember := range roleMembers { + // Only grant roles to users that don't currently exist, i.e., new users we just added + if _, ok := newUsernames[roleMember[1].String()]; ok { + if _, err = executor.Exec(ctx, "insert-non-existent-role-members", txn, insertRoleMember, + roleMember[0], roleMember[1], roleMember[2]); err != nil { + return err + } + } + } + return nil + }) +} + // restoreSystemTables atomically replaces the contents of the system tables // with the data from the restored system tables. 
func (r *restoreResumer) restoreSystemTables( diff --git a/pkg/ccl/backupccl/restore_planning.go b/pkg/ccl/backupccl/restore_planning.go index 9c24d4935362..b738089edcd8 100644 --- a/pkg/ccl/backupccl/restore_planning.go +++ b/pkg/ccl/backupccl/restore_planning.go @@ -310,6 +310,7 @@ func allocateDescriptorRewrites( opts tree.RestoreOptions, intoDB string, newDBName string, + restoreSystemUsers bool, ) (DescRewriteMap, error) { descriptorRewrites := make(DescRewriteMap) @@ -421,38 +422,47 @@ func allocateDescriptorRewrites( } needsNewParentIDs := make(map[string][]descpb.ID) - // Increment the DescIDSequenceKey so that it is higher than the max desc ID - // in the backup. This generator keeps produced the next descriptor ID. + + // Increment the DescIDSequenceKey so that it is higher than both the max desc ID + // in the backup and current max desc ID in the restoring cluster. This generator + // keeps produced the next descriptor ID. var tempSysDBID descpb.ID - if descriptorCoverage == tree.AllDescriptors { + if descriptorCoverage == tree.AllDescriptors || restoreSystemUsers { var err error - // Restore the key which generates descriptor IDs. - if err = p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - v, err := txn.Get(ctx, p.ExecCfg().Codec.DescIDSequenceKey()) + if descriptorCoverage == tree.AllDescriptors { + // Restore the key which generates descriptor IDs. + if err = p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + v, err := txn.Get(ctx, p.ExecCfg().Codec.DescIDSequenceKey()) + if err != nil { + return err + } + newValue := maxDescIDInBackup + 1 + if newValue <= v.ValueInt() { + // This case may happen when restoring backups from older versions. + newValue = v.ValueInt() + 1 + } + b := txn.NewBatch() + // N.B. This key is usually mutated using the Inc command. That + // command warns that if the key was every Put directly, Inc will + // return an error. This is only to ensure that the type of the key + // doesn't change. Here we just need to be very careful that we only + // write int64 values. + // The generator's value should be set to the value of the next ID + // to generate. + b.Put(p.ExecCfg().Codec.DescIDSequenceKey(), newValue) + return txn.Run(ctx, b) + }); err != nil { + return nil, err + } + tempSysDBID, err = descidgen.GenerateUniqueDescID(ctx, p.ExecCfg().DB, p.ExecCfg().Codec) if err != nil { - return err + return nil, err + } + } else if restoreSystemUsers { + tempSysDBID, err = descidgen.GenerateUniqueDescID(ctx, p.ExecCfg().DB, p.ExecCfg().Codec) + if err != nil { + return nil, err } - newValue := maxDescIDInBackup + 1 - if newValue <= v.ValueInt() { - // This case may happen when restoring backups from older versions. - newValue = v.ValueInt() + 1 - } - b := txn.NewBatch() - // N.B. This key is usually mutated using the Inc command. That - // command warns that if the key was every Put directly, Inc will - // return an error. This is only to ensure that the type of the key - // doesn't change. Here we just need to be very careful that we only - // write int64 values. - // The generator's value should be set to the value of the next ID - // to generate. - b.Put(p.ExecCfg().Codec.DescIDSequenceKey(), newValue) - return txn.Run(ctx, b) - }); err != nil { - return nil, err - } - tempSysDBID, err = descidgen.GenerateUniqueDescID(ctx, p.ExecCfg().DB, p.ExecCfg().Codec) - if err != nil { - return nil, err } // Remap all of the descriptor belonging to system tables to the temp system // DB. 
@@ -831,7 +841,7 @@ func allocateDescriptorRewrites( // backup should have the same ID as they do in the backup. descriptorsToRemap := make([]catalog.Descriptor, 0, len(tablesByID)) for _, table := range tablesByID { - if descriptorCoverage == tree.AllDescriptors { + if descriptorCoverage == tree.AllDescriptors || restoreSystemUsers { if table.ParentID == systemschema.SystemDB.GetID() { // This is a system table that should be marked for descriptor creation. descriptorsToRemap = append(descriptorsToRemap, table) @@ -1522,6 +1532,7 @@ func restoreJobDescription( kmsURIs []string, ) (string, error) { r := &tree.Restore{ + SystemUsers: restore.SystemUsers, DescriptorCoverage: restore.DescriptorCoverage, AsOf: restore.AsOf, Targets: restore.Targets, @@ -1598,6 +1609,9 @@ func restorePlanHook( var intoDBFn func() (string, error) if restoreStmt.Options.IntoDB != nil { + if restoreStmt.SystemUsers { + return nil, nil, nil, false, errors.New("cannot set into_db option when only restoring system users") + } intoDBFn, err = p.TypeAsString(ctx, restoreStmt.Options.IntoDB, "RESTORE") if err != nil { return nil, nil, nil, false, err @@ -1633,6 +1647,9 @@ func restorePlanHook( " database") return nil, nil, nil, false, err } + if restoreStmt.SystemUsers { + return nil, nil, nil, false, errors.New("cannot set new_db_name option when only restoring system users") + } newDBNameFn, err = p.TypeAsString(ctx, restoreStmt.Options.NewDBName, "RESTORE") if err != nil { return nil, nil, nil, false, err @@ -2030,7 +2047,7 @@ func doRestorePlan( } sqlDescs, restoreDBs, tenants, err := selectTargets( - ctx, p, mainBackupManifests, restoreStmt.Targets, restoreStmt.DescriptorCoverage, endTime, + ctx, p, mainBackupManifests, restoreStmt.Targets, restoreStmt.DescriptorCoverage, endTime, restoreStmt.SystemUsers, ) if err != nil { return errors.Wrap(err, @@ -2158,7 +2175,8 @@ func doRestorePlan( restoreStmt.DescriptorCoverage, restoreStmt.Options, intoDB, - newDBName) + newDBName, + restoreStmt.SystemUsers) if err != nil { return err } @@ -2241,6 +2259,7 @@ func doRestorePlan( RevalidateIndexes: revalidateIndexes, DatabaseModifiers: databaseModifiers, DebugPauseOn: debugPauseOn, + RestoreSystemUsers: restoreStmt.SystemUsers, }, Progress: jobspb.RestoreProgress{}, } diff --git a/pkg/ccl/backupccl/targets.go b/pkg/ccl/backupccl/targets.go index a80ad9e51ad2..b5593ad14e29 100644 --- a/pkg/ccl/backupccl/targets.go +++ b/pkg/ccl/backupccl/targets.go @@ -321,9 +321,10 @@ func fullClusterTargetsBackup( return fullClusterDescs, fullClusterDBIDs, nil } -// selectTargets loads all descriptors from the selected backup manifest(s), and -// filters the descriptors based on the targets specified in the restore. Post -// filtering, the method returns: +// selectTargets loads all descriptors from the selected backup manifest(s), +// filters the descriptors based on the targets specified in the restore, and +// calculates the max descriptor ID in the backup. +// Post filtering, the method returns: // - A list of all descriptors (table, type, database, schema) along with their // parent databases. 
// - A list of database descriptors IFF the user is restoring on the cluster or @@ -336,6 +337,7 @@ func selectTargets( targets tree.TargetList, descriptorCoverage tree.DescriptorCoverage, asOf hlc.Timestamp, + restoreSystemUsers bool, ) ([]catalog.Descriptor, []catalog.DatabaseDescriptor, []descpb.TenantInfoWithUsage, error) { allDescs, lastBackupManifest := loadSQLDescsFromBackupsAtTime(backupManifests, asOf) @@ -343,6 +345,27 @@ func selectTargets( return fullClusterTargetsRestore(allDescs, lastBackupManifest) } + if restoreSystemUsers { + systemTables := make([]catalog.Descriptor, 0) + var users catalog.Descriptor + for _, desc := range allDescs { + if desc.GetParentID() == systemschema.SystemDB.GetID() { + switch desc.GetName() { + case systemschema.UsersTable.GetName(): + users = desc + systemTables = append(systemTables, desc) + case systemschema.RoleMembersTable.GetName(): + systemTables = append(systemTables, desc) + // TODO(casper): should we handle role_options table? + } + } + } + if users == nil { + return nil, nil, nil, errors.Errorf("cannot restore system users as no system.users table in the backup") + } + return systemTables, nil, nil, nil + } + if targets.Tenant != (roachpb.TenantID{}) { for _, tenant := range lastBackupManifest.GetTenants() { // TODO(dt): for now it is zero-or-one but when that changes, we should diff --git a/pkg/clusterversion/cockroach_versions.go b/pkg/clusterversion/cockroach_versions.go index a72286e77f74..77c95b0454f9 100644 --- a/pkg/clusterversion/cockroach_versions.go +++ b/pkg/clusterversion/cockroach_versions.go @@ -288,7 +288,9 @@ const ( // preserving temporary indexes, and a post-backfill merging // processing. MVCCIndexBackfiller - + // EnableLeaseHolderRemoval enables removing a leaseholder and transferring the lease + // during joint configuration, including to VOTER_INCOMING replicas. + EnableLeaseHolderRemoval // ************************************************* // Step (1): Add new versions here. // Do not add new versions to a patch release. @@ -460,6 +462,10 @@ var versionsSingleton = keyedVersions{ Key: MVCCIndexBackfiller, Version: roachpb.Version{Major: 21, Minor: 2, Internal: 68}, }, + { + Key: EnableLeaseHolderRemoval, + Version: roachpb.Version{Major: 21, Minor: 2, Internal: 70}, + }, // ************************************************* // Step (2): Add new versions here. // Do not add new versions to a patch release. 
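Aside on the version gate registered in the hunk above: this diff only adds the EnableLeaseHolderRemoval key (21.2-70) and does not show a call site. Below is a minimal sketch of how feature code typically consults such a gate through the cluster-settings version handle; the helper name is hypothetical and not part of this change.

    package kvserver

    import (
    	"context"

    	"github.com/cockroachdb/cockroach/pkg/clusterversion"
    	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
    )

    // leaseholderRemovalAllowed is a hypothetical helper, not part of this change.
    // It reports whether all nodes have upgraded past the EnableLeaseHolderRemoval
    // gate, so the new leaseholder-removal behavior can be used safely.
    func leaseholderRemovalAllowed(ctx context.Context, st *cluster.Settings) bool {
    	return st.Version.IsActive(ctx, clusterversion.EnableLeaseHolderRemoval)
    }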
diff --git a/pkg/clusterversion/key_string.go b/pkg/clusterversion/key_string.go index 59d8a966076e..3a97efc98cc8 100644 --- a/pkg/clusterversion/key_string.go +++ b/pkg/clusterversion/key_string.go @@ -43,11 +43,12 @@ func _() { _ = x[EnablePebbleFormatVersionBlockProperties-32] _ = x[DisableSystemConfigGossipTrigger-33] _ = x[MVCCIndexBackfiller-34] + _ = x[EnableLeaseHolderRemoval-35] } -const _Key_name = "V21_2Start22_1TargetBytesAvoidExcessAvoidDrainingNamesDrainingNamesMigrationTraceIDDoesntImplyStructuredRecordingAlterSystemTableStatisticsAddAvgSizeColAlterSystemStmtDiagReqsMVCCAddSSTableInsertPublicSchemaNamespaceEntryOnRestoreUnsplitRangesInAsyncGCJobsValidateGrantOptionPebbleFormatBlockPropertyCollectorProbeRequestSelectRPCsTakeTracingInfoInbandPreSeedTenantSpanConfigsSeedTenantSpanConfigsPublicSchemasWithDescriptorsEnsureSpanConfigReconciliationEnsureSpanConfigSubscriptionEnableSpanConfigStoreScanWholeRowsSCRAMAuthenticationUnsafeLossOfQuorumRecoveryRangeLogAlterSystemProtectedTimestampAddColumnEnableProtectedTimestampsForTenantDeleteCommentsWithDroppedIndexesRemoveIncompatibleDatabasePrivilegesAddRaftAppliedIndexTermMigrationPostAddRaftAppliedIndexTermMigrationDontProposeWriteTimestampForLeaseTransfersTenantSettingsTableEnablePebbleFormatVersionBlockPropertiesDisableSystemConfigGossipTriggerMVCCIndexBackfiller" +const _Key_name = "V21_2Start22_1TargetBytesAvoidExcessAvoidDrainingNamesDrainingNamesMigrationTraceIDDoesntImplyStructuredRecordingAlterSystemTableStatisticsAddAvgSizeColAlterSystemStmtDiagReqsMVCCAddSSTableInsertPublicSchemaNamespaceEntryOnRestoreUnsplitRangesInAsyncGCJobsValidateGrantOptionPebbleFormatBlockPropertyCollectorProbeRequestSelectRPCsTakeTracingInfoInbandPreSeedTenantSpanConfigsSeedTenantSpanConfigsPublicSchemasWithDescriptorsEnsureSpanConfigReconciliationEnsureSpanConfigSubscriptionEnableSpanConfigStoreScanWholeRowsSCRAMAuthenticationUnsafeLossOfQuorumRecoveryRangeLogAlterSystemProtectedTimestampAddColumnEnableProtectedTimestampsForTenantDeleteCommentsWithDroppedIndexesRemoveIncompatibleDatabasePrivilegesAddRaftAppliedIndexTermMigrationPostAddRaftAppliedIndexTermMigrationDontProposeWriteTimestampForLeaseTransfersTenantSettingsTableEnablePebbleFormatVersionBlockPropertiesDisableSystemConfigGossipTriggerMVCCIndexBackfillerEnableLeaseHolderRemoval" -var _Key_index = [...]uint16{0, 5, 14, 36, 54, 76, 113, 152, 175, 189, 230, 256, 275, 309, 321, 352, 376, 397, 425, 455, 483, 504, 517, 536, 570, 608, 642, 674, 710, 742, 778, 820, 839, 879, 911, 930} +var _Key_index = [...]uint16{0, 5, 14, 36, 54, 76, 113, 152, 175, 189, 230, 256, 275, 309, 321, 352, 376, 397, 425, 455, 483, 504, 517, 536, 570, 608, 642, 674, 710, 742, 778, 820, 839, 879, 911, 930, 954} func (i Key) String() string { if i < 0 || i >= Key(len(_Key_index)-1) { diff --git a/pkg/jobs/job_scheduler.go b/pkg/jobs/job_scheduler.go index 7d819df05f03..e0a1390af6af 100644 --- a/pkg/jobs/job_scheduler.go +++ b/pkg/jobs/job_scheduler.go @@ -351,6 +351,30 @@ func (s *jobScheduler) executeSchedules( return err } +// An internal, safety valve setting to revert scheduler execution to distributed mode. +// This setting should be removed once scheduled job system no longer locks tables for excessive +// periods of time. 
+var schedulerRunsOnSingleNode = settings.RegisterBoolSetting( + settings.TenantReadOnly, + "jobs.scheduler.single_node_scheduler.enabled", + "execute scheduler on a single node in a cluster", + false, +) + +func (s *jobScheduler) schedulerEnabledOnThisNode(ctx context.Context) bool { + if s.ShouldRunScheduler == nil || !schedulerRunsOnSingleNode.Get(&s.Settings.SV) { + return true + } + + enabled, err := s.ShouldRunScheduler(ctx, s.DB.Clock().NowAsClockTimestamp()) + if err != nil { + log.Errorf(ctx, "error determining if the scheduler enabled: %v; will recheck after %s", + err, recheckEnabledAfterPeriod) + return false + } + return enabled +} + func (s *jobScheduler) runDaemon(ctx context.Context, stopper *stop.Stopper) { _ = stopper.RunAsyncTask(ctx, "job-scheduler", func(ctx context.Context) { initialDelay := getInitialScanDelay(s.TestingKnobs) @@ -361,13 +385,12 @@ func (s *jobScheduler) runDaemon(ctx context.Context, stopper *stop.Stopper) { } for timer := time.NewTimer(initialDelay); ; timer.Reset( - getWaitPeriod(ctx, &s.Settings.SV, jitter, s.TestingKnobs)) { + getWaitPeriod(ctx, &s.Settings.SV, s.schedulerEnabledOnThisNode, jitter, s.TestingKnobs)) { select { case <-stopper.ShouldQuiesce(): return case <-timer.C: - if !schedulerEnabledSetting.Get(&s.Settings.SV) { - log.Info(ctx, "scheduled job daemon disabled") + if !schedulerEnabledSetting.Get(&s.Settings.SV) || !s.schedulerEnabledOnThisNode(ctx) { continue } @@ -427,7 +450,11 @@ type jitterFn func(duration time.Duration) time.Duration // Returns duration to wait before scanning system.scheduled_jobs. func getWaitPeriod( - ctx context.Context, sv *settings.Values, jitter jitterFn, knobs base.ModuleTestingKnobs, + ctx context.Context, + sv *settings.Values, + enabledOnThisNode func(ctx context.Context) bool, + jitter jitterFn, + knobs base.ModuleTestingKnobs, ) time.Duration { if k, ok := knobs.(*TestingKnobs); ok && k.SchedulerDaemonScanDelay != nil { return k.SchedulerDaemonScanDelay() @@ -437,6 +464,10 @@ func getWaitPeriod( return recheckEnabledAfterPeriod } + if enabledOnThisNode != nil && !enabledOnThisNode(ctx) { + return recheckEnabledAfterPeriod + } + pace := schedulerPaceSetting.Get(sv) if pace < minPacePeriod { if warnIfPaceTooLow.ShouldLog() { @@ -481,5 +512,9 @@ func StartJobSchedulerDaemon( return } + if daemonKnobs != nil && daemonKnobs.CaptureJobScheduler != nil { + daemonKnobs.CaptureJobScheduler(scheduler) + } + scheduler.runDaemon(ctx, stopper) } diff --git a/pkg/jobs/job_scheduler_test.go b/pkg/jobs/job_scheduler_test.go index b68230cb08fb..4a2588f18309 100644 --- a/pkg/jobs/job_scheduler_test.go +++ b/pkg/jobs/job_scheduler_test.go @@ -34,6 +34,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" @@ -221,22 +222,23 @@ func TestJobSchedulerDaemonGetWaitPeriod(t *testing.T) { sv, cleanup := getScopedSettings() defer cleanup() + var schedulerEnabled func(context.Context) bool noJitter := func(d time.Duration) time.Duration { return d } schedulerEnabledSetting.Override(ctx, sv, false) // When disabled, we wait 5 minutes before rechecking. 
- require.EqualValues(t, 5*time.Minute, getWaitPeriod(ctx, sv, noJitter, nil)) + require.EqualValues(t, 5*time.Minute, getWaitPeriod(ctx, sv, schedulerEnabled, noJitter, nil)) schedulerEnabledSetting.Override(ctx, sv, true) // When pace is too low, we use something more reasonable. schedulerPaceSetting.Override(ctx, sv, time.Nanosecond) - require.EqualValues(t, minPacePeriod, getWaitPeriod(ctx, sv, noJitter, nil)) + require.EqualValues(t, minPacePeriod, getWaitPeriod(ctx, sv, schedulerEnabled, noJitter, nil)) // Otherwise, we use user specified setting. pace := 42 * time.Second schedulerPaceSetting.Override(ctx, sv, pace) - require.EqualValues(t, pace, getWaitPeriod(ctx, sv, noJitter, nil)) + require.EqualValues(t, pace, getWaitPeriod(ctx, sv, schedulerEnabled, noJitter, nil)) } type recordScheduleExecutor struct { @@ -762,3 +764,65 @@ INSERT INTO defaultdb.foo VALUES(1, 1) updated := h.loadSchedule(t, schedule.ScheduleID()) require.Equal(t, "", updated.ScheduleStatus()) } + +func TestSchedulerCanBeRestrictedToSingleNode(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + const numNodes = 3 + for _, enableSingleNode := range []bool{true, false} { + t.Run(fmt.Sprintf("runs-on-single-node=%t", enableSingleNode), func(t *testing.T) { + schedulers := struct { + syncutil.Mutex + schedulers []*jobScheduler + }{} + knobs := &TestingKnobs{ + CaptureJobScheduler: func(s interface{}) { + schedulers.Lock() + defer schedulers.Unlock() + schedulers.schedulers = append(schedulers.schedulers, s.(*jobScheduler)) + }, + } + + args := base.TestServerArgs{ + Knobs: base.TestingKnobs{JobsTestingKnobs: knobs}, + } + + tc := serverutils.StartNewTestCluster(t, numNodes, base.TestClusterArgs{ServerArgs: args}) + defer tc.Stopper().Stop(context.Background()) + + testutils.SucceedsSoon(t, func() error { + schedulers.Lock() + defer schedulers.Unlock() + if len(schedulers.schedulers) == numNodes { + return nil + } + return errors.Newf("want %d schedules, got %d", numNodes, len(schedulers.schedulers)) + }) + + sqlDB := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + sqlDB.Exec(t, "SET CLUSTER SETTING jobs.scheduler.single_node_scheduler.enabled=$1", enableSingleNode) + + schedulers.Lock() + defer schedulers.Unlock() + expectedEnabled := numNodes + if enableSingleNode { + expectedEnabled = 1 + } + + testutils.SucceedsSoon(t, func() error { + numEnabled := 0 + for _, s := range schedulers.schedulers { + if s.schedulerEnabledOnThisNode(context.Background()) { + numEnabled++ + } + } + if numEnabled == expectedEnabled { + return nil + } + return errors.Newf("expecting %d enabled, found %d", expectedEnabled, numEnabled) + }) + + }) + } +} diff --git a/pkg/jobs/jobspb/jobs.proto b/pkg/jobs/jobspb/jobs.proto index b39599593293..cad1b66d0fa5 100644 --- a/pkg/jobs/jobspb/jobs.proto +++ b/pkg/jobs/jobspb/jobs.proto @@ -317,8 +317,9 @@ message RestoreDetails { // DebugPauseOn describes the events that the job should pause itself on for debugging purposes. string debug_pause_on = 20; + bool restore_system_users = 22; - // NEXT ID: 22. + // NEXT ID: 23. } message RestoreProgress { diff --git a/pkg/jobs/testing_knobs.go b/pkg/jobs/testing_knobs.go index 5f863bf7767b..2474b742d7cb 100644 --- a/pkg/jobs/testing_knobs.go +++ b/pkg/jobs/testing_knobs.go @@ -38,6 +38,11 @@ type TestingKnobs struct { // may invoke directly, bypassing normal job scheduler daemon logic. 
TakeOverJobsScheduling func(func(ctx context.Context, maxSchedules int64, txn *kv.Txn) error) + // CaptureJobScheduler is a function which will be passed a fully constructed job scheduler. + // The scheduler is passed in as interface{} because jobScheduler is an unexported type. + // This testing knob is useful only for job scheduler tests. + CaptureJobScheduler func(scheduler interface{}) + // CaptureJobExecutionConfig is a callback invoked with a job execution config // which will be used when executing job schedules. // The reason this callback exists is due to a circular dependency issues that exists diff --git a/pkg/kv/kvserver/allocator.go b/pkg/kv/kvserver/allocator.go index 790e4955b1f7..81b0b8b76cb1 100644 --- a/pkg/kv/kvserver/allocator.go +++ b/pkg/kv/kvserver/allocator.go @@ -2009,11 +2009,17 @@ func replicaIsBehind(raftStatus *raft.Status, replicaID roachpb.ReplicaID) bool // with an empty or nil `raftStatus` (as will be the case when its called by a // replica that is not the raft leader), we pessimistically assume that // `replicaID` may need a snapshot. -func replicaMayNeedSnapshot(raftStatus *raft.Status, replicaID roachpb.ReplicaID) bool { +func replicaMayNeedSnapshot(raftStatus *raft.Status, replica roachpb.ReplicaDescriptor) bool { + // When adding replicas, we only move them from LEARNER to VOTER_INCOMING after + // they applied the snapshot (see initializeRaftLearners and its use in + // changeReplicasImpl). + if replica.GetType() == roachpb.VOTER_INCOMING { + return false + } if raftStatus == nil || len(raftStatus.Progress) == 0 { return true } - if progress, ok := raftStatus.Progress[uint64(replicaID)]; ok { + if progress, ok := raftStatus.Progress[uint64(replica.ReplicaID)]; ok { // We can only reasonably assume that the follower replica is not in need of // a snapshot iff it is in `StateReplicate`. However, even this is racey // because we can still possibly have an ill-timed log truncation between @@ -2040,7 +2046,7 @@ func excludeReplicasInNeedOfSnapshots( filled := 0 for _, repl := range replicas { - if replicaMayNeedSnapshot(raftStatus, repl.ReplicaID) { + if replicaMayNeedSnapshot(raftStatus, repl) { log.VEventf( ctx, 5, diff --git a/pkg/kv/kvserver/client_lease_test.go b/pkg/kv/kvserver/client_lease_test.go index 64ada8ab54e1..1e614ae673b2 100644 --- a/pkg/kv/kvserver/client_lease_test.go +++ b/pkg/kv/kvserver/client_lease_test.go @@ -16,7 +16,6 @@ import ( "math" "runtime" "strconv" - "strings" "sync" "sync/atomic" "testing" @@ -594,6 +593,182 @@ func TestLeasePreferencesRebalance(t *testing.T) { }) } +// Tests that when leaseholder is relocated, the lease can be transferred directly to a new node. +// This verifies https://github.com/cockroachdb/cockroach/issues/67740 +func TestLeaseholderRelocatePreferred(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + testLeaseholderRelocateInternal(t, "us") +} + +// Tests that when leaseholder is relocated, the lease will transfer to a node in a preferred +// location, even if another node is being added. +// This verifies https://github.com/cockroachdb/cockroach/issues/67740 +func TestLeaseholderRelocateNonPreferred(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + testLeaseholderRelocateInternal(t, "eu") +} + +// Tests that when leaseholder is relocated, the lease will transfer to some node, +// even if nodes in the preferred region aren't available. 
+// This verifies https://github.com/cockroachdb/cockroach/issues/67740
+func TestLeaseholderRelocateNonExistent(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+	testLeaseholderRelocateInternal(t, "au")
+}
+
+// Tests that when the leaseholder is relocated, the lease can be transferred directly to a new node.
+func testLeaseholderRelocateInternal(t *testing.T, preferredRegion string) {
+	stickyRegistry := server.NewStickyInMemEnginesRegistry()
+	defer stickyRegistry.CloseAllStickyInMemEngines()
+	ctx := context.Background()
+	manualClock := hlc.NewHybridManualClock()
+	zcfg := zonepb.DefaultZoneConfig()
+	zcfg.LeasePreferences = []zonepb.LeasePreference{
+		{
+			Constraints: []zonepb.Constraint{
+				{Type: zonepb.Constraint_REQUIRED, Key: "region", Value: preferredRegion},
+			},
+		},
+	}
+
+	serverArgs := make(map[int]base.TestServerArgs)
+	locality := func(region string) roachpb.Locality {
+		return roachpb.Locality{
+			Tiers: []roachpb.Tier{
+				{Key: "region", Value: region},
+			},
+		}
+	}
+	localities := []roachpb.Locality{
+		locality("eu"),
+		locality("eu"),
+		locality("us"),
+		locality("us"),
+		locality("au"),
+	}
+
+	const numNodes = 4
+	for i := 0; i < numNodes; i++ {
+		serverArgs[i] = base.TestServerArgs{
+			Locality: localities[i],
+			Knobs: base.TestingKnobs{
+				Server: &server.TestingKnobs{
+					ClockSource: manualClock.UnixNano,
+					DefaultZoneConfigOverride: &zcfg,
+					StickyEngineRegistry: stickyRegistry,
+				},
+			},
+			StoreSpecs: []base.StoreSpec{
+				{
+					InMemory: true,
+					StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10),
+				},
+			},
+		}
+	}
+	tc := testcluster.StartTestCluster(t, numNodes,
+		base.TestClusterArgs{
+			ReplicationMode: base.ReplicationManual,
+			ServerArgsPerNode: serverArgs,
+		})
+	defer tc.Stopper().Stop(ctx)
+
+	_, rhsDesc := tc.SplitRangeOrFatal(t, bootstrap.TestingUserTableDataMin())
+
+	// We start with having the range under test on (1,2,3).
+	tc.AddVotersOrFatal(t, rhsDesc.StartKey.AsRawKey(), tc.Targets(1, 2)...)
+
+	// Make sure the lease is on 3.
+	tc.TransferRangeLeaseOrFatal(t, rhsDesc, tc.Target(2))
+
+	// Check that the lease moved to 3.
+	leaseHolder, err := tc.FindRangeLeaseHolder(rhsDesc, nil)
+	require.NoError(t, err)
+	require.Equal(t, tc.Target(2), leaseHolder)
+
+	gossipLiveness(t, tc)
+
+	testutils.SucceedsSoon(t, func() error {
+		// Relocate range 3 -> 4.
+		err = tc.Servers[2].DB().
+			AdminRelocateRange(
+				context.Background(), rhsDesc.StartKey.AsRawKey(),
+				tc.Targets(0, 1, 3), nil, false)
+		if err != nil {
+			require.True(t, kvserver.IsTransientLeaseholderError(err), "%v", err)
+			return err
+		}
+		leaseHolder, err = tc.FindRangeLeaseHolder(rhsDesc, nil)
+		if err != nil {
+			return err
+		}
+		if leaseHolder.Equal(tc.Target(2)) {
+			return errors.Errorf("Leaseholder didn't move.")
+		}
+		return nil
+	})
+
+	// The only node with "au" locality is down, so the lease can move anywhere.
+	if preferredRegion == "au" {
+		return
+	}
+
+	// Make sure the lease moved to the preferred region.
+	leaseHolder, err = tc.FindRangeLeaseHolder(rhsDesc, nil)
+	require.NoError(t, err)
+	require.Equal(t, locality(preferredRegion),
+		localities[leaseHolder.NodeID-1])
+
+	var leaseholderNodeId int
+	if preferredRegion == "us" {
+		require.Equal(t, tc.Target(3).NodeID,
+			leaseHolder.NodeID)
+		leaseholderNodeId = 3
+	} else {
+		if leaseHolder.NodeID == tc.Target(0).NodeID {
+			leaseholderNodeId = 0
+		} else {
+			require.Equal(t, tc.Target(1).NodeID,
+				leaseHolder.NodeID)
+			leaseholderNodeId = 1
+		}
+	}
+
+	// Double check that the lease moved directly.
+ repl := tc.GetFirstStoreFromServer(t, leaseholderNodeId). + LookupReplica(roachpb.RKey(rhsDesc.StartKey.AsRawKey())) + history := repl.GetLeaseHistory() + require.Equal(t, leaseHolder.NodeID, + history[len(history)-1].Replica.NodeID) + require.Equal(t, tc.Target(2).NodeID, + history[len(history)-2].Replica.NodeID) +} + +func gossipLiveness(t *testing.T, tc *testcluster.TestCluster) { + for i := range tc.Servers { + testutils.SucceedsSoon(t, tc.Servers[i].HeartbeatNodeLiveness) + } + // Make sure that all store pools have seen liveness heartbeats from everyone. + testutils.SucceedsSoon(t, func() error { + for i := range tc.Servers { + for j := range tc.Servers { + live, err := tc.GetFirstStoreFromServer(t, i).GetStoreConfig(). + StorePool.IsLive(tc.Target(j).StoreID) + if err != nil { + return err + } + if !live { + return errors.Errorf("Server %d is suspect on server %d", j, i) + } + } + } + return nil + }) +} + // This test replicates the behavior observed in // https://github.com/cockroachdb/cockroach/issues/62485. We verify that // when a dc with the leaseholder is lost, a node in a dc that does not have the @@ -716,29 +891,10 @@ func TestLeasePreferencesDuringOutage(t *testing.T) { return nil }) - _, processError, enqueueError := tc.GetFirstStoreFromServer(t, 0). + _, _, enqueueError := tc.GetFirstStoreFromServer(t, 0). ManuallyEnqueue(ctx, "replicate", repl, true) + require.NoError(t, enqueueError) - if processError != nil { - log.Infof(ctx, "a US replica stole lease, manually moving it to the EU.") - if !strings.Contains(processError.Error(), "does not have the range lease") { - t.Fatal(processError) - } - // The us replica ended up stealing the lease, so we need to manually - // transfer the lease and then do another run through the replicate queue - // to move it to the us. - tc.TransferRangeLeaseOrFatal(t, *repl.Desc(), tc.Target(0)) - testutils.SucceedsSoon(t, func() error { - if !repl.OwnsValidLease(ctx, tc.Servers[0].Clock().NowAsClockTimestamp()) { - return errors.Errorf("Expected lease to transfer to server 0") - } - return nil - }) - _, processError, enqueueError = tc.GetFirstStoreFromServer(t, 0). - ManuallyEnqueue(ctx, "replicate", repl, true) - require.NoError(t, enqueueError) - require.NoError(t, processError) - } var newLeaseHolder roachpb.ReplicationTarget testutils.SucceedsSoon(t, func() error { @@ -747,22 +903,26 @@ func TestLeasePreferencesDuringOutage(t *testing.T) { return err }) + // Check that the leaseholder is in the US srv, err := tc.FindMemberServer(newLeaseHolder.StoreID) require.NoError(t, err) region, ok := srv.Locality().Find("region") require.True(t, ok) require.Equal(t, "us", region) - require.Equal(t, 3, len(repl.Desc().Replicas().Voters().VoterDescriptors())) + // Validate that we upreplicated outside of SF. - for _, replDesc := range repl.Desc().Replicas().Voters().VoterDescriptors() { + replicas := repl.Desc().Replicas().Voters().VoterDescriptors() + require.Equal(t, 3, len(replicas)) + for _, replDesc := range replicas { serv, err := tc.FindMemberServer(replDesc.StoreID) require.NoError(t, err) dc, ok := serv.Locality().Find("dc") require.True(t, ok) require.NotEqual(t, "sf", dc) } - history := repl.GetLeaseHistory() + // make sure we see the eu node as a lease holder in the second to last position. 
+ history := repl.GetLeaseHistory() require.Equal(t, tc.Target(0).NodeID, history[len(history)-2].Replica.NodeID) } @@ -841,7 +1001,7 @@ func TestLeasesDontThrashWhenNodeBecomesSuspect(t *testing.T) { _, rhsDesc := tc.SplitRangeOrFatal(t, bootstrap.TestingUserTableDataMin()) tc.AddVotersOrFatal(t, rhsDesc.StartKey.AsRawKey(), tc.Targets(1, 2, 3)...) - tc.RemoveLeaseHolderOrFatal(t, rhsDesc, tc.Target(0), tc.Target(1)) + tc.RemoveLeaseHolderOrFatal(t, rhsDesc, tc.Target(0)) startKeys := make([]roachpb.Key, 20) startKeys[0] = rhsDesc.StartKey.AsRawKey() diff --git a/pkg/kv/kvserver/client_metrics_test.go b/pkg/kv/kvserver/client_metrics_test.go index 28575385104d..948b5627d4db 100644 --- a/pkg/kv/kvserver/client_metrics_test.go +++ b/pkg/kv/kvserver/client_metrics_test.go @@ -337,7 +337,7 @@ func TestStoreMetrics(t *testing.T) { // Verify stats after addition. verifyStats(t, tc, 1, 2) checkGauge(t, "store 0", tc.GetFirstStoreFromServer(t, 0).Metrics().ReplicaCount, initialCount+1) - tc.RemoveLeaseHolderOrFatal(t, desc, tc.Target(0), tc.Target(1)) + tc.RemoveLeaseHolderOrFatal(t, desc, tc.Target(0)) testutils.SucceedsSoon(t, func() error { _, err := tc.GetFirstStoreFromServer(t, 0).GetReplica(desc.RangeID) if err == nil { diff --git a/pkg/kv/kvserver/client_raft_test.go b/pkg/kv/kvserver/client_raft_test.go index f82d3fe53976..21ad3b6b37af 100644 --- a/pkg/kv/kvserver/client_raft_test.go +++ b/pkg/kv/kvserver/client_raft_test.go @@ -3826,16 +3826,25 @@ func TestLeaseHolderRemoveSelf(t *testing.T) { }) defer tc.Stopper().Stop(ctx) - leaseHolder := tc.GetFirstStoreFromServer(t, 0) - key := []byte("a") - tc.SplitRangeOrFatal(t, key) - tc.AddVotersOrFatal(t, key, tc.Target(1)) + _, desc := tc.SplitRangeOrFatal(t, bootstrap.TestingUserTableDataMin()) + key := desc.StartKey.AsRawKey() + tc.AddVotersOrFatal(t, key, tc.Targets(1)...) + + // Remove the replica from first store. + tc.RemoveLeaseHolderOrFatal(t, desc, tc.Target(0)) - // Attempt to remove the replica from first store. - expectedErr := "invalid ChangeReplicasTrigger" - if _, err := tc.RemoveVoters(key, tc.Target(0)); !testutils.IsError(err, expectedErr) { - t.Fatalf("expected %q error trying to remove leaseholder replica; got %v", expectedErr, err) + // Check that lease moved to server 2. + leaseInfo := getLeaseInfoOrFatal(t, context.Background(), tc.Servers[1].DB(), key) + rangeDesc, err := tc.LookupRange(key) + if err != nil { + t.Fatal(err) + } + replica, ok := rangeDesc.GetReplicaDescriptor(tc.Servers[1].GetFirstStoreID()) + if !ok { + t.Fatalf("expected to find replica in server 2") } + require.Equal(t, leaseInfo.Lease.Replica, replica) + leaseHolder := tc.GetFirstStoreFromServer(t, 1) // Expect that we can still successfully do a get on the range. getArgs := getArgs(key) diff --git a/pkg/kv/kvserver/client_relocate_range_test.go b/pkg/kv/kvserver/client_relocate_range_test.go index 5240945a9927..353da7c679ee 100644 --- a/pkg/kv/kvserver/client_relocate_range_test.go +++ b/pkg/kv/kvserver/client_relocate_range_test.go @@ -148,10 +148,12 @@ func usesAtomicReplicationChange(ops []roachpb.ReplicationChange) bool { // 4. Voter swapped with non-voter (ADD_VOTER, REMOVE_NON_VOTER, // ADD_NON_VOTER, REMOVE_VOTER) if len(ops) >= 2 { + // Either a simple voter rebalance, or its a non-voter promotion. if ops[0].ChangeType == roachpb.ADD_VOTER && ops[1].ChangeType.IsRemoval() { return true } } + // Demotion of a voter. 
if len(ops) == 2 && ops[0].ChangeType == roachpb.ADD_NON_VOTER && ops[1].ChangeType == roachpb.REMOVE_VOTER { return true @@ -246,10 +248,9 @@ func TestAdminRelocateRange(t *testing.T) { } // s5 (LH) ---> s3 (LH) - // Lateral movement while at replication factor one. In this case atomic - // replication changes cannot be used; we add-then-remove instead. + // Lateral movement while at replication factor one. { - requireNumAtomic(0, 2, func() { + requireNumAtomic(1, 0, func() { relocateAndCheck(t, tc, k, tc.Targets(2), nil /* nonVoterTargets */) }) } diff --git a/pkg/kv/kvserver/client_replica_test.go b/pkg/kv/kvserver/client_replica_test.go index 6daf41460cfc..a14aa81a3a3c 100644 --- a/pkg/kv/kvserver/client_replica_test.go +++ b/pkg/kv/kvserver/client_replica_test.go @@ -1815,10 +1815,24 @@ func TestLeaseExtensionNotBlockedByRead(t *testing.T) { readBlocked <- struct{}{} } } +func validateLeaseholderSoon( + t *testing.T, db *kv.DB, key roachpb.Key, replica roachpb.ReplicaDescriptor, isTarget bool, +) { + testutils.SucceedsSoon(t, func() error { + leaseInfo := getLeaseInfoOrFatal(t, context.Background(), db, key) + if isTarget && leaseInfo.Lease.Replica != replica { + return fmt.Errorf("lease holder should be replica %+v, but is: %+v", + replica, leaseInfo.Lease.Replica) + } else if !isTarget && leaseInfo.Lease.Replica == replica { + return fmt.Errorf("lease holder still on replica: %+v", replica) + } + return nil + }) +} -func getLeaseInfo( - ctx context.Context, db *kv.DB, key roachpb.Key, -) (*roachpb.LeaseInfoResponse, error) { +func getLeaseInfoOrFatal( + t *testing.T, ctx context.Context, db *kv.DB, key roachpb.Key, +) *roachpb.LeaseInfoResponse { header := roachpb.Header{ // INCONSISTENT read with a NEAREST routing policy, since we want to make // sure that the node used to send this is the one that processes the @@ -1829,9 +1843,37 @@ func getLeaseInfo( leaseInfoReq := &roachpb.LeaseInfoRequest{RequestHeader: roachpb.RequestHeader{Key: key}} reply, pErr := kv.SendWrappedWith(ctx, db.NonTransactionalSender(), header, leaseInfoReq) if pErr != nil { - return nil, pErr.GoError() + t.Fatal(pErr) } - return reply.(*roachpb.LeaseInfoResponse), nil + return reply.(*roachpb.LeaseInfoResponse) +} + +func TestRemoveLeaseholder(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + tc := testcluster.StartTestCluster(t, numNodes, + base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + }) + defer tc.Stopper().Stop(context.Background()) + _, rhsDesc := tc.SplitRangeOrFatal(t, bootstrap.TestingUserTableDataMin()) + + // We start with having the range under test on (1,2,3). + tc.AddVotersOrFatal(t, rhsDesc.StartKey.AsRawKey(), tc.Targets(1, 2)...) + + // Make sure the lease is on 1. + tc.TransferRangeLeaseOrFatal(t, rhsDesc, tc.Target(0)) + leaseHolder, err := tc.FindRangeLeaseHolder(rhsDesc, nil) + require.NoError(t, err) + require.Equal(t, tc.Target(0), leaseHolder) + + // Remove server 1. + tc.RemoveLeaseHolderOrFatal(t, rhsDesc, tc.Target(0)) + + // Check that the lease moved away from 1. 
+ leaseHolder, err = tc.FindRangeLeaseHolder(rhsDesc, nil) + require.NoError(t, err) + require.NotEqual(t, tc.Target(0), leaseHolder) } func TestLeaseInfoRequest(t *testing.T) { @@ -1841,7 +1883,6 @@ func TestLeaseInfoRequest(t *testing.T) { defer tc.Stopper().Stop(context.Background()) kvDB0 := tc.Servers[0].DB() - kvDB1 := tc.Servers[1].DB() key := []byte("a") rangeDesc, err := tc.LookupRange(key) @@ -1856,13 +1897,6 @@ func TestLeaseInfoRequest(t *testing.T) { t.Fatalf("expected to find replica in server %d", i) } } - mustGetLeaseInfo := func(db *kv.DB) *roachpb.LeaseInfoResponse { - resp, err := getLeaseInfo(context.Background(), db, rangeDesc.StartKey.AsRawKey()) - if err != nil { - t.Fatal(err) - } - return resp - } // Transfer the lease to Servers[0] so we start in a known state. Otherwise, // there might be already a lease owned by a random node. @@ -1871,16 +1905,8 @@ func TestLeaseInfoRequest(t *testing.T) { t.Fatal(err) } - // Now test the LeaseInfo. We might need to loop until the node we query has - // applied the lease. - testutils.SucceedsSoon(t, func() error { - leaseHolderReplica := mustGetLeaseInfo(kvDB0).Lease.Replica - if leaseHolderReplica != replicas[0] { - return fmt.Errorf("lease holder should be replica %+v, but is: %+v", - replicas[0], leaseHolderReplica) - } - return nil - }) + // Now test the LeaseInfo. We might need to loop until the node we query has applied the lease. + validateLeaseholderSoon(t, kvDB0, rangeDesc.StartKey.AsRawKey(), replicas[0], true) // Transfer the lease to Server 1 and check that LeaseInfoRequest gets the // right answer. @@ -1891,26 +1917,19 @@ func TestLeaseInfoRequest(t *testing.T) { // An inconsistent LeaseInfoReqeust on the old lease holder should give us the // right answer immediately, since the old holder has definitely applied the // transfer before TransferRangeLease returned. - leaseHolderReplica := mustGetLeaseInfo(kvDB0).Lease.Replica - if !leaseHolderReplica.Equal(replicas[1]) { + leaseInfo := getLeaseInfoOrFatal(t, context.Background(), kvDB0, rangeDesc.StartKey.AsRawKey()) + if !leaseInfo.Lease.Replica.Equal(replicas[1]) { t.Fatalf("lease holder should be replica %+v, but is: %+v", - replicas[1], leaseHolderReplica) + replicas[1], leaseInfo.Lease.Replica) } // A read on the new lease holder does not necessarily succeed immediately, // since it might take a while for it to apply the transfer. - testutils.SucceedsSoon(t, func() error { - // We can't reliably do a CONSISTENT read here, even though we're reading - // from the supposed lease holder, because this node might initially be - // unaware of the new lease and so the request might bounce around for a - // while (see #8816). - leaseHolderReplica = mustGetLeaseInfo(kvDB1).Lease.Replica - if !leaseHolderReplica.Equal(replicas[1]) { - return errors.Errorf("lease holder should be replica %+v, but is: %+v", - replicas[1], leaseHolderReplica) - } - return nil - }) + // We can't reliably do a CONSISTENT read here, even though we're reading + // from the supposed lease holder, because this node might initially be + // unaware of the new lease and so the request might bounce around for a + // while (see #8816). + validateLeaseholderSoon(t, kvDB0, rangeDesc.StartKey.AsRawKey(), replicas[1], true) // Transfer the lease to Server 2 and check that LeaseInfoRequest gets the // right answer. 
@@ -1942,10 +1961,9 @@ func TestLeaseInfoRequest(t *testing.T) { t.Fatal(pErr) } resp := *(reply.(*roachpb.LeaseInfoResponse)) - leaseHolderReplica = resp.Lease.Replica - if !leaseHolderReplica.Equal(replicas[2]) { - t.Fatalf("lease holder should be replica %s, but is: %s", replicas[2], leaseHolderReplica) + if !resp.Lease.Replica.Equal(replicas[2]) { + t.Fatalf("lease holder should be replica %s, but is: %s", replicas[2], resp.Lease.Replica) } // TODO(andrei): test the side-effect of LeaseInfoRequest when there's no @@ -2279,8 +2297,8 @@ func TestClearRange(t *testing.T) { t.Fatal(err) } var actualKeys []roachpb.Key - for _, kv := range kvs { - actualKeys = append(actualKeys, kv.Key.Key) + for _, keyValue := range kvs { + actualKeys = append(actualKeys, keyValue.Key.Key) } if !reflect.DeepEqual(expectedKeys, actualKeys) { t.Fatalf("expected %v, but got %v", expectedKeys, actualKeys) @@ -2541,7 +2559,7 @@ func TestRandomConcurrentAdminChangeReplicasRequests(t *testing.T) { ctx := context.Background() defer tc.Stopper().Stop(ctx) const actors = 10 - errors := make([]error, actors) + errs := make([]error, actors) var wg sync.WaitGroup key := roachpb.Key("a") db := tc.Servers[0].DB() @@ -2581,11 +2599,11 @@ func TestRandomConcurrentAdminChangeReplicasRequests(t *testing.T) { } wg.Add(actors) for i := 0; i < actors; i++ { - go func(i int) { errors[i] = addReplicas(); wg.Done() }(i) + go func(i int) { errs[i] = addReplicas(); wg.Done() }(i) } wg.Wait() var gotSuccess bool - for _, err := range errors { + for _, err := range errs { if err != nil { require.Truef(t, kvserver.IsRetriableReplicationChangeError(err), "%s; desc: %v", err, rangeInfo.Desc) } else if gotSuccess { @@ -2611,7 +2629,7 @@ func TestChangeReplicasSwapVoterWithNonVoter(t *testing.T) { key := tc.ScratchRange(t) // NB: The test cluster starts with firstVoter having a voting replica (and // the lease) for all ranges. - firstVoter, secondVoter, nonVoter := tc.Target(0), tc.Target(1), tc.Target(3) + firstVoter, nonVoter := tc.Target(0), tc.Target(1) firstStore, err := tc.Server(0).GetStores().(*kvserver.Stores).GetStore(tc.Server(0).GetFirstStoreID()) require.NoError(t, err) firstRepl := firstStore.LookupReplica(roachpb.RKey(key)) @@ -2619,17 +2637,8 @@ func TestChangeReplicasSwapVoterWithNonVoter(t *testing.T) { " replica for the ScratchRange") tc.AddNonVotersOrFatal(t, key, nonVoter) - // TODO(aayush): Trying to swap the last voting replica with a non-voter hits - // the safeguard inside Replica.propose() as the last voting replica is always - // the leaseholder. There are a bunch of subtleties around getting a - // leaseholder to remove itself without another voter to immediately transfer - // the lease to. See #40333. 
- _, err = tc.SwapVoterWithNonVoter(key, firstVoter, nonVoter) - require.Regexp(t, "received invalid ChangeReplicasTrigger", err) - - tc.AddVotersOrFatal(t, key, secondVoter) - - tc.SwapVoterWithNonVoterOrFatal(t, key, secondVoter, nonVoter) + // Swap the only voting replica (leaseholder) with a non-voter. + tc.SwapVoterWithNonVoterOrFatal(t, key, firstVoter, nonVoter) } // TestReplicaTombstone ensures that tombstones are written when we expect diff --git a/pkg/kv/kvserver/markers.go b/pkg/kv/kvserver/markers.go index 29b7269e4469..0caa39e2efe0 100644 --- a/pkg/kv/kvserver/markers.go +++ b/pkg/kv/kvserver/markers.go @@ -78,3 +78,13 @@ var errMarkInvalidReplicationChange = errors.New("invalid replication change") func IsIllegalReplicationChangeError(err error) bool { return errors.Is(err, errMarkInvalidReplicationChange) } + +var errLeaseholderNotRaftLeader = errors.New( + "removing leaseholder not allowed since it isn't the Raft leader") + +// IsTransientLeaseholderError returns true if the error indicates that a +// replication change was rejected because the Raft leader is not collocated +// with the leaseholder. This condition is temporary, and indicates that the +// operation should be retried. +func IsTransientLeaseholderError(err error) bool { + return errors.Is(err, errLeaseholderNotRaftLeader) +} diff --git a/pkg/kv/kvserver/merge_queue.go b/pkg/kv/kvserver/merge_queue.go index 1ec5224111b9..534253af8527 100644 --- a/pkg/kv/kvserver/merge_queue.go +++ b/pkg/kv/kvserver/merge_queue.go @@ -281,18 +281,14 @@ func (mq *mergeQueue) process( } { - store := lhsRepl.store // AdminMerge errors if there is a learner or joint config on either // side and AdminRelocateRange removes any on the range it operates on. - // For the sake of obviousness, just fix this all upfront. + // For the sake of obviousness, just fix this all upfront. The merge is + // performed by the LHS leaseholder, so it can easily do this for the LHS. + // We deal with the RHS, whose leaseholder may be remote, further down. var err error - lhsDesc, err = maybeLeaveAtomicChangeReplicasAndRemoveLearners(ctx, store, lhsDesc) - if err != nil { - log.VEventf(ctx, 2, `%v`, err) - return false, err - } - - rhsDesc, err = maybeLeaveAtomicChangeReplicasAndRemoveLearners(ctx, store, rhsDesc) + lhsDesc, err = + lhsRepl.maybeLeaveAtomicChangeReplicasAndRemoveLearners(ctx, lhsDesc) if err != nil { log.VEventf(ctx, 2, `%v`, err) return false, err @@ -311,21 +307,15 @@ func (mq *mergeQueue) process( ) } } - for i := range rightRepls { - if typ := rightRepls[i].GetType(); !(typ == roachpb.VOTER_FULL || typ == roachpb.NON_VOTER) { - return false, - errors.AssertionFailedf( - `cannot merge because rhs is either in a joint state or has learner replicas: %v`, - rightRepls, - ) - } - } // Range merges require that the set of stores that contain a replica for the // RHS range be equal to the set of stores that contain a replica for the LHS // range. The LHS and RHS ranges' leaseholders do not need to be co-located - // and types of the replicas (voting or non-voting) do not matter. - if !replicasCollocated(leftRepls, rightRepls) { + // and types of the replicas (voting or non-voting) do not matter. Even if + // replicas are collocated, the RHS might still be in a joint config, and + // calling AdminRelocateRange will fix this. + if !replicasCollocated(leftRepls, rightRepls) || + rhsDesc.Replicas().InAtomicReplicationChange() { // TODO(aayush): We enable merges to proceed even when LHS and/or RHS are in // violation of their constraints (by adding or removing replicas on the RHS // as needed).
We could instead choose to check constraints conformance of @@ -361,6 +351,23 @@ func (mq *mergeQueue) process( ); err != nil { return false, err } + + // Refresh RHS descriptor. + rhsDesc, _, _, _, err = mq.requestRangeStats(ctx, lhsDesc.EndKey.AsRawKey()) + if err != nil { + return false, err + } + rightRepls = rhsDesc.Replicas().Descriptors() + } + for i := range rightRepls { + if typ := rightRepls[i].GetType(); !(typ == roachpb.VOTER_FULL || typ == roachpb.NON_VOTER) { + log.Infof(ctx, "RHS Type: %s", typ) + return false, + errors.AssertionFailedf( + `cannot merge because rhs is either in a joint state or has learner replicas: %v`, + rightRepls, + ) + } } log.VEventf(ctx, 2, "merging to produce range: %s-%s", mergedDesc.StartKey, mergedDesc.EndKey) diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index 873a2c690a4b..6ed71ebcebb6 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -19,6 +19,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" @@ -310,16 +311,16 @@ func (r *Replica) adminSplitWithDescriptor( reason string, ) (roachpb.AdminSplitResponse, error) { var err error + var reply roachpb.AdminSplitResponse + // The split queue doesn't care about the set of replicas, so if we somehow // are being handed one that's in a joint state, finalize that before // continuing. - desc, err = maybeLeaveAtomicChangeReplicas(ctx, r.store, desc) + desc, err = r.maybeLeaveAtomicChangeReplicas(ctx, desc) if err != nil { - return roachpb.AdminSplitResponse{}, err + return reply, err } - var reply roachpb.AdminSplitResponse - // Determine split key if not provided with args. This scan is // allowed to be relatively slow because admin commands don't block // other commands. @@ -1007,7 +1008,7 @@ func (r *Replica) changeReplicasImpl( // If in a joint config, clean up. The assumption here is that the caller // of ChangeReplicas didn't even realize that they were holding on to a // joint descriptor and would rather not have to deal with that fact. - desc, err = maybeLeaveAtomicChangeReplicas(ctx, r.store, desc) + desc, err = r.maybeLeaveAtomicChangeReplicas(ctx, desc) if err != nil { return nil, err } @@ -1057,6 +1058,21 @@ func (r *Replica) changeReplicasImpl( } } + // Before we initialize learners, check that we're not removing the leaseholder. + // Or if we are, ensure that leaseholder is collocated with the Raft leader. + // A leaseholder that isn't the Raft leader doesn't know whether other replicas + // are sufficiently up-to-date (have the latest snapshot), and so choosing a + // target for lease transfer is riskier as it may result in temporary unavailability. + for _, target := range targets.voterRemovals { + if r.NodeID() == target.NodeID && r.StoreID() == target.StoreID { + raftStatus := r.RaftStatus() + if raftStatus == nil || len(raftStatus.Progress) == 0 { + log.VErrEventf(ctx, 5, "%v", errLeaseholderNotRaftLeader) + return nil, errLeaseholderNotRaftLeader + } + } + } + if adds := targets.voterAdditions; len(adds) > 0 { // For all newly added voters, first add LEARNER replicas. 
They accept raft // traffic (so they can catch up) but don't get to vote (so they don't @@ -1083,9 +1099,8 @@ func (r *Replica) changeReplicasImpl( ) if err != nil { // If the error occurred while transitioning out of an atomic replication - // change, try again here with a fresh descriptor; this is a noop - // otherwise. - if _, err := maybeLeaveAtomicChangeReplicas(ctx, r.store, r.Desc()); err != nil { + // change, try again here with a fresh descriptor; this is a noop otherwise. + if _, err := r.maybeLeaveAtomicChangeReplicas(ctx, r.Desc()); err != nil { return nil, err } if fn := r.store.cfg.TestingKnobs.ReplicaAddSkipLearnerRollback; fn != nil && fn() { @@ -1144,7 +1159,7 @@ func (r *Replica) changeReplicasImpl( // If we demoted or swapped any voters with non-voters, we likely are in a // joint config or have learners on the range. Let's exit the joint config // and remove the learners. - return maybeLeaveAtomicChangeReplicasAndRemoveLearners(ctx, r.store, desc) + return r.maybeLeaveAtomicChangeReplicasAndRemoveLearners(ctx, desc) } return desc, nil } @@ -1182,14 +1197,80 @@ func synthesizeTargetsByChangeType( return result } +// maybeTransferLeaseDuringLeaveJoint checks whether the leaseholder is being +// removed and, if so, looks for a suitable transfer target for the lease and +// attempts to transfer the lease to that target. If a target isn't found +// or lease transfer fails, an error is returned. +func (r *Replica) maybeTransferLeaseDuringLeaveJoint( + ctx context.Context, desc *roachpb.RangeDescriptor, +) error { + voters := desc.Replicas().VoterDescriptors() + // Determine whether the current leaseholder is being removed. voters includes + // the set of full or incoming voters that will remain after the joint configuration is + // complete. If we don't find the current leaseholder there, it's being removed, + // and we're going to transfer the lease to another voter below, before exiting the JOINT config. + beingRemoved := true + for _, v := range voters { + if v.ReplicaID == r.ReplicaID() { + beingRemoved = false + break + } + } + if !beingRemoved { + return nil + } + // TransferLeaseTarget looks for a suitable target for lease transfer. + // Replicas are filtered from consideration based on the arguments passed, + // as well as various indicators. One such filter is a requirement that a + // target replica has applied a snapshot. We exclude VOTER_INCOMING replicas + // from this check, since they only move to this state after applying a + // snapshot. Another filter is based on the liveness status of nodes: + // we do not transfer the lease to nodes in draining or unknown state. + // Unknown is a temporary state, and is usually resolved after receiving a + // gossip message. But we do not know whether a particular node is alive, + // and would rather stay in the JOINT config than transfer the lease + // to a dead node. If no candidates are found, we will remain in a JOINT + // config, and rely on upper layers to retry exiting from the config. + target := r.store.allocator.TransferLeaseTarget( + ctx, + r.SpanConfig(), + voters, + r, + r.leaseholderStats, + true, /* forceDecisionWithoutStats */ + transferLeaseOptions{ + goal: followTheWorkload, + checkTransferLeaseSource: false, + checkCandidateFullness: false, + dryRun: false, + }, + ) + if target == (roachpb.ReplicaDescriptor{}) { + err := errors.Errorf( + "could not find a better lease transfer target for r%d", desc.RangeID) + log.VErrEventf(ctx, 5, "%v", err) + // Couldn't find a target.
Returning an error here means we're not exiting the JOINT config, and the + // caller will retry. Note that the JOINT config isn't rolled back. + return err + } + log.VEventf(ctx, 5, "current leaseholder %v is being removed through an"+ + " atomic replication change. Transferring lease to %v", r.String(), target) + err := r.store.DB().AdminTransferLease(ctx, r.startKey, target.StoreID) + if err != nil { + return err + } + log.VEventf(ctx, 5, "leaseholder transfer to %v complete", target) + return nil +} + // maybeLeaveAtomicChangeReplicas transitions out of the joint configuration if // the descriptor indicates one. This involves running a distributed transaction // updating said descriptor, the result of which will be returned. The // descriptor returned from this method will contain replicas of type LEARNER, // NON_VOTER, and VOTER_FULL only. -func maybeLeaveAtomicChangeReplicas( - ctx context.Context, s *Store, desc *roachpb.RangeDescriptor, -) (*roachpb.RangeDescriptor, error) { +func (r *Replica) maybeLeaveAtomicChangeReplicas( + ctx context.Context, desc *roachpb.RangeDescriptor, +) (rangeDesc *roachpb.RangeDescriptor, err error) { // We want execChangeReplicasTxn to be able to make sure it's only tasked // with leaving a joint state when it's in one, so make sure we don't call // it if we're not. @@ -1199,10 +1280,21 @@ func maybeLeaveAtomicChangeReplicas( // NB: this is matched on in TestMergeQueueSeesLearner. log.Eventf(ctx, "transitioning out of joint configuration %s", desc) + // If the leaseholder is being demoted, leaving the joint config is only + // possible if we first transfer the lease. A range not being able to exit + // the JOINT config will wedge splits, merges, and all rebalancing on the + // range, including load-based rebalancing. We currently prefer to stay in + // the JOINT config rather than transfer the lease to a suspect node. Exiting + // the JOINT config is retried by upper layers. + if err := r.maybeTransferLeaseDuringLeaveJoint(ctx, desc); err != nil { + return desc, err + } + // NB: reason and detail won't be used because no range log event will be // emitted. // // TODO(tbg): reconsider this. + s := r.store return execChangeReplicasTxn( ctx, desc, kvserverpb.ReasonUnknown /* unused */, "", nil, /* iChgs */ changeReplicasTxnArgs{ @@ -1217,16 +1309,16 @@ func maybeLeaveAtomicChangeReplicas( // maybeLeaveAtomicChangeReplicasAndRemoveLearners transitions out of the joint // config (if there is one), and then removes all learners. After this function // returns, all remaining replicas will be of type VOTER_FULL or NON_VOTER. -func maybeLeaveAtomicChangeReplicasAndRemoveLearners( - ctx context.Context, store *Store, desc *roachpb.RangeDescriptor, -) (*roachpb.RangeDescriptor, error) { - desc, err := maybeLeaveAtomicChangeReplicas(ctx, store, desc) +func (r *Replica) maybeLeaveAtomicChangeReplicasAndRemoveLearners( + ctx context.Context, desc *roachpb.RangeDescriptor, +) (rangeDesc *roachpb.RangeDescriptor, err error) { + desc, err = r.maybeLeaveAtomicChangeReplicas(ctx, desc) if err != nil { return nil, err } + // Now the config isn't joint any more, but we may have demoted some voters // into learners. These learners should go as well.
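Both here and in changeReplicasImpl above, exiting the joint config is deferred to "upper layers" to retry. A hedged sketch of what such a caller-side retry could look like, using the IsTransientLeaseholderError marker added in pkg/kv/kvserver/markers.go above; the function name, retry bound, and the AdminChangeReplicas call site are illustrative assumptions, not necessarily how this patch wires it up:

// changeReplicasWithRetry is an illustrative caller-side loop: a replication
// change that fails only because the leaseholder and Raft leader are not yet
// collocated is transient and can simply be retried.
func changeReplicasWithRetry(
	ctx context.Context,
	db *kv.DB,
	key roachpb.Key,
	desc roachpb.RangeDescriptor,
	chgs []roachpb.ReplicationChange,
) (*roachpb.RangeDescriptor, error) {
	const maxAttempts = 5
	var err error
	for i := 0; i < maxAttempts; i++ {
		var newDesc *roachpb.RangeDescriptor
		newDesc, err = db.AdminChangeReplicas(ctx, key, desc, chgs)
		if err == nil {
			return newDesc, nil
		}
		if !kvserver.IsTransientLeaseholderError(err) {
			return nil, err
		}
		// Transient: the leaseholder isn't the Raft leader yet; try again.
		// A real caller would also refresh desc between attempts.
	}
	return nil, err
}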
- learners := desc.Replicas().LearnerDescriptors() if len(learners) == 0 { return desc, nil @@ -1242,6 +1334,7 @@ func maybeLeaveAtomicChangeReplicasAndRemoveLearners( // // https://github.com/cockroachdb/cockroach/pull/40268 origDesc := desc + store := r.store for _, target := range targets { var err error desc, err = execChangeReplicasTxn( @@ -1755,7 +1848,7 @@ func (r *Replica) execReplicationChangesForVoters( reason kvserverpb.RangeLogEventReason, details string, voterAdditions, voterRemovals []roachpb.ReplicationTarget, -) (*roachpb.RangeDescriptor, error) { +) (rangeDesc *roachpb.RangeDescriptor, err error) { // TODO(dan): We allow ranges with learner replicas to split, so in theory // this may want to detect that and retry, sending a snapshot and promoting // both sides. @@ -1773,7 +1866,6 @@ func (r *Replica) execReplicationChangesForVoters( iChgs = append(iChgs, internalReplicationChange{target: target, typ: typ}) } - var err error desc, err = execChangeReplicasTxn(ctx, desc, reason, details, iChgs, changeReplicasTxnArgs{ db: r.store.DB(), liveAndDeadReplicas: r.store.allocator.storePool.liveAndDeadReplicas, @@ -1791,7 +1883,7 @@ func (r *Replica) execReplicationChangesForVoters( // Leave the joint config if we entered one. Also, remove any learners we // might have picked up due to removal-via-demotion. - return maybeLeaveAtomicChangeReplicasAndRemoveLearners(ctx, r.store, desc) + return r.maybeLeaveAtomicChangeReplicasAndRemoveLearners(ctx, desc) } // tryRollbackRaftLearner attempts to remove a learner specified by the target. @@ -2676,7 +2768,7 @@ func updateRangeDescriptor( // // This is best-effort; it's possible that the replicate queue on the // leaseholder could take action at the same time, causing errors. -func (s *Store) AdminRelocateRange( +func (r *Replica) AdminRelocateRange( ctx context.Context, rangeDesc roachpb.RangeDescriptor, voterTargets, nonVoterTargets []roachpb.ReplicationTarget, @@ -2703,14 +2795,15 @@ func (s *Store) AdminRelocateRange( // Remove learners so we don't have to think about relocating them, and leave // the joint config if we're in one. - newDesc, err := maybeLeaveAtomicChangeReplicasAndRemoveLearners(ctx, s, &rangeDesc) + newDesc, err := + r.maybeLeaveAtomicChangeReplicasAndRemoveLearners(ctx, &rangeDesc) if err != nil { log.Warningf(ctx, "%v", err) return err } rangeDesc = *newDesc - rangeDesc, err = s.relocateReplicas( + rangeDesc, err = r.relocateReplicas( ctx, rangeDesc, voterTargets, nonVoterTargets, transferLeaseToFirstVoter, ) if err != nil { @@ -2740,7 +2833,7 @@ func (s *Store) AdminRelocateRange( // // Transient errors returned from relocateOne are retried until things // work out. -func (s *Store) relocateReplicas( +func (r *Replica) relocateReplicas( ctx context.Context, rangeDesc roachpb.RangeDescriptor, voterTargets, nonVoterTargets []roachpb.ReplicationTarget, @@ -2753,11 +2846,11 @@ func (s *Store) relocateReplicas( // the leaseholder so we'll fail there later if this fails), so it // seems like a good idea to return any errors here to the caller (or // to retry some errors appropriately). 
- if err := s.DB().AdminTransferLease( + if err := r.store.DB().AdminTransferLease( ctx, startKey, target.StoreID, ); err != nil { log.Warningf(ctx, "while transferring lease: %+v", err) - if s.TestingKnobs().DontIgnoreFailureToTransferLease { + if r.store.TestingKnobs().DontIgnoreFailureToTransferLease { return err } } @@ -2771,31 +2864,43 @@ func (s *Store) relocateReplicas( return rangeDesc, err } - ops, leaseTarget, err := s.relocateOne( + ops, leaseTarget, err := r.relocateOne( ctx, &rangeDesc, voterTargets, nonVoterTargets, transferLeaseToFirstVoter, ) if err != nil { return rangeDesc, err } - if leaseTarget != nil { - // NB: we may need to transfer even if there are no ops, to make - // sure the attempt is made to make the first target the final - // leaseholder. - if err := transferLease(*leaseTarget); err != nil { - return rangeDesc, err + if !r.store.cfg.Settings.Version.IsActive(ctx, + clusterversion.EnableLeaseHolderRemoval) { + if leaseTarget != nil { + // NB: we may need to transfer even if there are no ops, to make + // sure the attempt is made to make the first target the final + // leaseholder. + if err := transferLease(*leaseTarget); err != nil { + return rangeDesc, err + } + } + if len(ops) == 0 { + // Done + return rangeDesc, ctx.Err() + } + } else if len(ops) == 0 { + if len(voterTargets) > 0 && transferLeaseToFirstVoter { + // NB: we may need to transfer even if there are no ops, to make + // sure the attempt is made to make the first target the final + // leaseholder. + if err := transferLease(voterTargets[0]); err != nil { + return rangeDesc, err + } } - } - - if len(ops) == 0 { - // Done. return rangeDesc, ctx.Err() } opss := [][]roachpb.ReplicationChange{ops} success := true for _, ops := range opss { - newDesc, err := s.DB().AdminChangeReplicas(ctx, startKey, rangeDesc, ops) + newDesc, err := r.store.DB().AdminChangeReplicas(ctx, startKey, rangeDesc, ops) if err != nil { returnErr := errors.Wrapf(err, "while carrying out changes %v", ops) if !isSnapshotError(err) { @@ -2810,8 +2915,8 @@ func (s *Store) relocateReplicas( rangeDesc = *newDesc } if success { - if fn := s.cfg.TestingKnobs.OnRelocatedOne; fn != nil { - fn(ops, leaseTarget) + if fn := r.store.cfg.TestingKnobs.OnRelocatedOne; fn != nil { + fn(ops, &voterTargets[0]) } break @@ -2860,7 +2965,7 @@ func (r *relocationArgs) finalRelocationTargets() []roachpb.ReplicationTarget { } } -func (s *Store) relocateOne( +func (r *Replica) relocateOne( ctx context.Context, desc *roachpb.RangeDescriptor, voterTargets, nonVoterTargets []roachpb.ReplicationTarget, @@ -2873,7 +2978,7 @@ func (s *Store) relocateOne( `range %s was either in a joint configuration or had learner replicas: %v`, desc, desc.Replicas()) } - confReader, err := s.GetConfReader(ctx) + confReader, err := r.store.GetConfReader(ctx) if err != nil { return nil, nil, errors.Wrap(err, "can't relocate range") } @@ -2882,7 +2987,7 @@ func (s *Store) relocateOne( return nil, nil, err } - storeList, _, _ := s.allocator.storePool.getStoreList(storeFilterNone) + storeList, _, _ := r.store.allocator.storePool.getStoreList(storeFilterNone) storeMap := storeListToMap(storeList) // Compute which replica to add and/or remove, respectively. 
We then ask the @@ -2923,13 +3028,13 @@ func (s *Store) relocateOne( } candidateStoreList := makeStoreList(candidateDescs) - additionTarget, _ = s.allocator.allocateTargetFromList( + additionTarget, _ = r.store.allocator.allocateTargetFromList( ctx, candidateStoreList, conf, existingVoters, existingNonVoters, - s.allocator.scorerOptions(), + r.store.allocator.scorerOptions(), // NB: Allow the allocator to return target stores that might be on the // same node as an existing replica. This is to ensure that relocations // that require "lateral" movement of replicas within a node can succeed. @@ -2991,14 +3096,14 @@ func (s *Store) relocateOne( // (s1,s2,s3,s4) which is a reasonable request; that replica set is // overreplicated. If we asked it instead to remove s3 from (s1,s2,s3) it // may not want to do that due to constraints. - targetStore, _, err := s.allocator.removeTarget( + targetStore, _, err := r.store.allocator.removeTarget( ctx, conf, - s.allocator.storeListForTargets(args.targetsToRemove()), + r.store.allocator.storeListForTargets(args.targetsToRemove()), existingVoters, existingNonVoters, args.targetType, - s.allocator.scorerOptions(), + r.store.allocator.scorerOptions(), ) if err != nil { return nil, nil, errors.Wrapf( @@ -3010,20 +3115,21 @@ func (s *Store) relocateOne( NodeID: targetStore.NodeID, StoreID: targetStore.StoreID, } - // We can't remove the leaseholder, which really throws a wrench into - // atomic replication changes. If we find that we're trying to do just - // that, we need to first move the lease elsewhere. This is not possible - // if there is no other replica available at that point, i.e. if the - // existing descriptor is a single replica that's being replaced. + // Prior to 22.1 we can't remove the leaseholder. If we find that we're + // trying to do just that, we need to first move the lease elsewhere. + // This is not possible if there is no other replica available at that + // point, i.e. if the existing descriptor is a single replica that's + // being replaced. var b kv.Batch liReq := &roachpb.LeaseInfoRequest{} liReq.Key = desc.StartKey.AsRawKey() b.AddRawRequest(liReq) - if err := s.DB().Run(ctx, &b); err != nil { + if err := r.store.DB().Run(ctx, &b); err != nil { return nil, nil, errors.Wrap(err, "looking up lease") } curLeaseholder := b.RawResponse().Responses[0].GetLeaseInfo().Lease.Replica - shouldRemove = curLeaseholder.StoreID != removalTarget.StoreID + shouldRemove = (curLeaseholder.StoreID != removalTarget.StoreID) || + r.store.cfg.Settings.Version.IsActive(ctx, clusterversion.EnableLeaseHolderRemoval) if args.targetType == voterTarget { // If the voter being removed is about to be added as a non-voter, then we // can just demote it. diff --git a/pkg/kv/kvserver/replica_learner_test.go b/pkg/kv/kvserver/replica_learner_test.go index 83c5e8dbca2a..9599fae0101a 100644 --- a/pkg/kv/kvserver/replica_learner_test.go +++ b/pkg/kv/kvserver/replica_learner_test.go @@ -1195,7 +1195,7 @@ func TestMergeQueueSeesLearnerOrJointConfig(t *testing.T) { require.False(t, desc.Replicas().InAtomicReplicationChange(), desc) // Repeat the game, except now we start with two replicas and we're - // giving the RHS a VOTER_OUTGOING. + // giving the RHS a VOTER_DEMOTING_LEARNER. 
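The shouldRemove change above is easy to miss among the mechanical s.-to-r.store. edits: once clusterversion.EnableLeaseHolderRemoval is active, relocateOne no longer insists on moving the lease off a replica before removing it. A condensed paraphrase of that decision, assuming the surrounding names (curLeaseholder, removalTarget) keep the meaning they have in the hunk above and that the usual kvserver imports (settings/cluster, clusterversion, roachpb) are in scope:

// shouldRemoveTarget paraphrases the post-patch relocateOne logic; it is a
// sketch, not code taken from this change.
func shouldRemoveTarget(
	ctx context.Context,
	st *cluster.Settings,
	curLeaseholder roachpb.ReplicaDescriptor,
	removalTarget roachpb.ReplicationTarget,
) bool {
	if st.Version.IsActive(ctx, clusterversion.EnableLeaseHolderRemoval) {
		// 22.1+: the leaseholder may be removed directly; the lease is handed
		// off inside the joint config (see maybeTransferLeaseDuringLeaveJoint).
		return true
	}
	// Pre-22.1: only remove the target if it does not hold the lease;
	// otherwise the lease must be transferred away first.
	return curLeaseholder.StoreID != removalTarget.StoreID
}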
desc = splitAndUnsplit() ltk.withStopAfterJointConfig(func() { descRight := tc.RemoveVotersOrFatal(t, desc.EndKey.AsRawKey(), tc.Target(1)) diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index 2bb9d5c2e6f5..4d9271f2d90f 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -17,6 +17,7 @@ import ( "strings" "time" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/apply" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency" @@ -332,28 +333,52 @@ func (r *Replica) propose( log.Infof(p.ctx, "proposing %s", crt) prefix = false - // Ensure that we aren't trying to remove ourselves from the range without - // having previously given up our lease, since the range won't be able - // to make progress while the lease is owned by a removed replica (and - // leases can stay in such a state for a very long time when using epoch- - // based range leases). This shouldn't happen often, but has been seen - // before (#12591). + // The following deals with removing a leaseholder. A voter can be removed + // in two ways. 1) Simple (old style), where a reconfiguration turns a + // voter into a LEARNER / NON-VOTER. 2) Through an intermediate + // joint configuration, where the replica remains in the descriptor, but + // as VOTER_{OUTGOING, DEMOTING}. When leaving the JOINT config (a second + // Raft operation), the removed replica transitions to a LEARNER / NON-VOTER. // - // Note that due to atomic replication changes, when a removal is initiated, - // the replica remains in the descriptor, but as VOTER_{OUTGOING,DEMOTING}. - // We want to block it from getting into that state in the first place, - // since there's no stopping the actual removal/demotion once it's there. - // IsVoterNewConfig checks that the leaseholder is a voter in the - // proposed configuration. + // In case (1) the lease needs to be transferred out before a removal is + // proposed (cooperative transfer). The code below permits leaseholder + // removal only if entering a joint configuration (option 2 above) in which + // the leaseholder is a voter (of any kind). In this case, the lease is + // transferred to a different voter (potentially incoming) in + // maybeLeaveAtomicChangeReplicas right before we exit the joint + // configuration. + // + // When the leaseholder is replaced by a new replica, transferring the + // lease in the joint config allows transferring directly from old to new, + // since both are active in the joint config, without going through a third + // node or adding the new node before transferring, which might reduce + // fault tolerance. For example, consider v1 in region1 (leaseholder), v2 + // in region2 and v3 in region3. We want to relocate v1 to a new node v4 in + // region1. We add v4 as LEARNER. At this point we can't transfer the lease + // to v4, so we could transfer it to v2 first, but this is likely to hurt + // application performance. We could instead add v4 as VOTER first, and + // then transfer the lease directly to v4, but this would change the number of + // replicas to 4, and if region1 goes down, we lose quorum. Instead, + // we move to a joint config where v1 (VOTER_DEMOTING_LEARNER) transfers the + // lease to v4 (VOTER_INCOMING) directly. + // See also https://github.com/cockroachdb/cockroach/issues/67740.
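To make the v1-to-v4 example in the comment above concrete, here is a tiny sketch showing why the new check below accepts the joint config: the demoting leaseholder is still a voter of some kind (IsAnyVoter, added in pkg/roachpb/metadata.go further down), even though it is not a voter in the new config. The ReplicaTypeVoterDemotingLearner helper and the literal IDs are assumptions made for illustration:

package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

func main() {
	// v1 is the demoting leaseholder from the example; v4 (VOTER_INCOMING)
	// would receive the lease inside the joint config.
	v1 := roachpb.ReplicaDescriptor{
		NodeID: 1, StoreID: 1, ReplicaID: 1,
		Type: roachpb.ReplicaTypeVoterDemotingLearner(),
	}
	fmt.Println(v1.IsAnyVoter())       // true: proposing the joint config is allowed
	fmt.Println(v1.IsVoterNewConfig()) // false: the lease must move to v4 before exiting it
}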
replID := r.ReplicaID() rDesc, ok := p.command.ReplicatedEvalResult.State.Desc.GetReplicaDescriptorByID(replID) - for !ok || !rDesc.IsVoterNewConfig() { - if rDesc.ReplicaID == replID { - err := errors.Mark(errors.Newf("received invalid ChangeReplicasTrigger %s to remove self (leaseholder)", crt), - errMarkInvalidReplicationChange) - log.Errorf(p.ctx, "%v", err) - return roachpb.NewError(err) - } + lhRemovalAllowed := r.store.cfg.Settings.Version.IsActive(ctx, + clusterversion.EnableLeaseHolderRemoval) + // Previously, we were not allowed to enter a joint config where the + // leaseholder is being removed (i.e., not a voter). In the new version + // we're allowed to enter such a joint config, but not to exit it in this + // state, i.e., the leaseholder must be some kind of voter in the next + // new config (potentially VOTER_DEMOTING). + if !ok || + (lhRemovalAllowed && !rDesc.IsAnyVoter()) || + (!lhRemovalAllowed && !rDesc.IsVoterNewConfig()) { + err := errors.Mark(errors.Newf("received invalid ChangeReplicasTrigger %s to remove self ("+ + "leaseholder); lhRemovalAllowed: %v", crt, lhRemovalAllowed), + errMarkInvalidReplicationChange) + log.Errorf(p.ctx, "%v", err) + return roachpb.NewError(err) } } else if p.command.ReplicatedEvalResult.AddSSTable != nil { log.VEvent(p.ctx, 4, "sideloadable proposal detected") diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go index b094e78db398..537a641bd3a8 100644 --- a/pkg/kv/kvserver/replica_send.go +++ b/pkg/kv/kvserver/replica_send.go @@ -932,7 +932,7 @@ func (r *Replica) executeAdminBatch( transferLeaseToFirstVoter := !tArgs.TransferLeaseToFirstVoterAccurate // We also revert to that behavior if the caller specifically asked for it. transferLeaseToFirstVoter = transferLeaseToFirstVoter || tArgs.TransferLeaseToFirstVoter - err := r.store.AdminRelocateRange( + err := r.AdminRelocateRange( ctx, *r.Desc(), tArgs.VoterTargets, tArgs.NonVoterTargets, transferLeaseToFirstVoter, ) pErr = roachpb.NewError(err) diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go index 01ff8999c118..e892c0274f64 100644 --- a/pkg/kv/kvserver/replicate_queue.go +++ b/pkg/kv/kvserver/replicate_queue.go @@ -18,6 +18,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" @@ -494,7 +495,8 @@ func (rq *replicateQueue) processOneChange( dryRun, ) case AllocatorFinalizeAtomicReplicationChange: - _, err := maybeLeaveAtomicChangeReplicasAndRemoveLearners(ctx, repl.store, repl.Desc()) + _, err := + repl.maybeLeaveAtomicChangeReplicasAndRemoveLearners(ctx, repl.Desc()) // Requeue because either we failed to transition out of a joint state // (bad) or we did and there might be more to do for that range. return true, err @@ -549,16 +551,19 @@ func (rq *replicateQueue) addOrReplaceVoters( break } } - // See about transferring the lease away if we're about to remove the - // leaseholder. - done, err := rq.maybeTransferLeaseAway( - ctx, repl, existingVoters[removeIdx].StoreID, dryRun, nil /* canTransferLeaseFrom */) - if err != nil { - return false, err - } - if done { - // Lease was transferred away. Next leaseholder is going to take over. 
- return false, nil + if !repl.store.cfg.Settings.Version.IsActive(ctx, + clusterversion.EnableLeaseHolderRemoval) { + // See about transferring the lease away if we're about to remove the + // leaseholder. + done, err := rq.maybeTransferLeaseAway( + ctx, repl, existingVoters[removeIdx].StoreID, dryRun, nil /* canTransferLeaseFrom */) + if err != nil { + return false, err + } + if done { + // Lease was transferred away. Next leaseholder is going to take over. + return false, nil + } } } @@ -1114,14 +1119,19 @@ func (rq *replicateQueue) considerRebalance( if !ok { log.VEventf(ctx, 1, "no suitable rebalance target for non-voters") - } else if done, err := rq.maybeTransferLeaseAway( - ctx, repl, removeTarget.StoreID, dryRun, canTransferLeaseFrom, - ); err != nil { - log.VEventf(ctx, 1, "want to remove self, but failed to transfer lease away: %s", err) - } else if done { - // Lease is now elsewhere, so we're not in charge any more. - return false, nil - } else { + } else if !repl.store.cfg.Settings.Version.IsActive(ctx, + clusterversion.EnableLeaseHolderRemoval) { + if done, err := rq.maybeTransferLeaseAway( + ctx, repl, removeTarget.StoreID, dryRun, canTransferLeaseFrom, + ); err != nil { + log.VEventf(ctx, 1, "want to remove self, but failed to transfer lease away: %s", err) + ok = false + } else if done { + // Lease is now elsewhere, so we're not in charge any more. + return false, nil + } + } + if ok { // If we have a valid rebalance action (ok == true) and we haven't // transferred our lease away, execute the rebalance. chgs, performingSwap, err := replicationChangesForRebalance(ctx, desc, len(existingVoters), addTarget, diff --git a/pkg/roachpb/metadata.go b/pkg/roachpb/metadata.go index 5276d67b1f5e..830561a775d8 100644 --- a/pkg/roachpb/metadata.go +++ b/pkg/roachpb/metadata.go @@ -485,6 +485,18 @@ func (r ReplicaDescriptor) IsVoterNewConfig() bool { } } +// IsAnyVoter returns true if the replica is a voter in the previous +// config (pre-reconfiguration) or in the incoming config. Can be used as a filter +// for ReplicaDescriptors.Filter(ReplicaDescriptor.IsAnyVoter). +func (r ReplicaDescriptor) IsAnyVoter() bool { + switch r.GetType() { + case VOTER_FULL, VOTER_INCOMING, VOTER_OUTGOING, VOTER_DEMOTING_NON_VOTER, VOTER_DEMOTING_LEARNER: + return true + default: + return false + } +} + // PercentilesFromData derives percentiles from a slice of data points. // Sorts the input data if it isn't already sorted. func PercentilesFromData(data []float64) Percentiles { diff --git a/pkg/roachpb/metadata_replicas.go b/pkg/roachpb/metadata_replicas.go index 727ae08a20b5..760bb4f1d566 100644 --- a/pkg/roachpb/metadata_replicas.go +++ b/pkg/roachpb/metadata_replicas.go @@ -530,15 +530,6 @@ func CheckCanReceiveLease(wouldbeLeaseholder ReplicaDescriptor, rngDesc *RangeDe if !ok { return errReplicaNotFound } else if !repDesc.IsVoterNewConfig() { - // NB: there's no harm in transferring the lease to a VOTER_INCOMING. - // On the other hand, transferring to VOTER_OUTGOING would be a pretty bad - // idea since those voters are dropped when transitioning out of the joint - // config, which then amounts to removing the leaseholder without any - // safety precautions. This would either wedge the range or allow illegal - // reads to be served. - // - // Since the leaseholder can't remove itself and is a VOTER_FULL, we - // also know that in any configuration there's at least one VOTER_FULL.
return errReplicaCannotHoldLease } return nil diff --git a/pkg/scheduledjobs/BUILD.bazel b/pkg/scheduledjobs/BUILD.bazel index 87164e407736..71f3268baea9 100644 --- a/pkg/scheduledjobs/BUILD.bazel +++ b/pkg/scheduledjobs/BUILD.bazel @@ -12,6 +12,7 @@ go_library( "//pkg/security", "//pkg/settings/cluster", "//pkg/sql/sqlutil", + "//pkg/util/hlc", "//pkg/util/timeutil", ], ) diff --git a/pkg/scheduledjobs/env.go b/pkg/scheduledjobs/env.go index ef57d0632661..231594721bc5 100644 --- a/pkg/scheduledjobs/env.go +++ b/pkg/scheduledjobs/env.go @@ -11,6 +11,7 @@ package scheduledjobs import ( + "context" "time" "github.com/cockroachdb/cockroach/pkg/base" @@ -19,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/timeutil" ) @@ -53,6 +55,9 @@ type JobExecutionConfig struct { // function that must be called once the caller is done with the planner. // This is the same mechanism used in jobs.Registry. PlanHookMaker func(opName string, tnx *kv.Txn, user security.SQLUsername) (interface{}, func()) + // ShouldRunScheduler, if set, returns true if the job scheduler should run + // schedules. This callback should be re-checked periodically. + ShouldRunScheduler func(ctx context.Context, ts hlc.ClockTimestamp) (bool, error) } // production JobSchedulerEnv implementation. diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index 7281efabdc8f..5fc05fe84a1c 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -151,6 +151,8 @@ type SQLServer struct { systemConfigWatcher *systemconfigwatcher.Cache + isMeta1Leaseholder func(context.Context, hlc.ClockTimestamp) (bool, error) + // pgL is the shared RPC/SQL listener, opened when RPC was initialized. pgL net.Listener // connManager is the connection manager to use to set up additional @@ -1016,6 +1018,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { spanconfigSQLWatcher: spanConfig.sqlWatcher, settingsWatcher: settingsWatcher, systemConfigWatcher: cfg.systemConfigWatcher, + isMeta1Leaseholder: cfg.isMeta1Leaseholder, }, nil } @@ -1232,6 +1235,12 @@ func (s *SQLServer) preStart( sessiondatapb.SessionData{}, ) }, + ShouldRunScheduler: func(ctx context.Context, ts hlc.ClockTimestamp) (bool, error) { + if s.execCfg.Codec.ForSystemTenant() { + return s.isMeta1Leaseholder(ctx, ts) + } + return true, nil + }, }, scheduledjobs.ProdJobSchedulerEnv, ) diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index 545e3d5327bd..43b1a4f83f39 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -296,6 +296,11 @@ type Server struct { // TelemetryLoggingMetrics is used to track metrics for logging to the telemetry channel. TelemetryLoggingMetrics *TelemetryLoggingMetrics + + mu struct { + syncutil.Mutex + connectionCount int64 + } } // Metrics collects timeseries data about SQL activity. @@ -662,6 +667,39 @@ func (s *Server) SetupConn( return ConnectionHandler{ex}, nil } +// IncrementConnectionCount increases connectionCount by 1. +func (s *Server) IncrementConnectionCount() { + s.mu.Lock() + defer s.mu.Unlock() + s.mu.connectionCount++ +} + +// DecrementConnectionCount decreases connectionCount by 1. 
+func (s *Server) DecrementConnectionCount() { + s.mu.Lock() + defer s.mu.Unlock() + s.mu.connectionCount-- +} + +// IncrementConnectionCountIfLessThan increases connectionCount by 1 and returns true if the previous +// connectionCount was less than max; otherwise it does nothing and returns false. +func (s *Server) IncrementConnectionCountIfLessThan(max int64) bool { + s.mu.Lock() + defer s.mu.Unlock() + lt := s.mu.connectionCount < max + if lt { + s.mu.connectionCount++ + } + return lt +} + +// GetConnectionCount returns the current number of connections. +func (s *Server) GetConnectionCount() int64 { + s.mu.Lock() + defer s.mu.Unlock() + return s.mu.connectionCount +} + // ConnectionHandler is the interface between the result of SetupConn // and the ServeConn below. It encapsulates the connExecutor and hides // it away from other packages. diff --git a/pkg/sql/execinfrapb/BUILD.bazel b/pkg/sql/execinfrapb/BUILD.bazel index 4de7ab4c9f86..8eb5433c4b23 100644 --- a/pkg/sql/execinfrapb/BUILD.bazel +++ b/pkg/sql/execinfrapb/BUILD.bazel @@ -83,6 +83,7 @@ go_test( "//pkg/sql/types", "//pkg/util/leaktest", "//pkg/util/optional", + "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/sql/execinfrapb/flow_diagram.go b/pkg/sql/execinfrapb/flow_diagram.go index b05e466dd930..0d1ad99acdd5 100644 --- a/pkg/sql/execinfrapb/flow_diagram.go +++ b/pkg/sql/execinfrapb/flow_diagram.go @@ -513,6 +513,17 @@ func (post *PostProcessSpec) summary() []string { return res } +// summary implements the diagramCellType interface. +func (c *RestoreDataSpec) summary() (string, []string) { + return "RestoreDataSpec", []string{} +} + +// summary implements the diagramCellType interface. +func (c *SplitAndScatterSpec) summary() (string, []string) { + detail := fmt.Sprintf("%d chunks", len(c.Chunks)) + return "SplitAndScatterSpec", []string{detail} +} + // summary implements the diagramCellType interface. func (c *ReadImportDataSpec) summary() (string, []string) { ss := make([]string, 0, len(c.Uri)) @@ -522,6 +533,21 @@ func (c *ReadImportDataSpec) summary() (string, []string) { return "ReadImportData", ss } +// summary implements the diagramCellType interface. +func (s *StreamIngestionDataSpec) summary() (string, []string) { + return "StreamIngestionData", []string{} +} + +// summary implements the diagramCellType interface. +func (s *StreamIngestionFrontierSpec) summary() (string, []string) { + return "StreamIngestionFrontier", []string{} +} + +// summary implements the diagramCellType interface. +func (s *IndexBackfillMergerSpec) summary() (string, []string) { + return "IndexBackfillMerger", []string{} +} + // summary implements the diagramCellType interface.
func (s *ExportSpec) summary() (string, []string) { return "Exporter", []string{s.Destination} diff --git a/pkg/sql/execinfrapb/flow_diagram_test.go b/pkg/sql/execinfrapb/flow_diagram_test.go index 68162133bd3e..31751826f32a 100644 --- a/pkg/sql/execinfrapb/flow_diagram_test.go +++ b/pkg/sql/execinfrapb/flow_diagram_test.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/stretchr/testify/require" ) // compareDiagrams verifies that two JSON strings decode to equal diagramData @@ -404,3 +405,10 @@ func TestPlanDiagramJoin(t *testing.T) { compareDiagrams(t, s, expected) } + +func TestProcessorsImplementDiagramCellType(t *testing.T) { + pcu := reflect.ValueOf(ProcessorCoreUnion{}) + for i := 0; i < pcu.NumField(); i++ { + require.Implements(t, (*diagramCellType)(nil), pcu.Field(i).Interface()) + } +} diff --git a/pkg/sql/parser/sql.y b/pkg/sql/parser/sql.y index 9fe1368cbb9f..d36def1d9f4c 100644 --- a/pkg/sql/parser/sql.y +++ b/pkg/sql/parser/sql.y @@ -3010,6 +3010,10 @@ opt_with_schedule_options: // RESTORE FROM // [ AS OF SYSTEM TIME ] // [ WITH