Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[coordinator] Validate placement on set placement endpoint unless force set #2922

Merged
merged 2 commits into from
Nov 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 54 additions & 1 deletion src/cluster/placement/placement.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/m3db/m3/src/cluster/generated/proto/placementpb"
"github.com/m3db/m3/src/cluster/shard"
xerrors "github.com/m3db/m3/src/x/errors"
)

const (
Expand Down Expand Up @@ -297,6 +298,13 @@ func (placements Placements) ActiveIndex(timeNanos int64) int {
// - There is one Initializing shard for each Leaving shard.
// - The instances with same shard_set_id owns the same shards.
func Validate(p Placement) error {
if err := validate(p); err != nil {
return xerrors.NewInvalidParamsError(err)
}
return nil
}

func validate(p Placement) error {
if p.IsMirrored() && !p.IsSharded() {
return errMirrorNotSharded
}
Expand All @@ -311,6 +319,7 @@ func Validate(p Placement) error {
totalLeaving := 0
totalInit := 0
totalInitWithSourceID := 0
instancesLeavingShardsWithMatchingInitShards := make(map[string]map[uint32]string)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit; never uses the string value, maybe this should be a make(map[string]map[uint32]struct{})?

Copy link
Collaborator Author

@robskillington robskillington Nov 18, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah believe we do to print out the error of which host previously had claimed that leaving shard to match it's own initializing shard:

					match := matches[s.ID()]
					if match != "" {
						return fmt.Errorf(
							"instance %s has initializing shard %d with "+
								"source ID %s but leaving instance has shard already matched by %s",
							instance.ID(), s.ID(), sourceID, match)
					}

maxShardSetID := p.MaxShardSetID()
instancesByShardSetID := make(map[uint32]Instance, p.NumInstances())
for _, instance := range p.Instances() {
Expand Down Expand Up @@ -340,8 +349,52 @@ func Validate(p Placement) error {
totalInit++
shardCountMap[s.ID()] = count + 1
totalCapacity++
if s.SourceID() != "" {
if sourceID := s.SourceID(); sourceID != "" {
totalInitWithSourceID++

// Check the instance.
leaving, ok := p.Instance(sourceID)
if !ok {
return fmt.Errorf(
"instance %s has initializing shard %d with "+
"source ID %s but no such instance in placement",
instance.ID(), s.ID(), sourceID)
}

// Check has leaving shard.
leavingShard, ok := leaving.Shards().Shard(s.ID())
if !ok {
return fmt.Errorf(
"instance %s has initializing shard %d with "+
"source ID %s but leaving instance has no such shard",
instance.ID(), s.ID(), sourceID)
}

// Check the shard is leaving.
if state := leavingShard.State(); state != shard.Leaving {
return fmt.Errorf(
"instance %s has initializing shard %d with "+
"source ID %s but leaving instance has shard with state %s",
instance.ID(), s.ID(), sourceID, state.String())
}

// Make sure does not get double matched.
matches, ok := instancesLeavingShardsWithMatchingInitShards[sourceID]
if !ok {
matches = make(map[uint32]string)
instancesLeavingShardsWithMatchingInitShards[sourceID] = matches
}

match, ok := matches[s.ID()]
if ok {
return fmt.Errorf(
"instance %s has initializing shard %d with "+
"source ID %s but leaving instance has shard already matched by %s",
instance.ID(), s.ID(), sourceID, match)
}

// Track that it's matched.
matches[s.ID()] = instance.ID()
}
case shard.Leaving:
totalLeaving++
Expand Down
100 changes: 92 additions & 8 deletions src/cluster/placement/placement_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/m3db/m3/src/cluster/shard"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestPlacement(t *testing.T) {
Expand Down Expand Up @@ -139,7 +140,7 @@ func TestMismatchShards(t *testing.T) {

// mismatch shards
p := NewPlacement().SetInstances([]Instance{i1, i2}).SetShards([]uint32{1, 2, 3}).SetReplicaFactor(1)
assert.Error(t, Validate(p))
require.Error(t, Validate(p))
}

func TestNonSharded(t *testing.T) {
Expand All @@ -155,8 +156,8 @@ func TestNonSharded(t *testing.T) {
func TestValidateMirrorButNotSharded(t *testing.T) {
p := NewPlacement().SetIsMirrored(true)
err := Validate(p)
assert.Error(t, err)
assert.Equal(t, errMirrorNotSharded, err)
require.Error(t, err)
assert.Equal(t, errMirrorNotSharded.Error(), err.Error())
}

func TestValidateMissingShard(t *testing.T) {
Expand All @@ -170,7 +171,7 @@ func TestValidateMissingShard(t *testing.T) {
ids := []uint32{1, 2}
p := NewPlacement().SetInstances([]Instance{i1, i2}).SetShards(ids).SetReplicaFactor(2).SetIsSharded(true)
err := Validate(p)
assert.Error(t, err)
require.Error(t, err)
assert.Equal(t, "invalid placement, the total available shards in the placement is 3, expecting 4", err.Error())
}

Expand All @@ -189,8 +190,9 @@ func TestValidateUnexpectedShard(t *testing.T) {
SetReplicaFactor(2).
SetIsSharded(true)

assert.Error(t, Validate(p))
assert.Equal(t, errUnexpectedShards, Validate(p))
err := Validate(p)
require.Error(t, err)
assert.Equal(t, errUnexpectedShards.Error(), err.Error())
}

func TestValidateDuplicatedShards(t *testing.T) {
Expand All @@ -208,8 +210,9 @@ func TestValidateDuplicatedShards(t *testing.T) {
SetInstances([]Instance{i1, i2}).
SetShards([]uint32{2, 3, 4, 4, 5, 6}).
SetReplicaFactor(1)
assert.Error(t, Validate(p))
assert.Equal(t, errDuplicatedShards, Validate(p))
err := Validate(p)
require.Error(t, err)
assert.Equal(t, errDuplicatedShards.Error(), err.Error())
}

func TestValidateWrongReplicaForSomeShards(t *testing.T) {
Expand Down Expand Up @@ -276,6 +279,87 @@ func TestValidateLeavingNotMatchInitializingWithSourceID(t *testing.T) {
assert.Equal(t, err.Error(), "invalid placement, 2 shards in Leaving state, not equal 1 in Initializing state with source id")
}

func TestValidateLeavingAndInitializingWithSourceIDMissing(t *testing.T) {
i1 := NewEmptyInstance("i1", "r1", "z1", "endpoint", 1)
i1.Shards().Add(shard.NewShard(1).SetState(shard.Leaving))
i1.Shards().Add(shard.NewShard(2).SetState(shard.Leaving))
i1.Shards().Add(shard.NewShard(3).SetState(shard.Available))

i2 := NewEmptyInstance("i2", "r2", "z1", "endpoint", 1)
i2.Shards().Add(shard.NewShard(1).SetState(shard.Initializing).SetSourceID("unknown"))
i2.Shards().Add(shard.NewShard(2).SetState(shard.Initializing).SetSourceID("i1"))

p := NewPlacement().
SetInstances([]Instance{i1, i2}).
SetShards([]uint32{1, 2, 3}).
SetReplicaFactor(1).
SetIsSharded(true)
err := Validate(p)
require.Error(t, err)
assert.Equal(t, err.Error(), "instance i2 has initializing shard 1 with source ID unknown but no such instance in placement")
}

func TestValidateLeavingAndInitializingWithSourceIDNoSuchShard(t *testing.T) {
i1 := NewEmptyInstance("i1", "r1", "z1", "endpoint", 1)
i1.Shards().Add(shard.NewShard(1).SetState(shard.Available))
i1.Shards().Add(shard.NewShard(2).SetState(shard.Leaving))

i2 := NewEmptyInstance("i2", "r2", "z1", "endpoint", 1)
i2.Shards().Add(shard.NewShard(2).SetState(shard.Initializing).SetSourceID("i1"))
i2.Shards().Add(shard.NewShard(3).SetState(shard.Initializing).SetSourceID("i1"))

p := NewPlacement().
SetInstances([]Instance{i1, i2}).
SetShards([]uint32{1, 2, 3}).
SetReplicaFactor(1).
SetIsSharded(true)
err := Validate(p)
require.Error(t, err)
assert.Equal(t, err.Error(), "instance i2 has initializing shard 3 with source ID i1 but leaving instance has no such shard")
}

func TestValidateLeavingAndInitializingWithSourceIDShardNotLeaving(t *testing.T) {
i1 := NewEmptyInstance("i1", "r1", "z1", "endpoint", 1)
i1.Shards().Add(shard.NewShard(1).SetState(shard.Available))
i1.Shards().Add(shard.NewShard(2).SetState(shard.Leaving))
i1.Shards().Add(shard.NewShard(3).SetState(shard.Available))

i2 := NewEmptyInstance("i2", "r2", "z1", "endpoint", 1)
i2.Shards().Add(shard.NewShard(1).SetState(shard.Initializing).SetSourceID("i1"))
i2.Shards().Add(shard.NewShard(2).SetState(shard.Initializing).SetSourceID("i1"))

p := NewPlacement().
SetInstances([]Instance{i1, i2}).
SetShards([]uint32{1, 2, 3}).
SetReplicaFactor(1).
SetIsSharded(true)
err := Validate(p)
require.Error(t, err)
assert.Equal(t, err.Error(), "instance i2 has initializing shard 1 with source ID i1 but leaving instance has shard with state Available")
}

func TestValidateLeavingAndInitializingWithSourceIDDoubleMatched(t *testing.T) {
i1 := NewEmptyInstance("i1", "r1", "z1", "endpoint", 1)
i1.Shards().Add(shard.NewShard(1).SetState(shard.Available))
i1.Shards().Add(shard.NewShard(2).SetState(shard.Leaving))
i1.Shards().Add(shard.NewShard(3).SetState(shard.Available))

i2 := NewEmptyInstance("i2", "r2", "z1", "endpoint", 1)
i2.Shards().Add(shard.NewShard(2).SetState(shard.Initializing).SetSourceID("i1"))

i3 := NewEmptyInstance("i3", "r2", "z1", "endpoint", 1)
Comment on lines +342 to +350
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: repeated code may tick off the linter unfortunately, can we just add stuff to the placement options and attempt validation in one function? Validates 1 test per function/path, but probably easier to maintain and see what's going on

i3.Shards().Add(shard.NewShard(2).SetState(shard.Initializing).SetSourceID("i1"))

p := NewPlacement().
SetInstances([]Instance{i1, i2, i3}).
SetShards([]uint32{1, 2, 3}).
SetReplicaFactor(1).
SetIsSharded(true)
err := Validate(p)
require.Error(t, err)
assert.Equal(t, err.Error(), "instance i3 has initializing shard 2 with source ID i1 but leaving instance has shard already matched by i2")
}

func TestValidateNoEndpoint(t *testing.T) {
i1 := NewEmptyInstance("i1", "r1", "z1", "", 1)
i1.Shards().Add(shard.NewShard(1).SetState(shard.Available))
Expand Down
11 changes: 11 additions & 0 deletions src/query/api/v1/handler/placement/set.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package placement

import (
"fmt"
"net/http"
"path"
"time"
Expand Down Expand Up @@ -116,6 +117,16 @@ func (h *SetHandler) ServeHTTP(
return
}

if err := placement.Validate(newPlacement); err != nil {
if !req.Force {
logger.Error("unable to validate new placement", zap.Error(err))
xhttp.WriteError(w,
xerrors.NewRenamedError(err, fmt.Errorf("unable to validate new placement: %w", err)))
return
}
logger.Warn("unable to validate new placement, continuing with force", zap.Error(err))
}

var (
placementProto = req.Placement
dryRun = !req.Confirm
Expand Down
115 changes: 115 additions & 0 deletions src/query/api/v1/handler/placement/set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,3 +219,118 @@ func TestPlacementSetHandler_NewPlacement(t *testing.T) {
assert.Equal(t, 0, newPlacement.Version())
})
}

func TestPlacementSetHandler_ValidatePlacementWithoutForce(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

mockClient, mockPlacementService := SetupPlacementTest(t, ctrl)
handlerOpts, err := NewHandlerOptions(
mockClient, config.Configuration{}, nil, instrument.NewOptions())
require.NoError(t, err)
handler := NewSetHandler(handlerOpts)

badReqProto := &admin.PlacementSetRequest{
Placement: &placementpb.Placement{
Instances: map[string]*placementpb.Instance{
"host1": {
Id: "host1",
IsolationGroup: "rack1",
Zone: "test",
Weight: 1,
Endpoint: "http://host1:1234",
Hostname: "host1",
Port: 1234,
Shards: []*placementpb.Shard{
&placementpb.Shard{
Id: 0,
State: placementpb.ShardState_AVAILABLE,
},
&placementpb.Shard{
Id: 1,
State: placementpb.ShardState_AVAILABLE,
},
},
},
"host2": {
Id: "host2",
IsolationGroup: "rack1",
Zone: "test",
Weight: 1,
Endpoint: "http://host2:1234",
Hostname: "host2",
Port: 1234,
Shards: []*placementpb.Shard{
&placementpb.Shard{
Id: 0,
State: placementpb.ShardState_INITIALIZING,
SourceId: "host1",
},
&placementpb.Shard{
Id: 1,
State: placementpb.ShardState_INITIALIZING,
SourceId: "host1",
},
},
},
},
IsSharded: true,
NumShards: 2,
ReplicaFactor: 2,
},
Version: 0,
Confirm: true,
}

reqBody, err := (&jsonpb.Marshaler{}).MarshalToString(badReqProto)
require.NoError(t, err)

req := httptest.NewRequest(SetHTTPMethod, M3DBSetURL, strings.NewReader(reqBody))
require.NotNil(t, req)

existingPlacementProto := &placementpb.Placement{
Instances: map[string]*placementpb.Instance{
"host1": {
Id: "host1",
IsolationGroup: "rack1",
Zone: "test",
Weight: 1,
Endpoint: "http://host1:1234",
Hostname: "host1",
Port: 1234,
Shards: []*placementpb.Shard{
&placementpb.Shard{
Id: 0,
State: placementpb.ShardState_AVAILABLE,
},
&placementpb.Shard{
Id: 1,
State: placementpb.ShardState_AVAILABLE,
},
},
},
},
IsSharded: true,
NumShards: 2,
ReplicaFactor: 1,
}

existingPlacement, err := placement.NewPlacementFromProto(existingPlacementProto)
require.NoError(t, err)

mockPlacementService.EXPECT().
Placement().
Return(existingPlacement, nil)

svcDefaults := handleroptions.ServiceNameAndDefaults{
ServiceName: handleroptions.M3DBServiceName,
}

w := httptest.NewRecorder()
handler.ServeHTTP(svcDefaults, w, req)
resp := w.Result()
body := w.Body.String()
assert.Equal(t, http.StatusBadRequest, resp.StatusCode)
assert.True(t, strings.Contains(body, "unable to validate new placement"))
assert.True(t, strings.Contains(body, "instance host2 has initializing shard 0 with source ID host1 but leaving instance has shard with state Available"))
}
Loading