Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minor --initialize-target-sequences followups #13758

Merged
merged 1 commit into from
Aug 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions go/vt/vtctl/workflow/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -1252,9 +1252,9 @@ func (ts *trafficSwitcher) getTargetSequenceMetadata(ctx context.Context) (map[s
// be in another unsharded keyspace.
smMu := sync.Mutex{}
tableCount := len(sequencesByBackingTable)
tablesFound := 0 // Used to short circuit the search
searchCompleted := make(chan struct{}) // The search has completed
searchKeyspace := func(sctx context.Context, keyspace string) error { // The function used to search each keyspace
tablesFound := 0 // Used to short circuit the search
// Define the function used to search each keyspace.
searchKeyspace := func(sctx context.Context, done chan struct{}, keyspace string) error {
kvs, kerr := ts.TopoServer().GetVSchema(sctx, keyspace)
if kerr != nil {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to get vschema for keyspace %s: %v",
Expand All @@ -1267,7 +1267,7 @@ func (ts *trafficSwitcher) getTargetSequenceMetadata(ctx context.Context) (map[s
select {
case <-sctx.Done():
return sctx.Err()
case <-searchCompleted:
case <-done: // We've found everything we need in other goroutines
return nil
default:
}
Expand All @@ -1279,13 +1279,16 @@ func (ts *trafficSwitcher) getTargetSequenceMetadata(ctx context.Context) (map[s
sm != nil && tableName == sm.backingTableName {
tablesFound++ // This is also protected by the mutex
sm.backingTableKeyspace = keyspace
// Set the default keyspace name. We will later check to
// see if the tablet we send requests to is using a dbname
// override and use that if it is.
sm.backingTableDBName = "vt_" + keyspace
if tablesFound == tableCount { // Short circuit the search
select {
case <-searchCompleted: // It's already been closed
case <-done: // It's already been closed
return true
default:
close(searchCompleted) // Mark the search as completed
close(done) // Mark the search as completed
return true
}
}
Expand All @@ -1302,10 +1305,11 @@ func (ts *trafficSwitcher) getTargetSequenceMetadata(ctx context.Context) (map[s
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to get keyspaces: %v", err)
}
searchGroup, gctx := errgroup.WithContext(ctx)
searchCompleted := make(chan struct{})
for _, keyspace := range keyspaces {
keyspace := keyspace // https://golang.org/doc/faq#closures_and_goroutines
searchGroup.Go(func() error {
return searchKeyspace(gctx, keyspace)
return searchKeyspace(gctx, searchCompleted, keyspace)
})
}
if err := searchGroup.Wait(); err != nil {
Expand Down Expand Up @@ -1355,6 +1359,9 @@ func (ts *trafficSwitcher) findSequenceUsageInKeyspace(vschema *vschemapb.Keyspa
}
sm.backingTableName = tableName
sm.backingTableKeyspace = keyspace
// Set the default keyspace name. We will later check to
// see if the tablet we send requests to is using a dbname
// override and use that if it is.
sm.backingTableDBName = "vt_" + keyspace
} else {
allFullyQualified = false
Expand Down
21 changes: 14 additions & 7 deletions go/vt/wrangler/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -1974,9 +1974,9 @@ func (ts *trafficSwitcher) getTargetSequenceMetadata(ctx context.Context) (map[s
// be in another unsharded keyspace.
smMu := sync.Mutex{}
tableCount := len(sequencesByBackingTable)
tablesFound := 0 // Used to short circuit the search
searchCompleted := make(chan struct{}) // The search has completed
searchKeyspace := func(sctx context.Context, keyspace string) error { // The function used to search each keyspace
tablesFound := 0 // Used to short circuit the search
// Define the function used to search each keyspace.
searchKeyspace := func(sctx context.Context, done chan struct{}, keyspace string) error {
kvs, kerr := ts.TopoServer().GetVSchema(sctx, keyspace)
if kerr != nil {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to get vschema for keyspace %s: %v",
Expand All @@ -1989,7 +1989,7 @@ func (ts *trafficSwitcher) getTargetSequenceMetadata(ctx context.Context) (map[s
select {
case <-sctx.Done():
return sctx.Err()
case <-searchCompleted:
case <-done: // We've found everything we need in other goroutines
return nil
default:
}
Expand All @@ -2001,13 +2001,16 @@ func (ts *trafficSwitcher) getTargetSequenceMetadata(ctx context.Context) (map[s
sm != nil && tableName == sm.backingTableName {
tablesFound++ // This is also protected by the mutex
sm.backingTableKeyspace = keyspace
// Set the default keyspace name. We will later check to
// see if the tablet we send requests to is using a dbname
// override and use that if it is.
sm.backingTableDBName = "vt_" + keyspace
if tablesFound == tableCount { // Short circuit the search
select {
case <-searchCompleted: // It's already been closed
case <-done: // It's already been closed
return true
default:
close(searchCompleted) // Mark the search as completed
close(done) // Mark the search as completed
return true
}
}
Expand All @@ -2024,10 +2027,11 @@ func (ts *trafficSwitcher) getTargetSequenceMetadata(ctx context.Context) (map[s
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to get keyspaces: %v", err)
}
searchGroup, gctx := errgroup.WithContext(ctx)
searchCompleted := make(chan struct{})
for _, keyspace := range keyspaces {
keyspace := keyspace // https://golang.org/doc/faq#closures_and_goroutines
searchGroup.Go(func() error {
return searchKeyspace(gctx, keyspace)
return searchKeyspace(gctx, searchCompleted, keyspace)
})
}
if err := searchGroup.Wait(); err != nil {
Expand Down Expand Up @@ -2077,6 +2081,9 @@ func (ts *trafficSwitcher) findSequenceUsageInKeyspace(vschema *vschemapb.Keyspa
}
sm.backingTableName = tableName
sm.backingTableKeyspace = keyspace
// Set the default keyspace name. We will later check to
// see if the tablet we send requests to is using a dbname
// override and use that if it is.
sm.backingTableDBName = "vt_" + keyspace
} else {
allFullyQualified = false
Expand Down