sql: push tenant-bound SQL codec into descriptor key generation
Informs #48123.

This commit continues the plumbing that began in #48190. It pushes a
tenant-bound SQL codec into the other main source of key generation in
the SQL layer: descriptor manipulation and metadata handling. This
allows SQL tenants to properly handle the metadata descriptors for
their own databases and tables.

This ended up being a larger undertaking than I had originally expected.
However, now that it's complete, we're in a pretty good spot:
1. `sqlbase.MetadataSchema` is ready to be used for #47904.
2. We can now run SQL migrations for a non-system tenant.
3. There is only one remaining use of `TODOSQLCodec` in pkg/sql. See #48375.
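
For context, here is a minimal sketch of the idea. The types and functions
below (`toyCodec`, `tablePrefix`, `makeDescMetadataKey`) are illustrative
stand-ins for `keys.SQLCodec` and `sqlbase.MakeDescMetadataKey`, and the key
encoding is simplified; the point is only that every SQL key is now built
through a codec carrying the tenant's key prefix, so the same descriptor ID
maps to different physical keys for the system tenant and a secondary tenant.

```go
package main

import "fmt"

// appendUint32 appends v to b in big-endian order. The real encoding in
// pkg/keys is different; this is just enough to show the prefixing.
func appendUint32(b []byte, v uint32) []byte {
	return append(b, byte(v>>24), byte(v>>16), byte(v>>8), byte(v))
}

// toyCodec stands in for keys.SQLCodec: it carries the tenant key prefix,
// and every key-generation helper builds on top of it.
type toyCodec struct {
	tenantPrefix []byte // empty for the system tenant
}

// tablePrefix returns the key prefix for a table, tenant prefix included.
func (c toyCodec) tablePrefix(tableID uint32) []byte {
	return appendUint32(append([]byte(nil), c.tenantPrefix...), tableID)
}

// descriptorTableID is an arbitrary stand-in for system.descriptor's ID.
const descriptorTableID = 3

// makeDescMetadataKey mirrors the shape of sqlbase.MakeDescMetadataKey after
// this change: the descriptor key derives from the codec that is passed in,
// not from a process-wide, system-tenant-only prefix.
func makeDescMetadataKey(c toyCodec, descID uint32) []byte {
	return appendUint32(c.tablePrefix(descriptorTableID), descID)
}

func main() {
	system := toyCodec{}                               // like keys.SystemSQLCodec
	tenant := toyCodec{tenantPrefix: []byte{0xfe, 10}} // like a codec bound to a secondary tenant

	fmt.Printf("system tenant: %v\n", makeDescMetadataKey(system, 52))
	fmt.Printf("tenant-bound:  %v\n", makeDescMetadataKey(tenant, 52))
}
```

The diffs below mostly thread such a codec (usually `keys.SystemSQLCodec` or
`p.ExecCfg().Codec`) through call sites that previously assumed the system
tenant's key space.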
nvanbenschoten committed May 5, 2020
1 parent 28d171e commit f618c80
Showing 136 changed files with 966 additions and 713 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/backup_test.go
@@ -1042,7 +1042,7 @@ func TestBackupRestoreResume(t *testing.T) {
_, tc, outerDB, dir, cleanupFn := backupRestoreTestSetup(t, multiNode, numAccounts, initNone)
defer cleanupFn()

backupTableDesc := sqlbase.GetTableDescriptor(tc.Servers[0].DB(), "data", "bank")
backupTableDesc := sqlbase.GetTableDescriptor(tc.Servers[0].DB(), keys.SystemSQLCodec, "data", "bank")

t.Run("backup", func(t *testing.T) {
sqlDB := sqlutils.MakeSQLRunner(outerDB.DB)
34 changes: 17 additions & 17 deletions pkg/ccl/backupccl/restore_job.go
@@ -448,14 +448,14 @@ func WriteTableDescs(
desc.Privileges = sqlbase.NewDefaultPrivilegeDescriptor()
}
wroteDBs[desc.ID] = desc
if err := sql.WriteNewDescToBatch(ctx, false /* kvTrace */, settings, b, desc.ID, desc); err != nil {
if err := sql.WriteNewDescToBatch(ctx, false /* kvTrace */, settings, b, keys.SystemSQLCodec, desc.ID, desc); err != nil {
return err
}
// Depending on which cluster version we are restoring to, we decide which
// namespace table to write the descriptor into. This may cause wrong
// behavior if the cluster version is bumped DURING a restore.
dKey := sqlbase.MakeDatabaseNameKey(ctx, settings, desc.Name)
b.CPut(dKey.Key(), desc.ID, nil)
b.CPut(dKey.Key(keys.SystemSQLCodec), desc.ID, nil)
}
for i := range tables {
// For full cluster restore, keep privileges as they were.
@@ -466,7 +466,7 @@ func WriteTableDescs(
tables[i].Privileges = wrote.GetPrivileges()
}
} else {
parentDB, err := sqlbase.GetDatabaseDescFromID(ctx, txn, tables[i].ParentID)
parentDB, err := sqlbase.GetDatabaseDescFromID(ctx, txn, keys.SystemSQLCodec, tables[i].ParentID)
if err != nil {
return errors.Wrapf(err,
"failed to lookup parent DB %d", errors.Safe(tables[i].ParentID))
@@ -480,14 +480,14 @@ func WriteTableDescs(
tables[i].Privileges = parentDB.GetPrivileges()
}
}
if err := sql.WriteNewDescToBatch(ctx, false /* kvTrace */, settings, b, tables[i].ID, tables[i]); err != nil {
if err := sql.WriteNewDescToBatch(ctx, false /* kvTrace */, settings, b, keys.SystemSQLCodec, tables[i].ID, tables[i]); err != nil {
return err
}
// Depending on which cluster version we are restoring to, we decide which
// namespace table to write the descriptor into. This may cause wrong
// behavior if the cluster version is bumped DURING a restore.
tkey := sqlbase.MakePublicTableNameKey(ctx, settings, tables[i].ParentID, tables[i].Name)
b.CPut(tkey.Key(), tables[i].ID, nil)
b.CPut(tkey.Key(keys.SystemSQLCodec), tables[i].ID, nil)
}
for _, kv := range extra {
b.InitPut(kv.Key, &kv.Value, false)
@@ -500,7 +500,7 @@ func WriteTableDescs(
}

for _, table := range tables {
if err := table.Validate(ctx, txn); err != nil {
if err := table.Validate(ctx, txn, keys.SystemSQLCodec); err != nil {
return errors.Wrapf(err,
"validate table %d", errors.Safe(table.ID))
}
@@ -757,11 +757,11 @@ func restore(
// returned an error prior to this.
func loadBackupSQLDescs(
ctx context.Context,
p sql.PlanHookState,
details jobspb.RestoreDetails,
makeExternalStorageFromURI cloud.ExternalStorageFromURIFactory,
encryption *roachpb.FileEncryptionOptions,
) ([]BackupManifest, BackupManifest, []sqlbase.Descriptor, error) {
backupManifests, err := loadBackupManifests(ctx, details.URIs, makeExternalStorageFromURI, encryption)
backupManifests, err := loadBackupManifests(ctx, details.URIs, p.ExecCfg().DistSQLSrv.ExternalStorageFromURI, encryption)
if err != nil {
return nil, BackupManifest{}, nil, err
}
@@ -770,7 +770,7 @@ func loadBackupSQLDescs(
// TODO(lucy, jordan): This should become unnecessary in 20.1 when we stop
// writing old-style descs in RestoreDetails (unless a job persists across
// an upgrade?).
if err := maybeUpgradeTableDescsInBackupManifests(ctx, backupManifests, true /* skipFKsWithNoMatchingTable */); err != nil {
if err := maybeUpgradeTableDescsInBackupManifests(ctx, backupManifests, p.ExecCfg().Codec, true /* skipFKsWithNoMatchingTable */); err != nil {
return nil, BackupManifest{}, nil, err
}

@@ -951,7 +951,7 @@ func (r *restoreResumer) Resume(
p := phs.(sql.PlanHookState)

backupManifests, latestBackupManifest, sqlDescs, err := loadBackupSQLDescs(
ctx, details, p.ExecCfg().DistSQLSrv.ExternalStorageFromURI, details.Encryption,
ctx, p, details, details.Encryption,
)
if err != nil {
return err
@@ -1083,12 +1083,12 @@ func (r *restoreResumer) publishTables(ctx context.Context) error {
tableDesc := *tbl
tableDesc.Version++
tableDesc.State = sqlbase.TableDescriptor_PUBLIC
existingDescVal, err := sqlbase.ConditionalGetTableDescFromTxn(ctx, txn, tbl)
existingDescVal, err := sqlbase.ConditionalGetTableDescFromTxn(ctx, txn, r.execCfg.Codec, tbl)
if err != nil {
return errors.Wrap(err, "validating table descriptor has not changed")
}
b.CPut(
sqlbase.MakeDescMetadataKey(tableDesc.ID),
sqlbase.MakeDescMetadataKey(keys.SystemSQLCodec, tableDesc.ID),
sqlbase.WrapDescriptor(&tableDesc),
existingDescVal,
)
@@ -1158,16 +1158,16 @@ func (r *restoreResumer) dropTables(ctx context.Context, jr *jobs.Registry, txn
tableDesc := *tbl
tableDesc.Version++
tableDesc.State = sqlbase.TableDescriptor_DROP
err := sqlbase.RemovePublicTableNamespaceEntry(ctx, txn, tbl.ParentID, tbl.Name)
err := sqlbase.RemovePublicTableNamespaceEntry(ctx, txn, keys.SystemSQLCodec, tbl.ParentID, tbl.Name)
if err != nil {
return errors.Wrap(err, "dropping tables caused by restore fail/cancel from public namespace")
}
existingDescVal, err := sqlbase.ConditionalGetTableDescFromTxn(ctx, txn, tbl)
existingDescVal, err := sqlbase.ConditionalGetTableDescFromTxn(ctx, txn, r.execCfg.Codec, tbl)
if err != nil {
return errors.Wrap(err, "dropping tables caused by restore fail/cancel")
}
b.CPut(
sqlbase.MakeDescMetadataKey(tableDesc.ID),
sqlbase.MakeDescMetadataKey(keys.SystemSQLCodec, tableDesc.ID),
sqlbase.WrapDescriptor(&tableDesc),
existingDescVal,
)
@@ -1213,9 +1213,9 @@ func (r *restoreResumer) dropTables(ctx context.Context, jr *jobs.Registry, txn
}

if isDBEmpty {
descKey := sqlbase.MakeDescMetadataKey(dbDesc.ID)
descKey := sqlbase.MakeDescMetadataKey(keys.SystemSQLCodec, dbDesc.ID)
b.Del(descKey)
b.Del(sqlbase.NewDatabaseKey(dbDesc.Name).Key())
b.Del(sqlbase.NewDatabaseKey(dbDesc.Name).Key(keys.SystemSQLCodec))
}
}
if err := txn.Run(ctx, b); err != nil {
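One pattern worth noting from the restore hunks above: name keys such as
`sqlbase.NewDatabaseKey(name)` describe the namespace entry logically, and the
tenant-bound codec is applied only when the physical key is materialized via
`Key(codec)`. A rough sketch of that shape (the `databaseKey` type and the key
encoding below are hypothetical, not the real sqlbase implementation):

```go
package main

import "fmt"

// codec is a stand-in for keys.SQLCodec, carrying only the tenant prefix.
type codec struct{ tenantPrefix []byte }

// namespaceTableID is an arbitrary stand-in for the namespace table's ID.
const namespaceTableID = 30

// databaseKey captures the logical identity of a database namespace entry;
// no physical key exists until a codec is supplied.
type databaseKey struct{ name string }

func newDatabaseKey(name string) databaseKey { return databaseKey{name: name} }

// Key materializes the physical key: tenant prefix, then the namespace
// table's prefix, then the database name.
func (d databaseKey) Key(c codec) []byte {
	k := append([]byte(nil), c.tenantPrefix...)
	k = append(k, byte(namespaceTableID))
	return append(k, d.name...)
}

func main() {
	system := codec{}                               // like keys.SystemSQLCodec
	tenant := codec{tenantPrefix: []byte{0xfe, 10}} // like a tenant-bound codec

	dKey := newDatabaseKey("data")
	fmt.Printf("system tenant: %v\n", dKey.Key(system))
	fmt.Printf("tenant-bound:  %v\n", dKey.Key(tenant))
}
```

The backupccl call sites above still pass `keys.SystemSQLCodec` explicitly,
presumably because backup and restore only run in the system tenant at this
point.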
25 changes: 14 additions & 11 deletions pkg/ccl/backupccl/restore_planning.go
@@ -224,7 +224,7 @@ func allocateTableRewrites(
maxExpectedDB := keys.MinUserDescID + sql.MaxDefaultDescriptorID
// Check that any DBs being restored do _not_ exist.
for name := range restoreDBNames {
found, foundID, err := sqlbase.LookupDatabaseID(ctx, txn, name)
found, foundID, err := sqlbase.LookupDatabaseID(ctx, txn, p.ExecCfg().Codec, name)
if err != nil {
return err
}
@@ -242,11 +242,11 @@
} else if descriptorCoverage == tree.AllDescriptors && table.ParentID < sql.MaxDefaultDescriptorID {
// This is a table that is in a database that already existed at
// cluster creation time.
defaultDBID, err := lookupDatabaseID(ctx, txn, sessiondata.DefaultDatabaseName)
defaultDBID, err := lookupDatabaseID(ctx, txn, p.ExecCfg().Codec, sessiondata.DefaultDatabaseName)
if err != nil {
return err
}
postgresDBID, err := lookupDatabaseID(ctx, txn, sessiondata.PgDatabaseName)
postgresDBID, err := lookupDatabaseID(ctx, txn, p.ExecCfg().Codec, sessiondata.PgDatabaseName)
if err != nil {
return err
}
@@ -281,7 +281,7 @@ allocateTableRewrites(
} else {
var parentID sqlbase.ID
{
found, newParentID, err := sqlbase.LookupDatabaseID(ctx, txn, targetDB)
found, newParentID, err := sqlbase.LookupDatabaseID(ctx, txn, p.ExecCfg().Codec, targetDB)
if err != nil {
return err
}
@@ -293,13 +293,13 @@
}
// Check that the table name is _not_ in use.
// This would fail the CPut later anyway, but this yields a prettier error.
if err := CheckTableExists(ctx, txn, parentID, table.Name); err != nil {
if err := CheckTableExists(ctx, txn, p.ExecCfg().Codec, parentID, table.Name); err != nil {
return err
}

// Check privileges.
{
parentDB, err := sqlbase.GetDatabaseDescFromID(ctx, txn, parentID)
parentDB, err := sqlbase.GetDatabaseDescFromID(ctx, txn, p.ExecCfg().Codec, parentID)
if err != nil {
return errors.Wrapf(err,
"failed to lookup parent DB %d", errors.Safe(parentID))
@@ -390,7 +390,10 @@ allocateTableRewrites(
// "other" table is missing from the set provided are omitted during the
// upgrade, instead of causing an error to be returned.
func maybeUpgradeTableDescsInBackupManifests(
ctx context.Context, backupManifests []BackupManifest, skipFKsWithNoMatchingTable bool,
ctx context.Context,
backupManifests []BackupManifest,
codec keys.SQLCodec,
skipFKsWithNoMatchingTable bool,
) error {
protoGetter := sqlbase.MapProtoGetter{
Protos: make(map[interface{}]protoutil.Message),
@@ -400,7 +403,7 @@
for _, backupManifest := range backupManifests {
for _, desc := range backupManifest.Descriptors {
if table := desc.Table(hlc.Timestamp{}); table != nil {
protoGetter.Protos[string(sqlbase.MakeDescMetadataKey(table.ID))] =
protoGetter.Protos[string(sqlbase.MakeDescMetadataKey(codec, table.ID))] =
sqlbase.WrapDescriptor(protoutil.Clone(table).(*sqlbase.TableDescriptor))
}
}
@@ -410,7 +413,7 @@
backupManifest := &backupManifests[i]
for j := range backupManifest.Descriptors {
if table := backupManifest.Descriptors[j].Table(hlc.Timestamp{}); table != nil {
if _, err := table.MaybeUpgradeForeignKeyRepresentation(ctx, protoGetter, skipFKsWithNoMatchingTable); err != nil {
if _, err := table.MaybeUpgradeForeignKeyRepresentation(ctx, protoGetter, codec, skipFKsWithNoMatchingTable); err != nil {
return err
}
// TODO(lucy): Is this necessary?
@@ -713,7 +716,7 @@ func doRestorePlan(

// Ensure that no user table descriptors exist for a full cluster restore.
txn := p.ExecCfg().DB.NewTxn(ctx, "count-user-descs")
descCount, err := sql.CountUserDescriptors(ctx, txn)
descCount, err := sql.CountUserDescriptors(ctx, txn, p.ExecCfg().Codec)
if err != nil {
return errors.Wrap(err, "looking up user descriptors during restore")
}
@@ -725,7 +728,7 @@ }
}

_, skipMissingFKs := opts[restoreOptSkipMissingFKs]
if err := maybeUpgradeTableDescsInBackupManifests(ctx, mainBackupManifests, skipMissingFKs); err != nil {
if err := maybeUpgradeTableDescsInBackupManifests(ctx, mainBackupManifests, p.ExecCfg().Codec, skipMissingFKs); err != nil {
return err
}

2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/show.go
@@ -135,7 +135,7 @@ func showBackupPlanHook(
// display them anyway, because we don't have the referenced table names,
// etc.
if err := maybeUpgradeTableDescsInBackupManifests(
ctx, manifests, true, /*skipFKsWithNoMatchingTable*/
ctx, manifests, p.ExecCfg().Codec, true, /*skipFKsWithNoMatchingTable*/
); err != nil {
return err
}
4 changes: 2 additions & 2 deletions pkg/ccl/backupccl/show_test.go
@@ -99,8 +99,8 @@ func TestShowBackup(t *testing.T) {
sqlDB.Exec(t, `CREATE TABLE data.details2()`)
sqlDB.Exec(t, `BACKUP data.details1, data.details2 TO $1;`, details)

details1Desc := sqlbase.GetTableDescriptor(tc.Server(0).DB(), "data", "details1")
details2Desc := sqlbase.GetTableDescriptor(tc.Server(0).DB(), "data", "details2")
details1Desc := sqlbase.GetTableDescriptor(tc.Server(0).DB(), keys.SystemSQLCodec, "data", "details1")
details2Desc := sqlbase.GetTableDescriptor(tc.Server(0).DB(), keys.SystemSQLCodec, "data", "details2")
details1Key := roachpb.Key(sqlbase.MakeIndexKeyPrefix(keys.SystemSQLCodec, details1Desc, details1Desc.PrimaryIndex.ID))
details2Key := roachpb.Key(sqlbase.MakeIndexKeyPrefix(keys.SystemSQLCodec, details2Desc, details2Desc.PrimaryIndex.ID))

12 changes: 8 additions & 4 deletions pkg/ccl/backupccl/targets.go
@@ -598,8 +598,10 @@ func fullClusterTargets(
return fullClusterDescs, fullClusterDBs, nil
}

func lookupDatabaseID(ctx context.Context, txn *kv.Txn, name string) (sqlbase.ID, error) {
found, id, err := sqlbase.LookupDatabaseID(ctx, txn, name)
func lookupDatabaseID(
ctx context.Context, txn *kv.Txn, codec keys.SQLCodec, name string,
) (sqlbase.ID, error) {
found, id, err := sqlbase.LookupDatabaseID(ctx, txn, codec, name)
if err != nil {
return sqlbase.InvalidID, err
}
@@ -611,8 +613,10 @@ func lookupDatabaseID(ctx context.Context, txn *kv.Txn, name string) (sqlbase.ID

// CheckTableExists returns an error if a table already exists with given
// parent and name.
func CheckTableExists(ctx context.Context, txn *kv.Txn, parentID sqlbase.ID, name string) error {
found, _, err := sqlbase.LookupPublicTableID(ctx, txn, parentID, name)
func CheckTableExists(
ctx context.Context, txn *kv.Txn, codec keys.SQLCodec, parentID sqlbase.ID, name string,
) error {
found, _, err := sqlbase.LookupPublicTableID(ctx, txn, codec, parentID, name)
if err != nil {
return err
}
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/bench_test.go
@@ -184,7 +184,7 @@ func createBenchmarkChangefeed(
feedClock *hlc.Clock,
database, table string,
) (*benchSink, func() error, error) {
tableDesc := sqlbase.GetTableDescriptor(s.DB(), database, table)
tableDesc := sqlbase.GetTableDescriptor(s.DB(), keys.SystemSQLCodec, database, table)
spans := []roachpb.Span{tableDesc.PrimaryIndexSpan(keys.SystemSQLCodec)}
details := jobspb.ChangefeedDetails{
Targets: jobspb.ChangefeedTargets{tableDesc.ID: jobspb.ChangefeedTarget{
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed_dist.go
@@ -225,7 +225,7 @@ func fetchSpansForTargets(
txn.SetFixedTimestamp(ctx, ts)
// Note that all targets are currently guaranteed to be tables.
for tableID := range targets {
tableDesc, err := sqlbase.GetTableDescFromID(ctx, txn, tableID)
tableDesc, err := sqlbase.GetTableDescFromID(ctx, txn, codec, tableID)
if err != nil {
return err
}
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/schemafeed/schema_feed.go
@@ -190,7 +190,7 @@ func (tf *SchemaFeed) primeInitialTableDescs(ctx context.Context) error {
txn.SetFixedTimestamp(ctx, initialTableDescTs)
// Note that all targets are currently guaranteed to be tables.
for tableID := range tf.targets {
tableDesc, err := sqlbase.GetTableDescFromID(ctx, txn, tableID)
tableDesc, err := sqlbase.GetTableDescFromID(ctx, txn, keys.SystemSQLCodec, tableID)
if err != nil {
return err
}