Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[7.15](backport #27981) [Elastic Agent] Fix lazy acker to only add new actions to the batch #28062

Merged
merged 1 commit into from
Sep 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions x-pack/elastic-agent/CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
- Add "_monitoring" suffix to monitoring instance names to remove ambiguity with the status command. {issue}25449[25449]
- Ignore ErrNotExists when fixing permissions. {issue}27836[27836] {pull}27846[27846]
- Snapshot artifact lookup will use agent.download proxy settings. {issue}27903[27903] {pull}27904[27904]
- Fix lazy acker to only add new actions to the batch. {pull}27981[27981]

==== New features

Expand Down
14 changes: 12 additions & 2 deletions x-pack/elastic-agent/pkg/fleetapi/acker/lazy/lazy_acker.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ func NewAcker(baseAcker batchAcker, log *logger.Logger) *Acker {

// Ack acknowledges action.
func (f *Acker) Ack(ctx context.Context, action fleetapi.Action) error {
f.queue = append(f.queue, action)
f.log.Debugf("appending action with id '%s' to the queue", action.ID())
f.enqueue(action)

if _, isAckForced := action.(ackForcer); isAckForced {
return f.Commit(ctx)
Expand All @@ -58,3 +57,14 @@ func (f *Acker) Commit(ctx context.Context) error {
f.queue = make([]fleetapi.Action, 0)
return nil
}

func (f *Acker) enqueue(action fleetapi.Action) {
for _, a := range f.queue {
if a.ID() == action.ID() {
f.log.Debugf("action with id '%s' has already been queued", action.ID())
return
}
}
f.queue = append(f.queue, action)
f.log.Debugf("appending action with id '%s' to the queue", action.ID())
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,21 @@ func TestLazyAcker(t *testing.T) {
}()
c := context.Background()

if err := lacker.Ack(c, testAction1); err != nil {
t.Fatal(err)
}
if err := lacker.Ack(c, testAction1); err != nil {
t.Fatal(err)
}
if err := lacker.Ack(c, testAction2); err != nil {
t.Fatal(err)
}
if err := lacker.Ack(c, testAction2); err != nil {
t.Fatal(err)
}
if err := lacker.Ack(c, testAction3); err != nil {
t.Fatal(err)
}
if err := lacker.Ack(c, testAction3); err != nil {
t.Fatal(err)
}
Expand Down