Skip to content

Commit

Permalink
[cluster] Store shards in sorted form (#2890)
Browse files Browse the repository at this point in the history
  • Loading branch information
vdarulis authored Nov 13, 2020
1 parent b18c15e commit b306e81
Show file tree
Hide file tree
Showing 4 changed files with 337 additions and 76 deletions.
4 changes: 2 additions & 2 deletions src/cluster/placement/algo/sharded_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1230,8 +1230,8 @@ func verifyAllShardsInAvailableState(t *testing.T, p placement.Placement) {
for _, instance := range p.Instances() {
s := instance.Shards()
require.Equal(t, len(s.All()), len(s.ShardsForState(shard.Available)))
require.Nil(t, s.ShardsForState(shard.Initializing))
require.Nil(t, s.ShardsForState(shard.Leaving))
require.Empty(t, s.ShardsForState(shard.Initializing))
require.Empty(t, s.ShardsForState(shard.Leaving))
}
}

Expand Down
175 changes: 108 additions & 67 deletions src/cluster/shard/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,17 +68,19 @@ func (s State) Proto() (placementpb.ShardState, error) {
func NewShard(id uint32) Shard { return &shard{id: id, state: Unknown} }

// NewShardFromProto create a new shard from proto.
func NewShardFromProto(shard *placementpb.Shard) (Shard, error) {
state, err := NewShardStateFromProto(shard.State)
func NewShardFromProto(spb *placementpb.Shard) (Shard, error) {
state, err := NewShardStateFromProto(spb.State)
if err != nil {
return nil, err
}

return NewShard(shard.Id).
SetState(state).
SetSourceID(shard.SourceId).
SetCutoverNanos(shard.CutoverNanos).
SetCutoffNanos(shard.CutoffNanos), nil
return &shard{
id: spb.Id,
state: state,
sourceID: spb.SourceId,
cutoverNanos: spb.CutoverNanos,
cutoffNanos: spb.CutoffNanos,
}, nil
}

type shard struct {
Expand Down Expand Up @@ -146,26 +148,26 @@ func (s *shard) Equals(other Shard) bool {
}

func (s *shard) Proto() (*placementpb.Shard, error) {
ss, err := s.State().Proto()
ss, err := s.state.Proto()
if err != nil {
return nil, err
}

return &placementpb.Shard{
Id: s.ID(),
Id: s.id,
State: ss,
SourceId: s.SourceID(),
SourceId: s.sourceID,
CutoverNanos: s.cutoverNanos,
CutoffNanos: s.cutoffNanos,
}, nil
}

func (s *shard) Clone() Shard {
return NewShard(s.ID()).
SetState(s.State()).
SetSourceID(s.SourceID()).
SetCutoverNanos(s.CutoverNanos()).
SetCutoffNanos(s.CutoffNanos())
if s == nil {
return nil
}
clone := *s
return &clone
}

// SortableShardsByIDAsc are sortable shards by ID in ascending order
Expand All @@ -188,11 +190,23 @@ func (s SortableIDsAsc) Less(i, j int) bool {

// NewShards creates a new instance of Shards
func NewShards(ss []Shard) Shards {
// deduplicate first, last one wins
shardMap := make(map[uint32]Shard, len(ss))
for _, s := range ss {
shardMap[s.ID()] = s
}
return shards{shardsMap: shardMap}

shrds := make([]Shard, 0, len(shardMap))
for _, s := range shardMap {
shrds = append(shrds, s)
}

sort.Sort(SortableShardsByIDAsc(shrds))

return &shards{
shards: shrds,
shardMap: shardMap,
}
}

// NewShardsFromProto creates a new set of shards from proto.
Expand All @@ -209,77 +223,106 @@ func NewShardsFromProto(shards []*placementpb.Shard) (Shards, error) {
}

type shards struct {
shardsMap map[uint32]Shard
shards []Shard
shardMap map[uint32]Shard
}

func (ss shards) All() []Shard {
shards := make([]Shard, 0, len(ss.shardsMap))
for _, shard := range ss.shardsMap {
shards = append(shards, shard)
}
sort.Sort(SortableShardsByIDAsc(shards))
func (ss *shards) All() []Shard {
shards := make([]Shard, len(ss.shards))
copy(shards, ss.shards)

return shards
}

func (ss shards) AllIDs() []uint32 {
ids := make([]uint32, 0, len(ss.shardsMap))
for _, shard := range ss.shardsMap {
ids = append(ids, shard.ID())
func (ss *shards) AllIDs() []uint32 {
shardIDs := make([]uint32, 0, len(ss.shards))
for _, shrd := range ss.shards {
shardIDs = append(shardIDs, shrd.ID())
}
sort.Sort(SortableIDsAsc(ids))
return ids

return shardIDs
}

func (ss shards) NumShards() int {
return len(ss.shardsMap)
func (ss *shards) NumShards() int {
return len(ss.shards)
}

func (ss shards) Shard(id uint32) (Shard, bool) {
shard, ok := ss.shardsMap[id]
return shard, ok
func (ss *shards) Shard(id uint32) (Shard, bool) {
shard, ok := ss.shardMap[id]
if !ok {
return nil, false
}

return shard, true
}

func (ss shards) Add(shard Shard) {
ss.shardsMap[shard.ID()] = shard
func (ss *shards) Add(shard Shard) {
id := shard.ID()
// we keep a sorted slice of shards, do a binary search to either find the index
// of an existing shard for replacement, or the target index position
i := sort.Search(len(ss.shards), func(i int) bool { return ss.shards[i].ID() >= id })
if i < len(ss.shards) && ss.shards[i].ID() == id {
ss.shards[i] = shard
ss.shardMap[id] = shard
return
}

// extend the sorted shard slice by 1
ss.shards = append(ss.shards, shard)
ss.shardMap[id] = shard

// target position was at the end, so extending with the new shard was enough
if i >= len(ss.shards)-1 {
return
}

// if not, copy over all slice elements shifted by 1 and overwrite data at index
copy(ss.shards[i+1:], ss.shards[i:])
ss.shards[i] = shard
}

func (ss shards) Remove(shard uint32) {
delete(ss.shardsMap, shard)
func (ss *shards) Remove(id uint32) {
// we keep a sorted slice of shards, do a binary search to find the index
i := sort.Search(len(ss.shards), func(i int) bool { return ss.shards[i].ID() >= id })
if i < len(ss.shards) && ss.shards[i].ID() == id {
delete(ss.shardMap, id)
// shift all other elements back after removal
ss.shards = ss.shards[:i+copy(ss.shards[i:], ss.shards[i+1:])]
}
}

func (ss shards) Contains(shard uint32) bool {
_, ok := ss.shardsMap[shard]
func (ss *shards) Contains(shard uint32) bool {
_, ok := ss.shardMap[shard]
return ok
}

func (ss shards) NumShardsForState(state State) int {
func (ss *shards) NumShardsForState(state State) int {
count := 0
for _, s := range ss.shardsMap {
for _, s := range ss.shards {
if s.State() == state {
count++
}
}
return count
}

func (ss shards) ShardsForState(state State) []Shard {
var r []Shard
for _, s := range ss.shardsMap {
func (ss *shards) ShardsForState(state State) []Shard {
r := make([]Shard, 0, len(ss.shards))
for _, s := range ss.shards {
if s.State() == state {
r = append(r, s)
}
}
return r
}

func (ss shards) Equals(other Shards) bool {
shards := ss.All()
otherShards := other.All()
if len(shards) != len(otherShards) {
func (ss *shards) Equals(other Shards) bool {
if len(ss.shards) != other.NumShards() {
return false
}

for i, shard := range shards {
otherShards := other.All()
for i, shard := range ss.shards {
otherShard := otherShards[i]
if !shard.Equals(otherShard) {
return false
Expand All @@ -288,7 +331,7 @@ func (ss shards) Equals(other Shards) bool {
return true
}

func (ss shards) String() string {
func (ss *shards) String() string {
var strs []string
for _, state := range validStates() {
ids := NewShards(ss.ShardsForState(state)).AllIDs()
Expand All @@ -298,10 +341,9 @@ func (ss shards) String() string {
return fmt.Sprintf("[%s]", strings.Join(strs, ", "))
}

func (ss shards) Proto() ([]*placementpb.Shard, error) {
res := make([]*placementpb.Shard, 0, len(ss.shardsMap))
// All() returns the shards in ID ascending order.
for _, shard := range ss.All() {
func (ss *shards) Proto() ([]*placementpb.Shard, error) {
res := make([]*placementpb.Shard, 0, len(ss.shards))
for _, shard := range ss.shards {
sp, err := shard.Proto()
if err != nil {
return nil, err
Expand All @@ -312,22 +354,21 @@ func (ss shards) Proto() ([]*placementpb.Shard, error) {
return res, nil
}

func (ss shards) Clone() Shards {
shards := make([]Shard, ss.NumShards())
for i, shard := range ss.All() {
shards[i] = shard.Clone()
func (ss *shards) Clone() Shards {
shrds := make([]Shard, 0, len(ss.shards))
shardMap := make(map[uint32]Shard, len(ss.shards))

for _, shrd := range ss.shards {
shrds = append(shrds, shrd.Clone())
shardMap[shrd.ID()] = shrd
}

return NewShards(shards)
return &shards{
shards: shrds,
shardMap: shardMap,
}
}

// SortableShardProtosByIDAsc sorts shard protos by their ids in ascending order.
type SortableShardProtosByIDAsc []*placementpb.Shard

func (su SortableShardProtosByIDAsc) Len() int { return len(su) }
func (su SortableShardProtosByIDAsc) Less(i, j int) bool { return su[i].Id < su[j].Id }
func (su SortableShardProtosByIDAsc) Swap(i, j int) { su[i], su[j] = su[j], su[i] }

// validStates returns all the valid states.
func validStates() []State {
return []State{
Expand Down
Loading

0 comments on commit b306e81

Please sign in to comment.