Skip to content

Commit

Permalink
refactor the state store to use JSON (#4441)
Browse files Browse the repository at this point in the history
Fix action store -> state store migration tests (#4235)
* use golden file for store migration test
* remove withFile function
* migrate take in a storage.Store instead of the storage's path. It's needed so we can set the encrypted store vault's path

refactor state store (#4253)
It modifies the state store API to match the current needs.

update action model to match fleet-server schema (#4240)
* simplify fleetapi.Actions.UnmarshalJSON
* add test to ensure the state store is correctly loaded from disk
* skip state store migration tests, they will be fixes on a follow-up PR as part of #3912

add migrations for action and state stores (#4305)
  • Loading branch information
AndersonQ authored Jun 27, 2024
1 parent 4a78e5e commit b4877f5
Show file tree
Hide file tree
Showing 46 changed files with 2,289 additions and 1,071 deletions.
35 changes: 35 additions & 0 deletions changelog/fragments/1712067343-fix-state-store.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: bug-fix

# Change summary; a 80ish characters long description of the change.
summary: Fix the Elastic Agent state store

# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
description: |
This change fixes issues when loading data from the Agent's internal state store.
Which include the error `error parsing version ""` the Agent would present after
trying to execute a scheduled upgrade after a restart.
# Affected component; a word indicating the component this changeset affects.
component:

# PR URL; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
#pr: https://github.com/owner/repo/1234

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
issue: https://github.com/elastic/elastic-agent/issues/3912
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,13 @@ func (h *Cancel) Handle(ctx context.Context, a fleetapi.Action, acker acker.Acke
if !ok {
return fmt.Errorf("invalid type, expected ActionCancel and received %T", a)
}
n := h.c.Cancel(action.TargetID)
n := h.c.Cancel(action.Data.TargetID)
if n == 0 {
h.log.Debugf("Cancel action id: %s target id: %s found no actions in queue.", action.ActionID, action.TargetID)
h.log.Debugf("Cancel action id: %s target id: %s found no actions in queue.",
action.ActionID, action.Data.TargetID)
return nil
}
h.log.Infof("Cancel action id: %s target id: %s removed %d action(s) from queue.", action.ActionID, action.TargetID, n)
h.log.Infof("Cancel action id: %s target id: %s removed %d action(s) from queue.",
action.ActionID, action.Data.TargetID, n)
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,10 @@ func (h *Diagnostics) collectDiag(ctx context.Context, action *fleetapi.ActionDi
cDiag := h.diagComponents(ctx, action)

var r io.Reader
// attempt to create the a temporary diagnostics file on disk in order to avoid loading a
// potentially large file in memory.
// attempt to create a temporary diagnostics file on disk in order to avoid
// loading a potentially large file in memory.
// if on-disk creation fails an in-memory buffer is used.
f, s, err := h.diagFile(aDiag, uDiag, cDiag, action.ExcludeEventsLog)
f, s, err := h.diagFile(aDiag, uDiag, cDiag, action.Data.ExcludeEventsLog)
if err != nil {
var b bytes.Buffer
h.log.Warnw("Diagnostics action unable to use temporary file, using buffer instead.", "error.message", err)
Expand All @@ -161,7 +161,7 @@ func (h *Diagnostics) collectDiag(ctx context.Context, action *fleetapi.ActionDi
h.log.Warn(str)
}
}()
err := diagnostics.ZipArchive(&wBuf, &b, h.topPath, aDiag, uDiag, cDiag, action.ExcludeEventsLog)
err := diagnostics.ZipArchive(&wBuf, &b, h.topPath, aDiag, uDiag, cDiag, action.Data.ExcludeEventsLog)
if err != nil {
h.log.Errorw(
"diagnostics action handler failed generate zip archive",
Expand Down Expand Up @@ -202,7 +202,7 @@ func (h *Diagnostics) runHooks(ctx context.Context, action *fleetapi.ActionDiagn
// Currently CPU is the only additional metric we can collect.
// If this changes we would need to change how we scan AdditionalMetrics.
collectCPU := false
for _, metric := range action.AdditionalMetrics {
for _, metric := range action.Data.AdditionalMetrics {
if metric == "CPU" {
h.log.Debug("Diagnostics will collect CPU profile.")
collectCPU = true
Expand Down Expand Up @@ -296,7 +296,7 @@ func (h *Diagnostics) diagComponents(ctx context.Context, action *fleetapi.Actio
h.log.Debugf("Component diagnostics complete. Took: %s", time.Since(startTime))
}()
additionalMetrics := []cproto.AdditionalDiagnosticRequest{}
for _, metric := range action.AdditionalMetrics {
for _, metric := range action.Data.AdditionalMetrics {
if metric == "CPU" {
additionalMetrics = append(additionalMetrics, cproto.AdditionalDiagnosticRequest_CPU)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ func TestDiagnosticHandlerWithCPUProfile(t *testing.T) {
mockUploader.EXPECT().UploadDiagnostics(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return("upload-id", nil)

diagAction := &fleetapi.ActionDiagnostics{
AdditionalMetrics: []string{"CPU"},
Data: fleetapi.ActionDiagnosticsData{AdditionalMetrics: []string{"CPU"}},
}
handler.collectDiag(context.Background(), diagAction, mockAcker)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (h *PolicyChangeHandler) Handle(ctx context.Context, a fleetapi.Action, ack
// // Cache signature validation key for the next policy handling
// h.signatureValidationKey = signatureValidationKey

c, err := config.NewConfigFrom(action.Policy)
c, err := config.NewConfigFrom(action.Data.Policy)
if err != nil {
return errors.New(err, "could not parse the configuration from the policy", errors.TypeConfig)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ func TestPolicyChange(t *testing.T) {
action := &fleetapi.ActionPolicyChange{
ActionID: "abc123",
ActionType: "POLICY_CHANGE",
Policy: conf,
Data: fleetapi.ActionPolicyChangeData{
Policy: conf,
},
}

cfg := configuration.DefaultConfiguration()
Expand Down Expand Up @@ -86,7 +88,9 @@ func TestPolicyAcked(t *testing.T) {
action := &fleetapi.ActionPolicyChange{
ActionID: actionID,
ActionType: "POLICY_CHANGE",
Policy: config,
Data: fleetapi.ActionPolicyChangeData{
Policy: config,
},
}

cfg := configuration.DefaultConfiguration()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (h *Settings) Handle(ctx context.Context, a fleetapi.Action, acker acker.Ac
return fmt.Errorf("invalid type, expected ActionSettings and received %T", a)
}

logLevel := action.LogLevel
logLevel := action.Data.LogLevel
return h.handleLogLevel(ctx, logLevel, acker, action)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func TestSettings_handleLogLevel(t *testing.T) {
action: &fleetapi.ActionSettings{
ActionID: "someactionid",
ActionType: fleetapi.ActionTypeSettings,
LogLevel: "debug",
Data: fleetapi.ActionSettingsData{LogLevel: "debug"},
},
},
setupMocks: func(t *testing.T, agent *mockinfo.Agent, setter *mockhandlers.LogLevelSetter, acker *mockfleetacker.Acker) {
Expand All @@ -154,7 +154,8 @@ func TestSettings_handleLogLevel(t *testing.T) {
action: &fleetapi.ActionSettings{
ActionID: "someactionid",
ActionType: fleetapi.ActionTypeSettings,
LogLevel: clearLogLevelValue,
Data: fleetapi.ActionSettingsData{
LogLevel: clearLogLevelValue},
},
},
setupMocks: func(t *testing.T, agent *mockinfo.Agent, setter *mockhandlers.LogLevelSetter, acker *mockfleetacker.Acker) {
Expand All @@ -175,7 +176,8 @@ func TestSettings_handleLogLevel(t *testing.T) {
action: &fleetapi.ActionSettings{
ActionID: "someactionid",
ActionType: fleetapi.ActionTypeSettings,
LogLevel: clearLogLevelValue,
Data: fleetapi.ActionSettingsData{
LogLevel: clearLogLevelValue},
},
},
setupMocks: func(t *testing.T, agent *mockinfo.Agent, setter *mockhandlers.LogLevelSetter, acker *mockfleetacker.Acker) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ const (
)

type stateStore interface {
Add(fleetapi.Action)
SetAction(fleetapi.Action)
AckToken() string
SetAckToken(ackToken string)
Save() error
Actions() []fleetapi.Action
Action() fleetapi.Action
}

// Unenroll results in running agent entering idle state, non managed non standalone.
Expand Down Expand Up @@ -94,7 +94,7 @@ func (h *Unenroll) Handle(ctx context.Context, a fleetapi.Action, acker acker.Ac

if h.stateStore != nil {
// backup action for future start to avoid starting fleet gateway loop
h.stateStore.Add(a)
h.stateStore.SetAction(a)
if err := h.stateStore.Save(); err != nil {
h.log.Warnf("Failed to update state store: %v", err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ func (h *Upgrade) Handle(ctx context.Context, a fleetapi.Action, ack acker.Acker
}

go func() {
h.log.Infof("starting upgrade to version %s in background", action.Version)
if err := h.coord.Upgrade(asyncCtx, action.Version, action.SourceURI, action, false, false); err != nil {
h.log.Errorf("upgrade to version %s failed: %v", action.Version, err)
h.log.Infof("starting upgrade to version %s in background", action.Data.Version)
if err := h.coord.Upgrade(asyncCtx, action.Data.Version, action.Data.SourceURI, action, false, false); err != nil {
h.log.Errorf("upgrade to version %s failed: %v", action.Data.Version, err)
// If context is cancelled in getAsyncContext, the actions are acked there
if !errors.Is(asyncCtx.Err(), context.Canceled) {
h.bkgMutex.Lock()
Expand Down Expand Up @@ -125,14 +125,17 @@ func (h *Upgrade) getAsyncContext(ctx context.Context, action fleetapi.Action, a
h.log.Errorf("invalid type, expected ActionUpgrade and received %T", action)
return nil, false
}
if (upgradeAction.Version == bkgAction.Version) && (upgradeAction.SourceURI == bkgAction.SourceURI) {
h.log.Infof("Duplicate upgrade to version %s received", bkgAction.Version)
if (upgradeAction.Data.Version == bkgAction.Data.Version) &&
(upgradeAction.Data.SourceURI == bkgAction.Data.SourceURI) {
h.log.Infof("Duplicate upgrade to version %s received",
bkgAction.Data.Version)
h.bkgActions = append(h.bkgActions, action)
return nil, false
}

// Versions must be different, cancel the first upgrade and run the new one
h.log.Infof("Canceling upgrade to version %s received", bkgAction.Version)
h.log.Infof("Canceling upgrade to version %s received",
bkgAction.Data.Version)
h.bkgCancel()

// Ack here because we have the lock, and we need to clear out the saved actions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ func TestUpgradeHandler(t *testing.T) {
go c.Run(ctx)

u := NewUpgrade(log, c)
a := fleetapi.ActionUpgrade{Version: "8.3.0", SourceURI: "http://localhost"}
a := fleetapi.ActionUpgrade{Data: fleetapi.ActionUpgradeData{
Version: "8.3.0", SourceURI: "http://localhost"}}
ack := noopacker.New()
err := u.Handle(ctx, &a, ack)
require.NoError(t, err)
Expand Down Expand Up @@ -114,7 +115,8 @@ func TestUpgradeHandlerSameVersion(t *testing.T) {
go c.Run(ctx)

u := NewUpgrade(log, c)
a := fleetapi.ActionUpgrade{Version: "8.3.0", SourceURI: "http://localhost"}
a := fleetapi.ActionUpgrade{Data: fleetapi.ActionUpgradeData{
Version: "8.3.0", SourceURI: "http://localhost"}}
ack := noopacker.New()
err1 := u.Handle(ctx, &a, ack)
err2 := u.Handle(ctx, &a, ack)
Expand Down Expand Up @@ -149,8 +151,10 @@ func TestUpgradeHandlerNewVersion(t *testing.T) {
go c.Run(ctx)

u := NewUpgrade(log, c)
a1 := fleetapi.ActionUpgrade{Version: "8.2.0", SourceURI: "http://localhost"}
a2 := fleetapi.ActionUpgrade{Version: "8.5.0", SourceURI: "http://localhost"}
a1 := fleetapi.ActionUpgrade{Data: fleetapi.ActionUpgradeData{
Version: "8.2.0", SourceURI: "http://localhost"}}
a2 := fleetapi.ActionUpgrade{Data: fleetapi.ActionUpgradeData{
Version: "8.5.0", SourceURI: "http://localhost"}}
ack := noopacker.New()
err1 := u.Handle(ctx, &a1, ack)
require.NoError(t, err1)
Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/agent/application/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ func (ad *ActionDispatcher) handleExpired(
version := "unknown"
expiration := "unknown"
if upgrade, ok := e.(*fleetapi.ActionUpgrade); ok {
version = upgrade.Version
version = upgrade.Data.Version
expiration = upgrade.ActionExpiration
}
ad.lastUpgradeDetails = details.NewDetails(version, details.StateFailed, e.ID())
Expand Down Expand Up @@ -358,7 +358,7 @@ func (ad *ActionDispatcher) reportNextScheduledUpgrade(input []fleetapi.Action,
}

upgradeDetails := details.NewDetails(
nextUpgrade.Version,
nextUpgrade.Data.Version,
details.StateScheduled,
nextUpgrade.ID())
startTime, err := nextUpgrade.StartTime()
Expand Down
20 changes: 15 additions & 5 deletions internal/pkg/agent/application/dispatcher/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,7 +675,9 @@ func TestReportNextScheduledUpgrade(t *testing.T) {
actions: []fleetapi.Action{
&fleetapi.ActionUpgrade{
ActionID: "action1",
Version: "8.12.3",
Data: fleetapi.ActionUpgradeData{
Version: "8.12.3",
},
},
},
expectedErrLogMsg: "failed to get start time for scheduled upgrade action [id = action1]",
Expand All @@ -685,7 +687,9 @@ func TestReportNextScheduledUpgrade(t *testing.T) {
&fleetapi.ActionUpgrade{
ActionID: "action2",
ActionStartTime: later.Format(time.RFC3339),
Version: "8.13.0",
Data: fleetapi.ActionUpgradeData{
Version: "8.13.0",
},
},
},
expectedDetails: &details.Details{
Expand All @@ -702,12 +706,16 @@ func TestReportNextScheduledUpgrade(t *testing.T) {
&fleetapi.ActionUpgrade{
ActionID: "action3",
ActionStartTime: muchLater.Format(time.RFC3339),
Version: "8.14.1",
Data: fleetapi.ActionUpgradeData{
Version: "8.14.1",
},
},
&fleetapi.ActionUpgrade{
ActionID: "action4",
ActionStartTime: later.Format(time.RFC3339),
Version: "8.13.5",
Data: fleetapi.ActionUpgradeData{
Version: "8.13.5",
},
},
},
expectedDetails: &details.Details{
Expand All @@ -723,8 +731,10 @@ func TestReportNextScheduledUpgrade(t *testing.T) {
actions: []fleetapi.Action{
&fleetapi.ActionUpgrade{
ActionID: "action1",
Version: "8.13.2",
ActionStartTime: "invalid",
Data: fleetapi.ActionUpgradeData{
Version: "8.13.2",
},
},
},
expectedErrLogMsg: "failed to get start time for scheduled upgrade action [id = action1]",
Expand Down
2 changes: 0 additions & 2 deletions internal/pkg/agent/application/gateway/fleet/fleet_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,9 @@ type agentInfo interface {
}

type stateStore interface {
Add(fleetapi.Action)
AckToken() string
SetAckToken(ackToken string)
Save() error
Actions() []fleetapi.Action
}

type FleetGateway struct {
Expand Down
17 changes: 6 additions & 11 deletions internal/pkg/agent/application/managed_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func newManagedConfigManager(
// Create the state store that will persist the last good policy change on disk.
stateStore, err := store.NewStateStoreWithMigration(ctx, log, paths.AgentActionStoreFile(), paths.AgentStateStoreFile())
if err != nil {
return nil, errors.New(err, fmt.Sprintf("fail to read action store '%s'", paths.AgentActionStoreFile()))
return nil, errors.New(err, fmt.Sprintf("fail to read state store '%s'", paths.AgentStateStoreFile()))
}

actionQueue, err := queue.NewActionQueue(stateStore.Queue(), stateStore)
Expand Down Expand Up @@ -158,14 +158,14 @@ func (m *managedConfigManager) Run(ctx context.Context) error {
close(retrierRun)
}()

actions := m.stateStore.Actions()
action := m.stateStore.Action()
stateRestored := false
if len(actions) > 0 && !m.wasUnenrolled() {
if action != nil && !m.wasUnenrolled() {
// TODO(ph) We will need an improvement on fleet, if there is an error while dispatching a
// persisted action on disk we should be able to ask Fleet to get the latest configuration.
// But at the moment this is not possible because the policy change was acked.
m.log.Info("restoring current policy from disk")
m.dispatcher.Dispatch(ctx, m.coord.SetUpgradeDetails, actionAcker, actions...)
m.dispatcher.Dispatch(ctx, m.coord.SetUpgradeDetails, actionAcker, action)
stateRestored = true
}

Expand Down Expand Up @@ -269,13 +269,8 @@ func (m *managedConfigManager) Watch() <-chan coordinator.ConfigChange {
}

func (m *managedConfigManager) wasUnenrolled() bool {
actions := m.stateStore.Actions()
for _, a := range actions {
if a.Type() == "UNENROLL" {
return true
}
}
return false
return m.stateStore.Action() != nil &&
m.stateStore.Action().Type() == fleetapi.ActionTypeUnenroll
}

func (m *managedConfigManager) initFleetServer(ctx context.Context, cfg *configuration.FleetServerConfig) error {
Expand Down
Loading

0 comments on commit b4877f5

Please sign in to comment.