Skip to content

Commit

Permalink
fix: do not allow shard creation to create overlapping shards (#22604)
Browse files Browse the repository at this point in the history
  • Loading branch information
lesam authored Oct 1, 2021
1 parent ea018df commit 0a6f562
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 6 deletions.
44 changes: 38 additions & 6 deletions v1/services/meta/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,16 +369,48 @@ func (data *Data) CreateShardGroup(database, policy string, timestamp time.Time,
return nil
}

startTime := timestamp.Truncate(rpi.ShardGroupDuration).UTC()
endTime := startTime.Add(rpi.ShardGroupDuration).UTC()
if endTime.After(time.Unix(0, models.MaxNanoTime)) {
// Shard group range is [start, end) so add one to the max time.
endTime = time.Unix(0, models.MaxNanoTime+1)
}

for i := range rpi.ShardGroups {
if rpi.ShardGroups[i].Deleted() {
continue
}
startI := rpi.ShardGroups[i].StartTime
endI := rpi.ShardGroups[i].EndTime
if rpi.ShardGroups[i].Truncated() {
endI = rpi.ShardGroups[i].TruncatedAt
}

// shard_i covers range [start_i, end_i)
// We want the largest range [startTime, endTime) such that all of the following hold:
// startTime <= timestamp < endTime
// for all i, not { start_i < endTime && startTime < end_i }
// Assume the above conditions are true for shards index < i, we want to modify startTime,endTime so they are true
// also for shard_i

// It must be the case that either endI <= timestamp || timestamp < startI, because otherwise:
// startI <= timestamp < endI means timestamp is contained in shard I
if !timestamp.Before(endI) && endI.After(startTime) {
// startTime < endI <= timestamp
startTime = endI
}
if startI.After(timestamp) && startI.Before(endTime) {
// timestamp < startI < endTime
endTime = startI
}
}

// Create the shard group.
data.MaxShardGroupID++
sgi := ShardGroupInfo{}
sgi.ID = data.MaxShardGroupID
sgi.StartTime = timestamp.Truncate(rpi.ShardGroupDuration).UTC()
sgi.EndTime = sgi.StartTime.Add(rpi.ShardGroupDuration).UTC()
if sgi.EndTime.After(time.Unix(0, models.MaxNanoTime)) {
// Shard group range is [start, end) so add one to the max time.
sgi.EndTime = time.Unix(0, models.MaxNanoTime+1)
}
sgi.StartTime = startTime
sgi.EndTime = endTime

if len(shards) > 0 {
sgi.Shards = make([]ShardInfo, len(shards))
Expand Down
55 changes: 55 additions & 0 deletions v1/services/meta/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,61 @@ func TestData_TruncateShardGroups(t *testing.T) {
if sg1.TruncatedAt != sg1.StartTime {
t.Fatalf("Incorrect truncation of future shard group. Expected %v, got %v", sg1.StartTime, sg1.TruncatedAt)
}

groups := data.Databases[0].RetentionPolicies[0].ShardGroups
assert.Equal(t, 2, len(groups))
assert.Equal(t, "1970-01-01 00:00:00 +0000 UTC", groups[0].StartTime.String())
assert.Equal(t, "1970-01-02 00:00:00 +0000 UTC", groups[0].EndTime.String())
assert.Equal(t, "1970-01-01 23:59:00 +0000 UTC", groups[0].TruncatedAt.String())

assert.Equal(t, "1970-01-02 00:00:00 +0000 UTC", groups[1].StartTime.String())
assert.Equal(t, "1970-01-03 00:00:00 +0000 UTC", groups[1].EndTime.String())
assert.Equal(t, "1970-01-02 00:00:00 +0000 UTC", groups[1].TruncatedAt.String())

// Create some more shard groups and validate there is no overlap
// Add a shard starting at sg0's truncation time, until 01/02
must(data.CreateShardGroup("db", "rp", sg0.EndTime.Add(-time.Second)))
// Add a shard 01/02 - 01/03 (since sg1 is fully truncated)
must(data.CreateShardGroup("db", "rp", sg1.EndTime.Add(-time.Second)))
// Add a shard 01/06 - 01/07
must(data.CreateShardGroup("db", "rp", sg1.EndTime.Add(3*rp.ShardGroupDuration)))
newDuration := 10 * rp.ShardGroupDuration
data.UpdateRetentionPolicy("db", "rp", &meta.RetentionPolicyUpdate{
Name: nil,
Duration: nil,
ReplicaN: nil,
ShardGroupDuration: &newDuration,
}, true)
// Add a shard 01/03 - 01/06
must(data.CreateShardGroup("db", "rp", sg1.EndTime.Add(1*rp.ShardGroupDuration)))
// Add a shard 01/07 - 01/09
must(data.CreateShardGroup("db", "rp", sg1.EndTime.Add(4*rp.ShardGroupDuration)))
// Add a shard 01/09 - 01/19
must(data.CreateShardGroup("db", "rp", sg1.EndTime.Add(10*rp.ShardGroupDuration)))
// No additional shard added
must(data.CreateShardGroup("db", "rp", sg1.EndTime.Add(11*rp.ShardGroupDuration)))

groups = data.Databases[0].RetentionPolicies[0].ShardGroups
assert.Equal(t, 8, len(groups))

expectTimes := []struct {
start, end, truncated string
}{
{"1970-01-01 00:00:00 +0000 UTC", "1970-01-02 00:00:00 +0000 UTC", "1970-01-01 23:59:00 +0000 UTC"},
{"1970-01-01 23:59:00 +0000 UTC", "1970-01-02 00:00:00 +0000 UTC", "0001-01-01 00:00:00 +0000 UTC"},
{"1970-01-02 00:00:00 +0000 UTC", "1970-01-03 00:00:00 +0000 UTC", "1970-01-02 00:00:00 +0000 UTC"},
{"1970-01-02 00:00:00 +0000 UTC", "1970-01-03 00:00:00 +0000 UTC", "0001-01-01 00:00:00 +0000 UTC"},
{"1970-01-03 00:00:00 +0000 UTC", "1970-01-06 00:00:00 +0000 UTC", "0001-01-01 00:00:00 +0000 UTC"},
{"1970-01-06 00:00:00 +0000 UTC", "1970-01-07 00:00:00 +0000 UTC", "0001-01-01 00:00:00 +0000 UTC"},
{"1970-01-07 00:00:00 +0000 UTC", "1970-01-09 00:00:00 +0000 UTC", "0001-01-01 00:00:00 +0000 UTC"},
{"1970-01-09 00:00:00 +0000 UTC", "1970-01-19 00:00:00 +0000 UTC", "0001-01-01 00:00:00 +0000 UTC"},
}

for i := range expectTimes {
assert.Equal(t, expectTimes[i].start, groups[i].StartTime.String(), "start time %d", i)
assert.Equal(t, expectTimes[i].end, groups[i].EndTime.String(), "end time %d", i)
assert.Equal(t, expectTimes[i].truncated, groups[i].TruncatedAt.String(), "truncate time %d", i)
}
}

func TestUserInfo_AuthorizeDatabase(t *testing.T) {
Expand Down

0 comments on commit 0a6f562

Please sign in to comment.