Skip to content

Commit

Permalink
Merge pull request #45873 from tbg/mv-cloud
Browse files Browse the repository at this point in the history
kvserver,kv: move bulk, cloud, and replace kv/*.go
  • Loading branch information
tbg authored Mar 9, 2020
2 parents a244ae5 + bf703d9 commit e0ca60f
Show file tree
Hide file tree
Showing 374 changed files with 4,565 additions and 4,599 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/backup_cloud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ import (
"testing"

"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/cloud"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/storage/cloud"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
Expand Down
16 changes: 8 additions & 8 deletions pkg/ccl/backupccl/backup_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,19 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/cloud"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/covering"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/storage/cloud"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -60,7 +60,7 @@ func countRows(raw roachpb.BulkOpSummary, pkIDs map[uint64]struct{}) RowCount {
return res
}

func allRangeDescriptors(ctx context.Context, txn *client.Txn) ([]roachpb.RangeDescriptor, error) {
func allRangeDescriptors(ctx context.Context, txn *kv.Txn) ([]roachpb.RangeDescriptor, error) {
rows, err := txn.Scan(ctx, keys.Meta2Prefix, keys.MetaMax, 0)
if err != nil {
return nil, errors.Wrapf(err,
Expand Down Expand Up @@ -161,7 +161,7 @@ type spanAndTime struct {
// file.
func backup(
ctx context.Context,
db *client.DB,
db *kv.DB,
gossip *gossip.Gossip,
settings *cluster.Settings,
defaultStore cloud.ExternalStorage,
Expand All @@ -186,7 +186,7 @@ func backup(
var checkpointMu syncutil.Mutex

var ranges []roachpb.RangeDescriptor
if err := db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
var err error
// TODO(benesch): limit the range descriptors we fetch to the ranges that
// are actually relevant in the backup to speed up small backups on large
Expand Down Expand Up @@ -306,7 +306,7 @@ func backup(
MVCCFilter: roachpb.MVCCFilter(backupManifest.MVCCFilter),
Encryption: encryption,
}
rawRes, pErr := client.SendWrappedWith(ctx, db.NonTransactionalSender(), header, req)
rawRes, pErr := kv.SendWrappedWith(ctx, db.NonTransactionalSender(), header, req)
if pErr != nil {
return pErr.GoError()
}
Expand Down Expand Up @@ -518,7 +518,7 @@ func (b *backupResumer) Resume(
return nil
}

func (b *backupResumer) clearStats(ctx context.Context, DB *client.DB) error {
func (b *backupResumer) clearStats(ctx context.Context, DB *kv.DB) error {
details := b.job.Details().(jobspb.BackupDetails)
var backupManifest BackupManifest
if err := protoutil.Unmarshal(details.BackupManifest, &backupManifest); err != nil {
Expand All @@ -530,7 +530,7 @@ func (b *backupResumer) clearStats(ctx context.Context, DB *client.DB) error {
return err
}
details.BackupManifest = descBytes
err = DB.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
err = DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
return b.job.WithTxn(txn).SetDetails(ctx, details)
})
return err
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/backup_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/cloud"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql"
Expand All @@ -30,6 +29,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/sql/stats"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/storage/cloud"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/interval"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
Expand Down
6 changes: 3 additions & 3 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/utilccl/sampledataccl"
"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/config/zonepb"
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
Expand Down Expand Up @@ -2475,8 +2475,8 @@ func TestRestoreAsOfSystemTimeGCBounds(t *testing.T) {
},
Threshold: tc.Server(0).Clock().Now(),
}
if _, err := client.SendWrapped(
ctx, tc.Server(0).DistSenderI().(*kv.DistSender), &gcr,
if _, err := kv.SendWrapped(
ctx, tc.Server(0).DistSenderI().(*kvcoord.DistSender), &gcr,
); err != nil {
t.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/manifest_handling.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ import (

"github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/cloud"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/storage/cloud"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/errors"
Expand Down
28 changes: 14 additions & 14 deletions pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@ import (

"github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/cloud"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
Expand All @@ -33,6 +32,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/sql/stats"
"github.com/cockroachdb/cockroach/pkg/storage/cloud"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/interval"
Expand Down Expand Up @@ -291,7 +291,7 @@ rangeLoop:
func splitAndScatter(
restoreCtx context.Context,
settings *cluster.Settings,
db *client.DB,
db *kv.DB,
kr *storageccl.KeyRewriter,
numClusterNodes int,
importSpans []importEntry,
Expand Down Expand Up @@ -353,7 +353,7 @@ func splitAndScatter(
// span being restored into.
RandomizeLeases: true,
}
if _, pErr := client.SendWrapped(ctx, db.NonTransactionalSender(), scatterReq); pErr != nil {
if _, pErr := kv.SendWrapped(ctx, db.NonTransactionalSender(), scatterReq); pErr != nil {
// TODO(dan): Unfortunately, Scatter is still too unreliable to
// fail the RESTORE when Scatter fails. I'm uncomfortable that
// this could break entirely and not start failing the tests,
Expand Down Expand Up @@ -397,7 +397,7 @@ func splitAndScatter(
scatterReq := &roachpb.AdminScatterRequest{
RequestHeader: roachpb.RequestHeaderFromSpan(roachpb.Span{Key: newSpanKey, EndKey: newSpanKey.Next()}),
}
if _, pErr := client.SendWrapped(ctx, db.NonTransactionalSender(), scatterReq); pErr != nil {
if _, pErr := kv.SendWrapped(ctx, db.NonTransactionalSender(), scatterReq); pErr != nil {
// TODO(dan): Unfortunately, Scatter is still too unreliable to
// fail the RESTORE when Scatter fails. I'm uncomfortable that
// this could break entirely and not start failing the tests,
Expand Down Expand Up @@ -427,7 +427,7 @@ func splitAndScatter(
// on that database at the time this function is called.
func WriteTableDescs(
ctx context.Context,
txn *client.Txn,
txn *kv.Txn,
databases []*sqlbase.DatabaseDescriptor,
tables []*sqlbase.TableDescriptor,
descCoverage tree.DescriptorCoverage,
Expand Down Expand Up @@ -557,7 +557,7 @@ func rewriteBackupSpanKey(kr *storageccl.KeyRewriter, key roachpb.Key) (roachpb.
// files.
func restore(
restoreCtx context.Context,
db *client.DB,
db *kv.DB,
gossip *gossip.Gossip,
settings *cluster.Settings,
backupManifests []BackupManifest,
Expand Down Expand Up @@ -710,7 +710,7 @@ func restore(
defer tracing.FinishSpan(importSpan)
defer func() { <-importsSem }()

importRes, pErr := client.SendWrapped(ctx, db.NonTransactionalSender(), importRequest)
importRes, pErr := kv.SendWrapped(ctx, db.NonTransactionalSender(), importRequest)
if pErr != nil {
return errors.Wrapf(pErr.GoError(), "importing span %v", importRequest.DataSpan)

Expand Down Expand Up @@ -834,14 +834,14 @@ func remapRelevantStatistics(
// after the other.
func isDatabaseEmpty(
ctx context.Context,
db *client.DB,
db *kv.DB,
dbDesc *sql.DatabaseDescriptor,
ignoredTables map[sqlbase.ID]struct{},
) (bool, error) {
var allDescs []sqlbase.Descriptor
if err := db.Txn(
ctx,
func(ctx context.Context, txn *client.Txn) error {
func(ctx context.Context, txn *kv.Txn) error {
var err error
allDescs, err = allSQLDescriptors(ctx, txn)
return err
Expand Down Expand Up @@ -923,7 +923,7 @@ func createImportingTables(
}

if !details.PrepareCompleted {
err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
// Write the new TableDescriptors which are set in the OFFLINE state.
if err := WriteTableDescs(ctx, txn, databases, tables, details.DescriptorCoverage, r.job.Payload().Username, r.settings, nil /* extra */); err != nil {
return errors.Wrapf(err, "restoring %d TableDescriptors from %d databases", len(r.tables), len(databases))
Expand Down Expand Up @@ -1029,7 +1029,7 @@ func (r *restoreResumer) insertStats(ctx context.Context) error {
return nil
}

err := r.execCfg.DB.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
err := r.execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
if err := stats.InsertNewStats(ctx, r.execCfg.InternalExecutor, txn, r.latestStats); err != nil {
return errors.Wrapf(err, "inserting stats from backup")
}
Expand All @@ -1053,7 +1053,7 @@ func (r *restoreResumer) publishTables(ctx context.Context) error {
}
log.Event(ctx, "making tables live")

err := r.execCfg.DB.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
err := r.execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
// Write the new TableDescriptors and flip state over to public so they can be
// accessed.
b := txn.NewBatch()
Expand Down Expand Up @@ -1107,7 +1107,7 @@ func (r *restoreResumer) OnFailOrCancel(ctx context.Context, phs interface{}) er
}

// dropTables implements the OnFailOrCancel logic.
func (r *restoreResumer) dropTables(ctx context.Context, txn *client.Txn) error {
func (r *restoreResumer) dropTables(ctx context.Context, txn *kv.Txn) error {
details := r.job.Details().(jobspb.RestoreDetails)

// No need to mark the tables as dropped if they were not even created in the
Expand Down
6 changes: 3 additions & 3 deletions pkg/ccl/backupccl/restore_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@ import (

"github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/cloud"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/covering"
Expand All @@ -30,6 +29,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/storage/cloud"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
Expand Down Expand Up @@ -219,7 +219,7 @@ func allocateTableRewrites(

// Fail fast if the necessary databases don't exist or are otherwise
// incompatible with this restore.
if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
maxExpectedDB := keys.MinUserDescID + sql.MaxDefaultDescriptorID
// Check that any DBs being restored do _not_ exist.
for name := range restoreDBNames {
Expand Down
18 changes: 8 additions & 10 deletions pkg/ccl/backupccl/targets.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import (
"sort"

"github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand Down Expand Up @@ -297,7 +297,7 @@ func descriptorsMatchingTargets(
// a descriptor the the ID by which it was previously known (e.g pre-TRUNCATE).
func getRelevantDescChanges(
ctx context.Context,
db *client.DB,
db *kv.DB,
startTime, endTime hlc.Timestamp,
descs []sqlbase.Descriptor,
expanded []sqlbase.ID,
Expand Down Expand Up @@ -402,7 +402,7 @@ func getRelevantDescChanges(
// nil content).
func getAllDescChanges(
ctx context.Context,
db *client.DB,
db *kv.DB,
startTime, endTime hlc.Timestamp,
priorIDs map[sqlbase.ID]sqlbase.ID,
) ([]BackupManifest_DescriptorRevision, error) {
Expand Down Expand Up @@ -440,7 +440,7 @@ func getAllDescChanges(
return res, nil
}

func allSQLDescriptors(ctx context.Context, txn *client.Txn) ([]sqlbase.Descriptor, error) {
func allSQLDescriptors(ctx context.Context, txn *kv.Txn) ([]sqlbase.Descriptor, error) {
startKey := roachpb.Key(keys.MakeTablePrefix(keys.DescriptorTableID))
endKey := startKey.PrefixEnd()
rows, err := txn.Scan(ctx, startKey, endKey, 0)
Expand Down Expand Up @@ -492,12 +492,12 @@ func ensureInterleavesIncluded(tables []*sqlbase.TableDescriptor) error {
}

func loadAllDescs(
ctx context.Context, db *client.DB, asOf hlc.Timestamp,
ctx context.Context, db *kv.DB, asOf hlc.Timestamp,
) ([]sqlbase.Descriptor, error) {
var allDescs []sqlbase.Descriptor
if err := db.Txn(
ctx,
func(ctx context.Context, txn *client.Txn) error {
func(ctx context.Context, txn *kv.Txn) error {
var err error
txn.SetFixedTimestamp(ctx, asOf)
allDescs, err = allSQLDescriptors(ctx, txn)
Expand Down Expand Up @@ -592,7 +592,7 @@ func fullClusterTargets(
return fullClusterDescs, fullClusterDBs, nil
}

func lookupDatabaseID(ctx context.Context, txn *client.Txn, name string) (sqlbase.ID, error) {
func lookupDatabaseID(ctx context.Context, txn *kv.Txn, name string) (sqlbase.ID, error) {
found, id, err := sqlbase.LookupDatabaseID(ctx, txn, name)
if err != nil {
return sqlbase.InvalidID, err
Expand All @@ -605,9 +605,7 @@ func lookupDatabaseID(ctx context.Context, txn *client.Txn, name string) (sqlbas

// CheckTableExists returns an error if a table already exists with given
// parent and name.
func CheckTableExists(
ctx context.Context, txn *client.Txn, parentID sqlbase.ID, name string,
) error {
func CheckTableExists(ctx context.Context, txn *kv.Txn, parentID sqlbase.ID, name string) error {
found, _, err := sqlbase.LookupPublicTableID(ctx, txn, parentID, name)
if err != nil {
return err
Expand Down
Loading

0 comments on commit e0ca60f

Please sign in to comment.