Skip to content

Commit

Permalink
[latest-17.0](vitessio#3891): CherryPick(vitessio#14716): Fix Registe…
Browse files Browse the repository at this point in the history
…rNotifier to use a copy of the tables to prevent data races (vitessio#3900)

* backport of 3891

* feat: fix build issues

Signed-off-by: Manan Gupta <manan@planetscale.com>

---------

Signed-off-by: Manan Gupta <manan@planetscale.com>
Co-authored-by: Manan Gupta <manan@planetscale.com>
  • Loading branch information
2 people authored and andyedison committed Jul 29, 2024
1 parent 2960a36 commit 90691fa
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 9 deletions.
13 changes: 4 additions & 9 deletions go/vt/vttablet/tabletserver/schema/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -708,7 +708,8 @@ func (se *Engine) RegisterNotifier(name string, f notifier, runNotifier bool) {
created = append(created, table)
}
if runNotifier {
f(se.tables, created, nil, nil)
s := maps.Clone(se.tables)
f(s, created, nil, nil)
}
}

Expand Down Expand Up @@ -736,10 +737,7 @@ func (se *Engine) broadcast(created, altered, dropped []*Table) {

se.notifierMu.Lock()
defer se.notifierMu.Unlock()
s := make(map[string]*Table, len(se.tables))
for k, v := range se.tables {
s[k] = v
}
s := maps.Clone(se.tables)
for _, f := range se.notifiers {
f(s, created, altered, dropped)
}
Expand All @@ -757,10 +755,7 @@ func (se *Engine) GetTable(tableName sqlparser.IdentifierCS) *Table {
func (se *Engine) GetSchema() map[string]*Table {
se.mu.Lock()
defer se.mu.Unlock()
tables := make(map[string]*Table, len(se.tables))
for k, v := range se.tables {
tables[k] = v
}
tables := maps.Clone(se.tables)
return tables
}

Expand Down
23 changes: 23 additions & 0 deletions go/vt/vttablet/tabletserver/schema/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -745,6 +745,29 @@ func AddFakeInnoDBReadRowsResult(db *fakesqldb.DB, value int) *fakesqldb.Expecte
))
}

// TestRegisterNotifier tests the functionality of RegisterNotifier
// It also makes sure that writing to the tables map in the schema engine doesn't change the tables received by the notifiers.
func TestRegisterNotifier(t *testing.T) {
// Create a new engine for testing
se := NewEngineForTests()
se.notifiers = map[string]notifier{}
se.tables = map[string]*Table{
"t1": nil,
"t2": nil,
"t3": nil,
}

var tablesReceived map[string]*Table
// Register a notifier and make it run immediately.
se.RegisterNotifier("TestRegisterNotifier", func(full map[string]*Table, created, altered, dropped []*Table) {
tablesReceived = full
}, true)

// Change the se.tables and make sure it doesn't affect the tables received by the notifier.
se.tables["t4"] = nil
require.Len(t, tablesReceived, 3)
}

// TestEngineMysqlTime tests the functionality of Engine.mysqlTime function
func TestEngineMysqlTime(t *testing.T) {
tests := []struct {
Expand Down

0 comments on commit 90691fa

Please sign in to comment.