diff --git a/src/cluster/placement/placement_mock.go b/src/cluster/placement/placement_mock.go index f297c8e606..1aff1e3b57 100644 --- a/src/cluster/placement/placement_mock.go +++ b/src/cluster/placement/placement_mock.go @@ -1134,6 +1134,20 @@ func (mr *MockActiveStagedPlacementMockRecorder) ActivePlacement() *gomock.Call return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ActivePlacement", reflect.TypeOf((*MockActiveStagedPlacement)(nil).ActivePlacement)) } +// Version mocks base method +func (m *MockActiveStagedPlacement) Version() int { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Version") + ret0, _ := ret[0].(int) + return ret0 +} + +// Version indicates an expected call of Version +func (mr *MockActiveStagedPlacementMockRecorder) Version() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Version", reflect.TypeOf((*MockActiveStagedPlacement)(nil).Version)) +} + // Close mocks base method func (m *MockActiveStagedPlacement) Close() error { m.ctrl.T.Helper() diff --git a/src/cluster/placement/staged_placement.go b/src/cluster/placement/staged_placement.go index ae4114ca0c..bf7b5df6a4 100644 --- a/src/cluster/placement/staged_placement.go +++ b/src/cluster/placement/staged_placement.go @@ -39,6 +39,7 @@ type activeStagedPlacement struct { sync.RWMutex placements Placements + version int nowFn clock.NowFn onPlacementsAddedFn OnPlacementsAddedFn onPlacementsRemovedFn OnPlacementsRemovedFn @@ -50,13 +51,15 @@ type activeStagedPlacement struct { func newActiveStagedPlacement( placements []Placement, + version int, opts ActiveStagedPlacementOptions, -) ActiveStagedPlacement { +) *activeStagedPlacement { if opts == nil { opts = NewActiveStagedPlacementOptions() } p := &activeStagedPlacement{ placements: placements, + version: version, nowFn: opts.ClockOptions().NowFn(), onPlacementsAddedFn: opts.OnPlacementsAddedFn(), onPlacementsRemovedFn: opts.OnPlacementsRemovedFn(), @@ -94,6 +97,12 @@ func (p *activeStagedPlacement) Close() error { return nil } +func (p *activeStagedPlacement) Version() int { + p.RLock() + defer p.RUnlock() + return p.version +} + func (p *activeStagedPlacement) onPlacementDone() { p.RUnlock() } func (p *activeStagedPlacement) activePlacementWithLock(timeNanos int64) (Placement, error) { @@ -174,9 +183,9 @@ func (sp *stagedPlacement) ActiveStagedPlacement(timeNanos int64) ActiveStagedPl idx-- } if idx < 0 { - return newActiveStagedPlacement(sp.placements, sp.opts) + return newActiveStagedPlacement(sp.placements, sp.version, sp.opts) } - return newActiveStagedPlacement(sp.placements[idx:], sp.opts) + return newActiveStagedPlacement(sp.placements[idx:], sp.version, sp.opts) } func (sp *stagedPlacement) Version() int { return sp.version } diff --git a/src/cluster/placement/staged_placement_test.go b/src/cluster/placement/staged_placement_test.go index c2438e8679..fae5633d58 100644 --- a/src/cluster/placement/staged_placement_test.go +++ b/src/cluster/placement/staged_placement_test.go @@ -296,7 +296,7 @@ func TestNewActiveStagedPlacement(t *testing.T) { } }, ) - ap := newActiveStagedPlacement(testActivePlacements, opts).(*activeStagedPlacement) + ap := newActiveStagedPlacement(testActivePlacements, 0, opts) require.Equal(t, len(testActivePlacements), len(allInstances)) require.Equal(t, len(testActivePlacements), len(ap.placements)) for i := 0; i < len(testActivePlacements); i++ { @@ -380,7 +380,7 @@ func TestActiveStagedPlacementCloseSuccess(t *testing.T) { removedInstances = append(removedInstances, placement.Instances()) } }) - p := newActiveStagedPlacement(testActivePlacements, opts) + p := newActiveStagedPlacement(testActivePlacements, 0, opts) require.NoError(t, p.Close()) require.Equal(t, 2, len(addedInstances)) require.Equal(t, 2, len(removedInstances)) @@ -452,9 +452,16 @@ func TestStagedPlacementNilProto(t *testing.T) { func TestStagedPlacementValidProto(t *testing.T) { sp, err := NewStagedPlacementFromProto(1, testStagedPlacementProto, NewActiveStagedPlacementOptions()) require.NoError(t, err) + pss := sp.(*stagedPlacement) require.Equal(t, 1, pss.Version()) + require.Equal(t, 1, pss.ActiveStagedPlacement(0).Version()) + + pss.SetVersion(42) + require.Equal(t, 42, pss.ActiveStagedPlacement(0).Version()) + require.Equal(t, len(pss.placements), len(testStagedPlacementProto.Snapshots)) + for i := 0; i < len(testStagedPlacementProto.Snapshots); i++ { validateSnapshot(t, testActivePlacements[i], pss.placements[i]) } diff --git a/src/cluster/placement/staged_placement_watcher_test.go b/src/cluster/placement/staged_placement_watcher_test.go index e6b86b07cc..bed04b105f 100644 --- a/src/cluster/placement/staged_placement_watcher_test.go +++ b/src/cluster/placement/staged_placement_watcher_test.go @@ -169,3 +169,5 @@ func (mp *mockPlacement) ActivePlacement() (Placement, DoneFn, error) { } func (mp *mockPlacement) Close() error { return mp.closeFn() } + +func (mp *mockPlacement) Version() int { return 0 } diff --git a/src/cluster/placement/types.go b/src/cluster/placement/types.go index 73b6edc62b..29bef15dcf 100644 --- a/src/cluster/placement/types.go +++ b/src/cluster/placement/types.go @@ -271,6 +271,9 @@ type ActiveStagedPlacement interface { // function when the caller is done using the placement, and any errors encountered. ActivePlacement() (Placement, DoneFn, error) + // Version returns the version of the underlying staged placement. + Version() int + // Close closes the active staged placement. Close() error }