Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql: push tenant-bound SQL codec into descriptor key generation #48376

Merged
merged 2 commits into from
May 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1042,7 +1042,7 @@ func TestBackupRestoreResume(t *testing.T) {
_, tc, outerDB, dir, cleanupFn := backupRestoreTestSetup(t, multiNode, numAccounts, initNone)
defer cleanupFn()

backupTableDesc := sqlbase.GetTableDescriptor(tc.Servers[0].DB(), "data", "bank")
backupTableDesc := sqlbase.GetTableDescriptor(tc.Servers[0].DB(), keys.SystemSQLCodec, "data", "bank")

t.Run("backup", func(t *testing.T) {
sqlDB := sqlutils.MakeSQLRunner(outerDB.DB)
Expand Down
34 changes: 17 additions & 17 deletions pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,14 +448,14 @@ func WriteTableDescs(
desc.Privileges = sqlbase.NewDefaultPrivilegeDescriptor()
}
wroteDBs[desc.ID] = desc
if err := sql.WriteNewDescToBatch(ctx, false /* kvTrace */, settings, b, desc.ID, desc); err != nil {
if err := sql.WriteNewDescToBatch(ctx, false /* kvTrace */, settings, b, keys.SystemSQLCodec, desc.ID, desc); err != nil {
return err
}
// Depending on which cluster version we are restoring to, we decide which
// namespace table to write the descriptor into. This may cause wrong
// behavior if the cluster version is bumped DURING a restore.
dKey := sqlbase.MakeDatabaseNameKey(ctx, settings, desc.Name)
b.CPut(dKey.Key(), desc.ID, nil)
b.CPut(dKey.Key(keys.SystemSQLCodec), desc.ID, nil)
}
for i := range tables {
// For full cluster restore, keep privileges as they were.
Expand All @@ -466,7 +466,7 @@ func WriteTableDescs(
tables[i].Privileges = wrote.GetPrivileges()
}
} else {
parentDB, err := sqlbase.GetDatabaseDescFromID(ctx, txn, tables[i].ParentID)
parentDB, err := sqlbase.GetDatabaseDescFromID(ctx, txn, keys.SystemSQLCodec, tables[i].ParentID)
if err != nil {
return errors.Wrapf(err,
"failed to lookup parent DB %d", errors.Safe(tables[i].ParentID))
Expand All @@ -480,14 +480,14 @@ func WriteTableDescs(
tables[i].Privileges = parentDB.GetPrivileges()
}
}
if err := sql.WriteNewDescToBatch(ctx, false /* kvTrace */, settings, b, tables[i].ID, tables[i]); err != nil {
if err := sql.WriteNewDescToBatch(ctx, false /* kvTrace */, settings, b, keys.SystemSQLCodec, tables[i].ID, tables[i]); err != nil {
return err
}
// Depending on which cluster version we are restoring to, we decide which
// namespace table to write the descriptor into. This may cause wrong
// behavior if the cluster version is bumped DURING a restore.
tkey := sqlbase.MakePublicTableNameKey(ctx, settings, tables[i].ParentID, tables[i].Name)
b.CPut(tkey.Key(), tables[i].ID, nil)
b.CPut(tkey.Key(keys.SystemSQLCodec), tables[i].ID, nil)
}
for _, kv := range extra {
b.InitPut(kv.Key, &kv.Value, false)
Expand All @@ -500,7 +500,7 @@ func WriteTableDescs(
}

for _, table := range tables {
if err := table.Validate(ctx, txn); err != nil {
if err := table.Validate(ctx, txn, keys.SystemSQLCodec); err != nil {
return errors.Wrapf(err,
"validate table %d", errors.Safe(table.ID))
}
Expand Down Expand Up @@ -757,11 +757,11 @@ func restore(
// returned an error prior to this.
func loadBackupSQLDescs(
ctx context.Context,
p sql.PlanHookState,
details jobspb.RestoreDetails,
makeExternalStorageFromURI cloud.ExternalStorageFromURIFactory,
encryption *roachpb.FileEncryptionOptions,
) ([]BackupManifest, BackupManifest, []sqlbase.Descriptor, error) {
backupManifests, err := loadBackupManifests(ctx, details.URIs, makeExternalStorageFromURI, encryption)
backupManifests, err := loadBackupManifests(ctx, details.URIs, p.ExecCfg().DistSQLSrv.ExternalStorageFromURI, encryption)
if err != nil {
return nil, BackupManifest{}, nil, err
}
Expand All @@ -770,7 +770,7 @@ func loadBackupSQLDescs(
// TODO(lucy, jordan): This should become unnecessary in 20.1 when we stop
// writing old-style descs in RestoreDetails (unless a job persists across
// an upgrade?).
if err := maybeUpgradeTableDescsInBackupManifests(ctx, backupManifests, true /* skipFKsWithNoMatchingTable */); err != nil {
if err := maybeUpgradeTableDescsInBackupManifests(ctx, backupManifests, p.ExecCfg().Codec, true /* skipFKsWithNoMatchingTable */); err != nil {
return nil, BackupManifest{}, nil, err
}

Expand Down Expand Up @@ -951,7 +951,7 @@ func (r *restoreResumer) Resume(
p := phs.(sql.PlanHookState)

backupManifests, latestBackupManifest, sqlDescs, err := loadBackupSQLDescs(
ctx, details, p.ExecCfg().DistSQLSrv.ExternalStorageFromURI, details.Encryption,
ctx, p, details, details.Encryption,
)
if err != nil {
return err
Expand Down Expand Up @@ -1083,12 +1083,12 @@ func (r *restoreResumer) publishTables(ctx context.Context) error {
tableDesc := *tbl
tableDesc.Version++
tableDesc.State = sqlbase.TableDescriptor_PUBLIC
existingDescVal, err := sqlbase.ConditionalGetTableDescFromTxn(ctx, txn, tbl)
existingDescVal, err := sqlbase.ConditionalGetTableDescFromTxn(ctx, txn, r.execCfg.Codec, tbl)
if err != nil {
return errors.Wrap(err, "validating table descriptor has not changed")
}
b.CPut(
sqlbase.MakeDescMetadataKey(tableDesc.ID),
sqlbase.MakeDescMetadataKey(keys.SystemSQLCodec, tableDesc.ID),
sqlbase.WrapDescriptor(&tableDesc),
existingDescVal,
)
Expand Down Expand Up @@ -1158,16 +1158,16 @@ func (r *restoreResumer) dropTables(ctx context.Context, jr *jobs.Registry, txn
tableDesc := *tbl
tableDesc.Version++
tableDesc.State = sqlbase.TableDescriptor_DROP
err := sqlbase.RemovePublicTableNamespaceEntry(ctx, txn, tbl.ParentID, tbl.Name)
err := sqlbase.RemovePublicTableNamespaceEntry(ctx, txn, keys.SystemSQLCodec, tbl.ParentID, tbl.Name)
if err != nil {
return errors.Wrap(err, "dropping tables caused by restore fail/cancel from public namespace")
}
existingDescVal, err := sqlbase.ConditionalGetTableDescFromTxn(ctx, txn, tbl)
existingDescVal, err := sqlbase.ConditionalGetTableDescFromTxn(ctx, txn, r.execCfg.Codec, tbl)
if err != nil {
return errors.Wrap(err, "dropping tables caused by restore fail/cancel")
}
b.CPut(
sqlbase.MakeDescMetadataKey(tableDesc.ID),
sqlbase.MakeDescMetadataKey(keys.SystemSQLCodec, tableDesc.ID),
sqlbase.WrapDescriptor(&tableDesc),
existingDescVal,
)
Expand Down Expand Up @@ -1213,9 +1213,9 @@ func (r *restoreResumer) dropTables(ctx context.Context, jr *jobs.Registry, txn
}

if isDBEmpty {
descKey := sqlbase.MakeDescMetadataKey(dbDesc.ID)
descKey := sqlbase.MakeDescMetadataKey(keys.SystemSQLCodec, dbDesc.ID)
b.Del(descKey)
b.Del(sqlbase.NewDatabaseKey(dbDesc.Name).Key())
b.Del(sqlbase.NewDatabaseKey(dbDesc.Name).Key(keys.SystemSQLCodec))
}
}
if err := txn.Run(ctx, b); err != nil {
Expand Down
25 changes: 14 additions & 11 deletions pkg/ccl/backupccl/restore_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func allocateTableRewrites(
maxExpectedDB := keys.MinUserDescID + sql.MaxDefaultDescriptorID
// Check that any DBs being restored do _not_ exist.
for name := range restoreDBNames {
found, foundID, err := sqlbase.LookupDatabaseID(ctx, txn, name)
found, foundID, err := sqlbase.LookupDatabaseID(ctx, txn, p.ExecCfg().Codec, name)
if err != nil {
return err
}
Expand All @@ -242,11 +242,11 @@ func allocateTableRewrites(
} else if descriptorCoverage == tree.AllDescriptors && table.ParentID < sql.MaxDefaultDescriptorID {
// This is a table that is in a database that already existed at
// cluster creation time.
defaultDBID, err := lookupDatabaseID(ctx, txn, sessiondata.DefaultDatabaseName)
defaultDBID, err := lookupDatabaseID(ctx, txn, p.ExecCfg().Codec, sessiondata.DefaultDatabaseName)
if err != nil {
return err
}
postgresDBID, err := lookupDatabaseID(ctx, txn, sessiondata.PgDatabaseName)
postgresDBID, err := lookupDatabaseID(ctx, txn, p.ExecCfg().Codec, sessiondata.PgDatabaseName)
if err != nil {
return err
}
Expand Down Expand Up @@ -281,7 +281,7 @@ func allocateTableRewrites(
} else {
var parentID sqlbase.ID
{
found, newParentID, err := sqlbase.LookupDatabaseID(ctx, txn, targetDB)
found, newParentID, err := sqlbase.LookupDatabaseID(ctx, txn, p.ExecCfg().Codec, targetDB)
if err != nil {
return err
}
Expand All @@ -293,13 +293,13 @@ func allocateTableRewrites(
}
// Check that the table name is _not_ in use.
// This would fail the CPut later anyway, but this yields a prettier error.
if err := CheckTableExists(ctx, txn, parentID, table.Name); err != nil {
if err := CheckTableExists(ctx, txn, p.ExecCfg().Codec, parentID, table.Name); err != nil {
return err
}

// Check privileges.
{
parentDB, err := sqlbase.GetDatabaseDescFromID(ctx, txn, parentID)
parentDB, err := sqlbase.GetDatabaseDescFromID(ctx, txn, p.ExecCfg().Codec, parentID)
if err != nil {
return errors.Wrapf(err,
"failed to lookup parent DB %d", errors.Safe(parentID))
Expand Down Expand Up @@ -390,7 +390,10 @@ func allocateTableRewrites(
// "other" table is missing from the set provided are omitted during the
// upgrade, instead of causing an error to be returned.
func maybeUpgradeTableDescsInBackupManifests(
ctx context.Context, backupManifests []BackupManifest, skipFKsWithNoMatchingTable bool,
ctx context.Context,
backupManifests []BackupManifest,
codec keys.SQLCodec,
skipFKsWithNoMatchingTable bool,
) error {
protoGetter := sqlbase.MapProtoGetter{
Protos: make(map[interface{}]protoutil.Message),
Expand All @@ -400,7 +403,7 @@ func maybeUpgradeTableDescsInBackupManifests(
for _, backupManifest := range backupManifests {
for _, desc := range backupManifest.Descriptors {
if table := desc.Table(hlc.Timestamp{}); table != nil {
protoGetter.Protos[string(sqlbase.MakeDescMetadataKey(table.ID))] =
protoGetter.Protos[string(sqlbase.MakeDescMetadataKey(codec, table.ID))] =
sqlbase.WrapDescriptor(protoutil.Clone(table).(*sqlbase.TableDescriptor))
}
}
Expand All @@ -410,7 +413,7 @@ func maybeUpgradeTableDescsInBackupManifests(
backupManifest := &backupManifests[i]
for j := range backupManifest.Descriptors {
if table := backupManifest.Descriptors[j].Table(hlc.Timestamp{}); table != nil {
if _, err := table.MaybeUpgradeForeignKeyRepresentation(ctx, protoGetter, skipFKsWithNoMatchingTable); err != nil {
if _, err := table.MaybeUpgradeForeignKeyRepresentation(ctx, protoGetter, codec, skipFKsWithNoMatchingTable); err != nil {
return err
}
// TODO(lucy): Is this necessary?
Expand Down Expand Up @@ -713,7 +716,7 @@ func doRestorePlan(

// Ensure that no user table descriptors exist for a full cluster restore.
txn := p.ExecCfg().DB.NewTxn(ctx, "count-user-descs")
descCount, err := sql.CountUserDescriptors(ctx, txn)
descCount, err := sql.CountUserDescriptors(ctx, txn, p.ExecCfg().Codec)
if err != nil {
return errors.Wrap(err, "looking up user descriptors during restore")
}
Expand All @@ -725,7 +728,7 @@ func doRestorePlan(
}

_, skipMissingFKs := opts[restoreOptSkipMissingFKs]
if err := maybeUpgradeTableDescsInBackupManifests(ctx, mainBackupManifests, skipMissingFKs); err != nil {
if err := maybeUpgradeTableDescsInBackupManifests(ctx, mainBackupManifests, p.ExecCfg().Codec, skipMissingFKs); err != nil {
return err
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/show.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func showBackupPlanHook(
// display them anyway, because we don't have the referenced table names,
// etc.
if err := maybeUpgradeTableDescsInBackupManifests(
ctx, manifests, true, /*skipFKsWithNoMatchingTable*/
ctx, manifests, p.ExecCfg().Codec, true, /*skipFKsWithNoMatchingTable*/
); err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/backupccl/show_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,8 @@ func TestShowBackup(t *testing.T) {
sqlDB.Exec(t, `CREATE TABLE data.details2()`)
sqlDB.Exec(t, `BACKUP data.details1, data.details2 TO $1;`, details)

details1Desc := sqlbase.GetTableDescriptor(tc.Server(0).DB(), "data", "details1")
details2Desc := sqlbase.GetTableDescriptor(tc.Server(0).DB(), "data", "details2")
details1Desc := sqlbase.GetTableDescriptor(tc.Server(0).DB(), keys.SystemSQLCodec, "data", "details1")
details2Desc := sqlbase.GetTableDescriptor(tc.Server(0).DB(), keys.SystemSQLCodec, "data", "details2")
details1Key := roachpb.Key(sqlbase.MakeIndexKeyPrefix(keys.SystemSQLCodec, details1Desc, details1Desc.PrimaryIndex.ID))
details2Key := roachpb.Key(sqlbase.MakeIndexKeyPrefix(keys.SystemSQLCodec, details2Desc, details2Desc.PrimaryIndex.ID))

Expand Down
12 changes: 8 additions & 4 deletions pkg/ccl/backupccl/targets.go
Original file line number Diff line number Diff line change
Expand Up @@ -598,8 +598,10 @@ func fullClusterTargets(
return fullClusterDescs, fullClusterDBs, nil
}

func lookupDatabaseID(ctx context.Context, txn *kv.Txn, name string) (sqlbase.ID, error) {
found, id, err := sqlbase.LookupDatabaseID(ctx, txn, name)
func lookupDatabaseID(
ctx context.Context, txn *kv.Txn, codec keys.SQLCodec, name string,
) (sqlbase.ID, error) {
found, id, err := sqlbase.LookupDatabaseID(ctx, txn, codec, name)
if err != nil {
return sqlbase.InvalidID, err
}
Expand All @@ -611,8 +613,10 @@ func lookupDatabaseID(ctx context.Context, txn *kv.Txn, name string) (sqlbase.ID

// CheckTableExists returns an error if a table already exists with given
// parent and name.
func CheckTableExists(ctx context.Context, txn *kv.Txn, parentID sqlbase.ID, name string) error {
found, _, err := sqlbase.LookupPublicTableID(ctx, txn, parentID, name)
func CheckTableExists(
ctx context.Context, txn *kv.Txn, codec keys.SQLCodec, parentID sqlbase.ID, name string,
) error {
found, _, err := sqlbase.LookupPublicTableID(ctx, txn, codec, parentID, name)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func createBenchmarkChangefeed(
feedClock *hlc.Clock,
database, table string,
) (*benchSink, func() error, error) {
tableDesc := sqlbase.GetTableDescriptor(s.DB(), database, table)
tableDesc := sqlbase.GetTableDescriptor(s.DB(), keys.SystemSQLCodec, database, table)
spans := []roachpb.Span{tableDesc.PrimaryIndexSpan(keys.SystemSQLCodec)}
details := jobspb.ChangefeedDetails{
Targets: jobspb.ChangefeedTargets{tableDesc.ID: jobspb.ChangefeedTarget{
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed_dist.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func fetchSpansForTargets(
txn.SetFixedTimestamp(ctx, ts)
// Note that all targets are currently guaranteed to be tables.
for tableID := range targets {
tableDesc, err := sqlbase.GetTableDescFromID(ctx, txn, tableID)
tableDesc, err := sqlbase.GetTableDescFromID(ctx, txn, codec, tableID)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/schemafeed/schema_feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func (tf *SchemaFeed) primeInitialTableDescs(ctx context.Context) error {
txn.SetFixedTimestamp(ctx, initialTableDescTs)
// Note that all targets are currently guaranteed to be tables.
for tableID := range tf.targets {
tableDesc, err := sqlbase.GetTableDescFromID(ctx, txn, tableID)
tableDesc, err := sqlbase.GetTableDescFromID(ctx, txn, keys.SystemSQLCodec, tableID)
if err != nil {
return err
}
Expand Down
Loading