Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v2] Move queue management to dispatcher #1109

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 70 additions & 1 deletion internal/pkg/agent/application/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"reflect"
"strings"
"time"

"go.elastic.co/apm"

Expand All @@ -21,6 +22,12 @@ import (

type actionHandlers map[string]actions.Handler

type priorityQueue interface {
Add(fleetapi.Action, int64)
DequeueActions() []fleetapi.Action
Save() error
}

// Dispatcher processes actions coming from fleet api.
type Dispatcher interface {
Dispatch(context.Context, acker.Acker, ...fleetapi.Action) error
Expand All @@ -31,10 +38,11 @@ type ActionDispatcher struct {
log *logger.Logger
handlers actionHandlers
def actions.Handler
queue priorityQueue
}

// New creates a new action dispatcher.
func New(log *logger.Logger, def actions.Handler) (*ActionDispatcher, error) {
func New(log *logger.Logger, def actions.Handler, queue priorityQueue) (*ActionDispatcher, error) {
var err error
if log == nil {
log, err = logger.New("action_dispatcher", false)
Expand All @@ -51,6 +59,7 @@ func New(log *logger.Logger, def actions.Handler) (*ActionDispatcher, error) {
log: log,
handlers: make(actionHandlers),
def: def,
queue: queue,
}, nil
}

Expand Down Expand Up @@ -86,6 +95,17 @@ func (ad *ActionDispatcher) Dispatch(ctx context.Context, acker acker.Acker, act
span.End()
}()

actions = ad.queueScheduledActions(actions)
actions = ad.dispatchCancelActions(ctx, actions, acker)
queued, expired := ad.gatherQueuedActions(time.Now().UTC())
ad.log.Debugf("Gathered %d actions from queue, %d actions expired", len(queued), len(expired))
ad.log.Debugf("Expired actions: %v", expired)
actions = append(actions, queued...)

if err := ad.queue.Save(); err != nil {
ad.log.Errorf("failed to persist action_queue: %v", err)
}

if len(actions) == 0 {
ad.log.Debug("No action to dispatch")
return nil
Expand Down Expand Up @@ -128,3 +148,52 @@ func detectTypes(actions []fleetapi.Action) []string {
}
return str
}

// queueScheduledActions will add any action in actions with a valid start time to the queue and return the rest.
// start time to current time comparisons are purposefully not made in case of cancel actions.
func (ad *ActionDispatcher) queueScheduledActions(input []fleetapi.Action) []fleetapi.Action {
actions := make([]fleetapi.Action, 0, len(input))
for _, action := range input {
start, err := action.StartTime()
if err == nil {
ad.log.Debugf("Adding action id: %s to queue.", action.ID())
ad.queue.Add(action, start.Unix())
continue
}
if !errors.Is(err, fleetapi.ErrNoStartTime) {
ad.log.Warnf("Issue gathering start time from action id %s: %v", action.ID(), err)
}
actions = append(actions, action)
}
return actions
}

// dispatchCancelActions will separate and dispatch any cancel actions from the actions list and return the rest of the list.
// cancel actions are dispatched seperatly as they may remove items from the queue.
func (ad *ActionDispatcher) dispatchCancelActions(ctx context.Context, actions []fleetapi.Action, acker acker.Acker) []fleetapi.Action {
for i := len(actions) - 1; i >= 0; i-- {
action := actions[i]
// If it is a cancel action, remove from list and dispatch
if action.Type() == fleetapi.ActionTypeCancel {
actions = append(actions[:i], actions[i+1:]...)
if err := ad.dispatchAction(ctx, action, acker); err != nil {
ad.log.Errorf("Unable to dispatch cancel action id %s: %v", action.ID(), err)
}
}
}
return actions
}

// gatherQueuedActions will dequeue actions from the action queue and separate those that have already expired.
func (ad *ActionDispatcher) gatherQueuedActions(ts time.Time) (queued, expired []fleetapi.Action) {
actions := ad.queue.DequeueActions()
for _, action := range actions {
exp, _ := action.Expiration()
if ts.After(exp) {
expired = append(expired, action)
continue
}
queued = append(queued, action)
}
return queued, expired
}
144 changes: 141 additions & 3 deletions internal/pkg/agent/application/dispatcher/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,34 @@ func (m *mockAction) Expiration() (time.Time, error) {
return args.Get(0).(time.Time), args.Error(1)
}

type mockQueue struct {
mock.Mock
}

func (m *mockQueue) Add(action fleetapi.Action, n int64) {
m.Called(action, n)
}

func (m *mockQueue) DequeueActions() []fleetapi.Action {
args := m.Called()
return args.Get(0).([]fleetapi.Action)
}

func (m *mockQueue) Save() error {
args := m.Called()
return args.Error(0)
}

func TestActionDispatcher(t *testing.T) {
ack := noop.New()

t.Run("Success to dispatch multiples events", func(t *testing.T) {
ctx := context.Background()
def := &mockHandler{}
d, err := New(nil, def)
queue := &mockQueue{}
queue.On("Save").Return(nil).Once()
queue.On("DequeueActions").Return([]fleetapi.Action{}).Once()
d, err := New(nil, def, queue)
require.NoError(t, err)

success1 := &mockHandler{}
Expand All @@ -76,7 +97,13 @@ func TestActionDispatcher(t *testing.T) {
require.NoError(t, err)

action1 := &mockAction{}
action1.On("StartTime").Return(time.Time{}, fleetapi.ErrNoStartTime)
action1.On("Type").Return("action")
action1.On("ID").Return("id")
action2 := &mockOtherAction{}
action2.On("StartTime").Return(time.Time{}, fleetapi.ErrNoStartTime)
action2.On("Type").Return("action")
action2.On("ID").Return("id")

// TODO better matching for actions
success1.On("Handle", mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
Expand All @@ -88,34 +115,145 @@ func TestActionDispatcher(t *testing.T) {
success1.AssertExpectations(t)
success2.AssertExpectations(t)
def.AssertNotCalled(t, "Handle", mock.Anything, mock.Anything, mock.Anything)
queue.AssertExpectations(t)
})

t.Run("Unknown action are caught by the unknown handler", func(t *testing.T) {
def := &mockHandler{}
def.On("Handle", mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
ctx := context.Background()
d, err := New(nil, def)
queue := &mockQueue{}
queue.On("Save").Return(nil).Once()
queue.On("DequeueActions").Return([]fleetapi.Action{}).Once()
d, err := New(nil, def, queue)
require.NoError(t, err)

action := &mockUnknownAction{}
action.On("StartTime").Return(time.Time{}, fleetapi.ErrNoStartTime)
action.On("Type").Return("action")
action.On("ID").Return("id")
err = d.Dispatch(ctx, ack, action)

require.NoError(t, err)
def.AssertExpectations(t)
queue.AssertExpectations(t)
})

t.Run("Could not register two handlers on the same action", func(t *testing.T) {
success1 := &mockHandler{}
success2 := &mockHandler{}

def := &mockHandler{}
d, err := New(nil, def)
queue := &mockQueue{}
d, err := New(nil, def, queue)
require.NoError(t, err)

err = d.Register(&mockAction{}, success1)
require.NoError(t, err)

err = d.Register(&mockAction{}, success2)
require.Error(t, err)
queue.AssertExpectations(t)
})

t.Run("Dispatched action is queued", func(t *testing.T) {
def := &mockHandler{}
def.On("Handle", mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()

queue := &mockQueue{}
queue.On("Save").Return(nil).Once()
queue.On("DequeueActions").Return([]fleetapi.Action{}).Once()
queue.On("Add", mock.Anything, mock.Anything).Once()

d, err := New(nil, def, queue)
require.NoError(t, err)
err = d.Register(&mockAction{}, def)
require.NoError(t, err)

action1 := &mockAction{}
action1.On("StartTime").Return(time.Time{}, fleetapi.ErrNoStartTime)
action1.On("Type").Return("action")
action1.On("ID").Return("id")
action2 := &mockAction{}
action2.On("StartTime").Return(time.Now().Add(time.Hour), nil)
action2.On("Type").Return("action")
action2.On("ID").Return("id")

err = d.Dispatch(context.Background(), ack, action1, action2)
require.NoError(t, err)
def.AssertExpectations(t)
queue.AssertExpectations(t)
})

t.Run("Cancel queued action", func(t *testing.T) {
def := &mockHandler{}
def.On("Handle", mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()

queue := &mockQueue{}
queue.On("Save").Return(nil).Once()
queue.On("DequeueActions").Return([]fleetapi.Action{}).Once()

d, err := New(nil, def, queue)
require.NoError(t, err)
err = d.Register(&mockAction{}, def)
require.NoError(t, err)

action := &mockAction{}
action.On("StartTime").Return(time.Time{}, fleetapi.ErrNoStartTime)
action.On("Type").Return(fleetapi.ActionTypeCancel)
action.On("ID").Return("id")

err = d.Dispatch(context.Background(), ack, action)
require.NoError(t, err)
def.AssertExpectations(t)
queue.AssertExpectations(t)
})

t.Run("Retrieve actions from queue", func(t *testing.T) {
def := &mockHandler{}
def.On("Handle", mock.Anything, mock.Anything, mock.Anything).Return(nil).Twice()

action1 := &mockAction{}
action1.On("StartTime").Return(time.Time{}, fleetapi.ErrNoStartTime)
action1.On("Expiration").Return(time.Now().Add(time.Hour), fleetapi.ErrNoStartTime)
action1.On("Type").Return(fleetapi.ActionTypeCancel)
action1.On("ID").Return("id")

queue := &mockQueue{}
queue.On("Save").Return(nil).Once()
queue.On("DequeueActions").Return([]fleetapi.Action{action1}).Once()

d, err := New(nil, def, queue)
require.NoError(t, err)
err = d.Register(&mockAction{}, def)
require.NoError(t, err)

action2 := &mockAction{}
action2.On("StartTime").Return(time.Time{}, fleetapi.ErrNoStartTime)
action2.On("Type").Return(fleetapi.ActionTypeCancel)
action2.On("ID").Return("id")

err = d.Dispatch(context.Background(), ack, action2)
require.NoError(t, err)
def.AssertExpectations(t)
queue.AssertExpectations(t)
})

t.Run("Retrieve no actions from queue", func(t *testing.T) {
def := &mockHandler{}
def.On("Handle", mock.Anything, mock.Anything, mock.Anything).Return(nil)

queue := &mockQueue{}
queue.On("Save").Return(nil).Once()
queue.On("DequeueActions").Return([]fleetapi.Action{}).Once()

d, err := New(nil, def, queue)
require.NoError(t, err)
err = d.Register(&mockAction{}, def)
require.NoError(t, err)

err = d.Dispatch(context.Background(), ack)
require.NoError(t, err)
def.AssertNotCalled(t, "Handle", mock.Anything, mock.Anything, mock.Anything)
})
}
Loading