Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
improve coverage for syncer/sharding_group.go to 82.4% (#1069) (#1082)
Browse files Browse the repository at this point in the history
Co-authored-by: lance6716 <lance6716@gmail.com>
  • Loading branch information
ti-srebot and lance6716 authored Sep 23, 2020
1 parent 2da53e6 commit 55fcd6d
Show file tree
Hide file tree
Showing 2 changed files with 307 additions and 48 deletions.
46 changes: 13 additions & 33 deletions syncer/sharding_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
package syncer

/*
* sharding DDL sync for Syncer
* sharding DDL sync for Syncer (pessimism)
*
* assumption:
* all tables in a sharding group execute same DDLs in the same order
Expand Down Expand Up @@ -139,7 +139,7 @@ func NewShardingGroup(sourceID, shardMetaSchema, shardMetaTable string, sources
// * add a new table to exists sharding group
// * add new table(s) to parent database's sharding group
// if group is in sequence sharding, return error directly
// othereise add it in source, set it false and increment remain
// otherwise add it in source, set it false and increment remain
func (sg *ShardingGroup) Merge(sources []string) (bool, bool, int, error) {
sg.Lock()
defer sg.Unlock()
Expand Down Expand Up @@ -286,14 +286,6 @@ func (sg *ShardingGroup) Tables() [][]string {
return tables
}

// IsUnresolved return whether it's unresolved
func (sg *ShardingGroup) IsUnresolved() bool {
sg.RLock()
defer sg.RUnlock()

return sg.remain != len(sg.sources)
}

// UnresolvedTables returns all source tables' <schema, table> pair if is unresolved, else returns nil
func (sg *ShardingGroup) UnresolvedTables() [][]string {
sg.RLock()
Expand Down Expand Up @@ -348,19 +340,12 @@ func (sg *ShardingGroup) String() string {
return fmt.Sprintf("IsSchemaOnly:%v remain:%d, sources:%+v", sg.IsSchemaOnly, sg.remain, sg.sources)
}

// InSequenceSharding returns whether this sharding group is in sequence sharding
func (sg *ShardingGroup) InSequenceSharding() bool {
sg.RLock()
defer sg.RUnlock()
return sg.meta.InSequenceSharding()
}

// ResolveShardingDDL resolves sharding DDL in sharding group
func (sg *ShardingGroup) ResolveShardingDDL() bool {
sg.Lock()
defer sg.Unlock()
reset := sg.meta.ResolveShardingDDL()
// reset sharding group after DDL is exexuted
// reset sharding group after DDL is executed
return reset
}

Expand Down Expand Up @@ -500,11 +485,17 @@ func (k *ShardingGroupKeeper) LeaveGroup(targetSchema, targetTable string, sourc
if err := group.Leave(sources); err != nil {
return err
}
if len(group.sources) == 0 {
delete(k.groups, targetTableID)
}
}
if schemaGroup, ok := k.groups[schemaID]; ok {
if err := schemaGroup.Leave(sources); err != nil {
return err
}
if len(schemaGroup.sources) == 0 {
delete(k.groups, schemaID)
}
}
return nil
}
Expand Down Expand Up @@ -537,7 +528,7 @@ func (k *ShardingGroupKeeper) TrySync(
return true, group, synced, active, remain, err
}

// InSyncing checks whether the source is in sharding syncing
// InSyncing checks whether the source is in sharding syncing, that is to say not before active DDL
func (k *ShardingGroupKeeper) InSyncing(targetSchema, targetTable, source string, location binlog.Location) bool {
group := k.Group(targetSchema, targetTable)
if group == nil {
Expand All @@ -547,7 +538,7 @@ func (k *ShardingGroupKeeper) InSyncing(targetSchema, targetTable, source string
}

// UnresolvedTables returns
// all `target-schema.target-table` that has unresolved sharding DDL
// all `target-schema.target-table` that has unresolved sharding DDL,
// all source tables which with DDLs are un-resolved
// NOTE: this func only ensure the returned tables are current un-resolved
// if passing the returned tables to other func (like checkpoint),
Expand All @@ -560,6 +551,7 @@ func (k *ShardingGroupKeeper) UnresolvedTables() (map[string]bool, [][]string) {
for id, group := range k.groups {
unresolved := group.UnresolvedTables()
if len(unresolved) > 0 {
// TODO: no need to return bool which indicates it has unresolved tables, because nowhere need it
ids[id] = true
tables = append(tables, unresolved...)
}
Expand Down Expand Up @@ -633,18 +625,6 @@ func (k *ShardingGroupKeeper) UnresolvedGroups() []*pb.ShardingGroup {
return groups
}

// InSequenceSharding returns whether exists sharding group in unfinished sequence sharding
func (k *ShardingGroupKeeper) InSequenceSharding() bool {
k.RLock()
defer k.RUnlock()
for _, group := range k.groups {
if group.InSequenceSharding() {
return true
}
}
return false
}

// ResolveShardingDDL resolves one sharding DDL in specific group
func (k *ShardingGroupKeeper) ResolveShardingDDL(targetSchema, targetTable string) (bool, error) {
group := k.Group(targetSchema, targetTable)
Expand All @@ -666,7 +646,7 @@ func (k *ShardingGroupKeeper) ActiveDDLFirstLocation(targetSchema, targetTable s
return binlog.Location{}, terror.ErrSyncUnitShardingGroupNotFound.Generate(targetSchema, targetTable)
}

// PrepareFlushSQLs returns all sharding meta flushed SQLs execpt for given table IDs
// PrepareFlushSQLs returns all sharding meta flushed SQLs except for given table IDs
func (k *ShardingGroupKeeper) PrepareFlushSQLs(exceptTableIDs map[string]bool) ([]string, [][]interface{}) {
k.RLock()
defer k.RUnlock()
Expand Down
Loading

0 comments on commit 55fcd6d

Please sign in to comment.