-
Notifications
You must be signed in to change notification settings - Fork 2.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
TableGC: support DROP VIEW #14020
TableGC: support DROP VIEW #14020
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -72,9 +72,15 @@ var ( | |
sqlPurgeTable = `delete from %a limit 50` | ||
sqlShowVtTables = `show full tables like '\_vt\_%'` | ||
sqlDropTable = "drop table if exists `%a`" | ||
sqlDropView = "drop view if exists `%a`" | ||
purgeReentranceFlag int64 | ||
) | ||
|
||
type gcTable struct { | ||
tableName string | ||
isBaseTable bool | ||
} | ||
|
||
// transitionRequest encapsulates a request to transition a table to next state | ||
type transitionRequest struct { | ||
fromTableName string | ||
|
@@ -223,7 +229,7 @@ func (collector *TableGC) Close() { | |
// operate is the main entry point for the table garbage collector operation and logic. | ||
func (collector *TableGC) operate(ctx context.Context) { | ||
|
||
dropTablesChan := make(chan string) | ||
dropTablesChan := make(chan *gcTable) | ||
purgeRequestsChan := make(chan bool) | ||
transitionRequestsChan := make(chan *transitionRequest) | ||
|
||
|
@@ -251,7 +257,11 @@ func (collector *TableGC) operate(ctx context.Context) { | |
case <-tableCheckTicker.C: | ||
{ | ||
log.Info("TableGC: tableCheckTicker") | ||
_ = collector.checkTables(ctx, dropTablesChan, transitionRequestsChan) | ||
if gcTables, err := collector.readTables(ctx); err != nil { | ||
log.Errorf("TableGC: error while reading tables: %+v", err) | ||
} else { | ||
_ = collector.checkTables(ctx, gcTables, dropTablesChan, transitionRequestsChan) | ||
} | ||
} | ||
case <-purgeReentranceTicker.C: | ||
{ | ||
|
@@ -280,11 +290,11 @@ func (collector *TableGC) operate(ctx context.Context) { | |
time.AfterFunc(time.Second, func() { purgeRequestsChan <- true }) | ||
}() | ||
} | ||
case dropTableName := <-dropTablesChan: | ||
case dropTable := <-dropTablesChan: | ||
{ | ||
log.Info("TableGC: dropTablesChan") | ||
if err := collector.dropTable(ctx, dropTableName); err != nil { | ||
log.Errorf("TableGC: error dropping table %s: %+v", dropTableName, err) | ||
log.Infof("TableGC: found %v in dropTablesChan", dropTable.tableName) | ||
if err := collector.dropTable(ctx, dropTable.tableName, dropTable.isBaseTable); err != nil { | ||
log.Errorf("TableGC: error dropping table %s: %+v", dropTable.tableName, err) | ||
} | ||
} | ||
case transition := <-transitionRequestsChan: | ||
|
@@ -368,29 +378,39 @@ func (collector *TableGC) shouldTransitionTable(tableName string) (shouldTransit | |
return true, state, uuid, nil | ||
} | ||
|
||
// checkTables looks for potential GC tables in the MySQL server+schema. | ||
// It lists _vt_% tables, then filters through those which are due-date. | ||
// It then applies the necessary operation per table. | ||
func (collector *TableGC) checkTables(ctx context.Context, dropTablesChan chan<- string, transitionRequestsChan chan<- *transitionRequest) error { | ||
// readTables reads the list of _vt_% tables from the database | ||
func (collector *TableGC) readTables(ctx context.Context) (gcTables []*gcTable, err error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. refactored |
||
log.Infof("TableGC: read tables") | ||
|
||
conn, err := collector.pool.Get(ctx, nil) | ||
if err != nil { | ||
return err | ||
return nil, err | ||
} | ||
defer conn.Recycle() | ||
|
||
log.Infof("TableGC: check tables") | ||
|
||
res, err := conn.Exec(ctx, sqlShowVtTables, math.MaxInt32, true) | ||
if err != nil { | ||
return err | ||
return nil, err | ||
} | ||
|
||
for _, row := range res.Rows { | ||
tableName := row[0].ToString() | ||
tableType := row[1].ToString() | ||
isBaseTable := (tableType == "BASE TABLE") | ||
gcTables = append(gcTables, &gcTable{tableName: tableName, isBaseTable: isBaseTable}) | ||
} | ||
return gcTables, nil | ||
} | ||
|
||
// checkTables looks for potential GC tables in the MySQL server+schema. | ||
// It lists _vt_% tables, then filters through those which are due-date. | ||
// It then applies the necessary operation per table. | ||
func (collector *TableGC) checkTables(ctx context.Context, gcTables []*gcTable, dropTablesChan chan<- *gcTable, transitionRequestsChan chan<- *transitionRequest) error { | ||
log.Infof("TableGC: check tables") | ||
|
||
shouldTransition, state, uuid, err := collector.shouldTransitionTable(tableName) | ||
for i := range gcTables { | ||
table := gcTables[i] // we capture as local variable as we will later use this in a goroutine | ||
shouldTransition, state, uuid, err := collector.shouldTransitionTable(table.tableName) | ||
|
||
if err != nil { | ||
log.Errorf("TableGC: error while checking tables: %+v", err) | ||
|
@@ -401,30 +421,32 @@ func (collector *TableGC) checkTables(ctx context.Context, dropTablesChan chan<- | |
continue | ||
} | ||
|
||
log.Infof("TableGC: will operate on table %s", tableName) | ||
log.Infof("TableGC: will operate on table %s", table.tableName) | ||
|
||
if state == schema.HoldTableGCState { | ||
// Hold period expired. Moving to next state | ||
collector.submitTransitionRequest(ctx, transitionRequestsChan, state, tableName, isBaseTable, uuid) | ||
collector.submitTransitionRequest(ctx, transitionRequestsChan, state, table.tableName, table.isBaseTable, uuid) | ||
} | ||
if state == schema.PurgeTableGCState { | ||
if isBaseTable { | ||
if table.isBaseTable { | ||
// This table needs to be purged. Make sure to enlist it (we may already have) | ||
if !collector.addPurgingTable(tableName) { | ||
collector.submitTransitionRequest(ctx, transitionRequestsChan, state, tableName, isBaseTable, uuid) | ||
if !collector.addPurgingTable(table.tableName) { | ||
collector.submitTransitionRequest(ctx, transitionRequestsChan, state, table.tableName, table.isBaseTable, uuid) | ||
} | ||
} else { | ||
// This is a view. We don't need to delete rows from views. Just transition into next phase | ||
collector.submitTransitionRequest(ctx, transitionRequestsChan, state, tableName, isBaseTable, uuid) | ||
collector.submitTransitionRequest(ctx, transitionRequestsChan, state, table.tableName, table.isBaseTable, uuid) | ||
} | ||
} | ||
if state == schema.EvacTableGCState { | ||
// This table was in EVAC state for the required period. It will transition into DROP state | ||
collector.submitTransitionRequest(ctx, transitionRequestsChan, state, tableName, isBaseTable, uuid) | ||
collector.submitTransitionRequest(ctx, transitionRequestsChan, state, table.tableName, table.isBaseTable, uuid) | ||
} | ||
if state == schema.DropTableGCState { | ||
// This table needs to be dropped immediately. | ||
go func() { dropTablesChan <- tableName }() | ||
go func() { | ||
dropTablesChan <- table | ||
}() | ||
} | ||
} | ||
|
||
|
@@ -520,21 +542,25 @@ func (collector *TableGC) purge(ctx context.Context) (tableName string, err erro | |
|
||
// dropTable runs an actual DROP TABLE statement, and marks the end of the line for the | ||
// tables' GC lifecycle. | ||
func (collector *TableGC) dropTable(ctx context.Context, tableName string) error { | ||
conn, err := collector.pool.Get(ctx, nil) | ||
func (collector *TableGC) dropTable(ctx context.Context, tableName string, isBaseTable bool) error { | ||
conn, err := dbconnpool.NewDBConnection(ctx, collector.env.Config().DB.DbaWithDB()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Powerful user needed to issue a |
||
if err != nil { | ||
return err | ||
} | ||
defer conn.Recycle() | ||
defer conn.Close() | ||
|
||
parsed := sqlparser.BuildParsedQuery(sqlDropTable, tableName) | ||
sqlDrop := sqlDropTable | ||
if !isBaseTable { | ||
sqlDrop = sqlDropView | ||
} | ||
parsed := sqlparser.BuildParsedQuery(sqlDrop, tableName) | ||
|
||
log.Infof("TableGC: dropping table: %s", tableName) | ||
_, err = conn.Exec(ctx, parsed.Query, 1, true) | ||
_, err = conn.ExecuteFetch(parsed.Query, 1, false) | ||
if err != nil { | ||
return err | ||
} | ||
log.Infof("TableGC: dropped table: %s", tableName) | ||
log.Infof("TableGC: dropped table: %s, isBaseTable: %v", tableName, isBaseTable) | ||
return nil | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
❤️