Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support update table schema id #353

Merged
merged 1 commit into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions maintainer/barrier_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,10 @@ type BarrierEvent struct {
blockedDispatchers *heartbeatpb.InfluencedTables
dropDispatchers *heartbeatpb.InfluencedTables

blockedTasks []*scheduler.StateMachine[common.DispatcherID]
dropTasks []*scheduler.StateMachine[common.DispatcherID]
newTables []*heartbeatpb.Table
blockedTasks []*scheduler.StateMachine[common.DispatcherID]
dropTasks []*scheduler.StateMachine[common.DispatcherID]
newTables []*heartbeatpb.Table
schemaIDChange []*heartbeatpb.SchemaIDChange

advancedDispatchers map[common.DispatcherID]bool
lastResendTime time.Time
Expand All @@ -56,6 +57,7 @@ func NewBlockEvent(cfID string, scheduler *Scheduler,
blockedDispatchers: status.BlockTables,
newTables: status.NeedAddedTables,
dropDispatchers: status.NeedDroppedTables,
schemaIDChange: status.UpdatedSchemas,
advancedDispatchers: make(map[common.DispatcherID]bool),
lastResendTime: time.Time{},
}
Expand Down Expand Up @@ -101,6 +103,15 @@ func (b *BarrierEvent) scheduleBlockEvent() {
TableID: add.TableID,
}, b.commitTs)
}

for _, change := range b.schemaIDChange {
log.Info("update schema id",
zap.String("changefeed", b.cfID),
zap.Int64("newSchema", change.OldSchemaID),
zap.Int64("oldSchema", change.NewSchemaID),
zap.Int64("table", change.TableID))
b.scheduler.UpdateSchemaID(change.TableID, change.NewSchemaID)
}
}

func (b *BarrierEvent) allDispatcherReported() bool {
Expand Down
28 changes: 28 additions & 0 deletions maintainer/barrier_event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,3 +160,31 @@ func TestResendAction(t *testing.T) {
require.Equal(t, resp.DispatcherStatuses[0].Action.Action, heartbeatpb.Action_Pass)
require.Equal(t, resp.DispatcherStatuses[0].Action.CommitTs, uint64(10))
}

func TestUpdateSchemaID(t *testing.T) {
sche := NewScheduler("test", 1, nil, nil, nil, 1000, 0)
sche.AddNewNode("node1")
sche.AddNewTable(common.Table{1, 1}, 1)
require.Len(t, sche.Absent(), 1)
require.Len(t, sche.GetTasksBySchemaID(1), 1)
event := NewBlockEvent("test", sche, &heartbeatpb.State{
IsBlocked: true,
BlockTs: 10,
BlockTables: &heartbeatpb.InfluencedTables{
InfluenceType: heartbeatpb.InfluenceType_All,
},
UpdatedSchemas: []*heartbeatpb.SchemaIDChange{
{
TableID: 1,
OldSchemaID: 1,
NewSchemaID: 2,
},
}},
)
event.scheduleBlockEvent()
require.Len(t, sche.Absent(), 1)
// check the schema id and map is updated
require.Len(t, sche.GetTasksBySchemaID(1), 0)
require.Len(t, sche.GetTasksBySchemaID(2), 1)
require.Equal(t, sche.GetTasksByTableIDs(1)[0].Inferior.(*ReplicaSet).SchemaID, int64(2))
}
21 changes: 6 additions & 15 deletions maintainer/barrier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ import (
"github.com/flowbehappy/tigate/pkg/messaging"
"github.com/flowbehappy/tigate/scheduler"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/spanz"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)
Expand All @@ -36,22 +34,15 @@ func TestNormalBlock(t *testing.T) {
sche.AddNewNode("node2")
var blockedDispatcherIDS []*heartbeatpb.DispatcherID
for id := 1; id < 4; id++ {
span := spanz.TableIDToComparableSpan(int64(id))
tableSpan := &heartbeatpb.TableSpan{
TableID: int64(id),
StartKey: span.StartKey,
EndKey: span.EndKey,
}
dispatcherID := common.NewDispatcherID()
blockedDispatcherIDS = append(blockedDispatcherIDS, dispatcherID.ToPB())
replicaSet := NewReplicaSet(model.DefaultChangeFeedID("test"), dispatcherID, 1, tableSpan, 0)
stm := scheduler.NewStateMachine(dispatcherID, nil, replicaSet)
stm.State = scheduler.SchedulerStatusWorking
sche.Working()[dispatcherID] = stm
sche.AddNewTable(common.Table{1, int64(id)}, 0)
stm := sche.GetTasksByTableIDs(int64(id))[0]
blockedDispatcherIDS = append(blockedDispatcherIDS, stm.ID.ToPB())
stm.Primary = "node1"
sche.nodeTasks["node1"][dispatcherID] = stm
stm.State = scheduler.SchedulerStatusWorking
sche.tryMoveTask(stm.ID, stm, scheduler.SchedulerStatusAbsent, "", true)
}

// the last one is the writer
var selectDispatcherID = common.NewDispatcherIDFromPB(blockedDispatcherIDS[2])
sche.nodeTasks["node2"][selectDispatcherID] = sche.nodeTasks["node1"][selectDispatcherID]
dropID := sche.nodeTasks["node2"][selectDispatcherID].Inferior.(*ReplicaSet).Span.TableID
Expand Down
66 changes: 50 additions & 16 deletions maintainer/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type Scheduler struct {
// group the tasks by schema id
schemaTasks map[int64]map[common.DispatcherID]*scheduler.StateMachine[common.DispatcherID]
// tables
tableTasks map[int64]struct{}
tableTasks map[int64]map[common.DispatcherID]*scheduler.StateMachine[common.DispatcherID]
// totalMaps holds all state maps, absent, committing, working and removing
totalMaps []map[common.DispatcherID]*scheduler.StateMachine[common.DispatcherID]
bootstrapped bool
Expand All @@ -71,7 +71,7 @@ func NewScheduler(changefeedID string,
s := &Scheduler{
nodeTasks: make(map[node.ID]map[common.DispatcherID]*scheduler.StateMachine[common.DispatcherID]),
schemaTasks: make(map[int64]map[common.DispatcherID]*scheduler.StateMachine[common.DispatcherID]),
tableTasks: make(map[int64]struct{}),
tableTasks: make(map[int64]map[common.DispatcherID]*scheduler.StateMachine[common.DispatcherID]),
startCheckpointTs: checkpointTs,
changefeedID: changefeedID,
bootstrapped: false,
Expand Down Expand Up @@ -106,8 +106,8 @@ func (s *Scheduler) GetAllNodes() []node.ID {
}

func (s *Scheduler) AddNewTable(table common.Table, startTs uint64) {
_, ok := s.tableTasks[table.TableID]
if ok {
tables, ok := s.tableTasks[table.TableID]
if ok && len(tables) > 0 {
log.Warn("table already add, ignore",
zap.String("changefeed", s.changefeedID),
zap.Int64("schema", table.SchemaID),
Expand Down Expand Up @@ -215,23 +215,43 @@ func (s *Scheduler) RemoveTask(stm *scheduler.StateMachine[common.DispatcherID])
}

func (s *Scheduler) GetTasksByTableIDs(tableIDs ...int64) []*scheduler.StateMachine[common.DispatcherID] {
tableMap := make(map[int64]bool, len(tableIDs))
for _, tableID := range tableIDs {
tableMap[tableID] = true
}
var stms []*scheduler.StateMachine[common.DispatcherID]
for _, m := range s.totalMaps {
for _, stm := range m {
replica := stm.Inferior.(*ReplicaSet)
if !tableMap[replica.Span.TableID] {
continue
}
for _, tableID := range tableIDs {
for _, stm := range s.tableTasks[tableID] {
stms = append(stms, stm)
}
}
return stms
}

// UpdateSchemaID will update the schema id of the table, and move the task to the new schema map
// it called when rename a table to another schema
func (s *Scheduler) UpdateSchemaID(tableID, newSchemaID int64) {
for _, stm := range s.tableTasks[tableID] {
replicaSet := stm.Inferior.(*ReplicaSet)
oldSchemaID := replicaSet.SchemaID
// update schemaID
replicaSet.SchemaID = newSchemaID

//update schema map
schemaMap, ok := s.schemaTasks[oldSchemaID]
if ok {
delete(schemaMap, stm.ID)
//clear the map if empty
if len(schemaMap) == 0 {
delete(s.schemaTasks, oldSchemaID)
}
}
// add it to new schema map
newMap, ok := s.schemaTasks[newSchemaID]
if !ok {
newMap = make(map[common.DispatcherID]*scheduler.StateMachine[common.DispatcherID])
s.schemaTasks[newSchemaID] = newMap
}
newMap[stm.ID] = stm
}
}

func (s *Scheduler) AddNewNode(id node.ID) {
_, ok := s.nodeTasks[id]
if ok {
Expand Down Expand Up @@ -550,13 +570,21 @@ func (s *Scheduler) addNewSpans(schemaID, tableID int64,
dispatcherID, schemaID, newTableSpan, startTs).(*ReplicaSet)
stm := scheduler.NewStateMachine(dispatcherID, nil, replicaSet)
s.Absent()[dispatcherID] = stm
// modify the schema map
schemaMap, ok := s.schemaTasks[schemaID]
if !ok {
schemaMap = make(map[common.DispatcherID]*scheduler.StateMachine[common.DispatcherID])
s.schemaTasks[schemaID] = schemaMap
}
schemaMap[dispatcherID] = stm
s.tableTasks[tableID] = struct{}{}

// modify the table map
tableMap, ok := s.tableTasks[tableID]
if !ok {
tableMap = make(map[common.DispatcherID]*scheduler.StateMachine[common.DispatcherID])
s.tableTasks[tableID] = tableMap
}
tableMap[dispatcherID] = stm
}
}

Expand All @@ -578,7 +606,13 @@ func (s *Scheduler) tryMoveTask(dispatcherID common.DispatcherID,
delete(m, dispatcherID)
}
delete(s.schemaTasks[stm.Inferior.(*ReplicaSet).SchemaID], dispatcherID)
delete(s.tableTasks, stm.Inferior.(*ReplicaSet).Span.TableID)
if len(s.schemaTasks[stm.Inferior.(*ReplicaSet).SchemaID]) == 0 {
delete(s.schemaTasks, stm.Inferior.(*ReplicaSet).SchemaID)
}
delete(s.tableTasks[stm.Inferior.(*ReplicaSet).Span.TableID], dispatcherID)
if len(s.tableTasks[stm.Inferior.(*ReplicaSet).Span.TableID]) == 0 {
delete(s.tableTasks, stm.Inferior.(*ReplicaSet).Span.TableID)
}
}
// keep node task map is updated
if modifyNodeMap && oldPrimary != stm.Primary {
Expand Down
Loading