Skip to content

Commit

Permalink
Attempt to fix integration test
Browse files Browse the repository at this point in the history
  • Loading branch information
michel-laterman committed Apr 2, 2024
1 parent 6704b25 commit 9732cbb
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 15 deletions.
37 changes: 24 additions & 13 deletions internal/pkg/server/fleet.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@ import (
"time"

"github.com/elastic/elastic-agent-client/v7/pkg/client"
"github.com/elastic/fleet-server/v7/internal/pkg/logger"
"github.com/elastic/fleet-server/v7/internal/pkg/state"

"github.com/elastic/go-ucfg"
"go.elastic.co/apm/v2"
apmtransport "go.elastic.co/apm/v2/transport"

Expand All @@ -31,10 +29,12 @@ import (
"github.com/elastic/fleet-server/v7/internal/pkg/dl"
"github.com/elastic/fleet-server/v7/internal/pkg/es"
"github.com/elastic/fleet-server/v7/internal/pkg/gc"
"github.com/elastic/fleet-server/v7/internal/pkg/logger"
"github.com/elastic/fleet-server/v7/internal/pkg/monitor"
"github.com/elastic/fleet-server/v7/internal/pkg/policy"
"github.com/elastic/fleet-server/v7/internal/pkg/profile"
"github.com/elastic/fleet-server/v7/internal/pkg/scheduler"
"github.com/elastic/fleet-server/v7/internal/pkg/state"
"github.com/elastic/fleet-server/v7/internal/pkg/ver"

"github.com/hashicorp/go-version"
Expand Down Expand Up @@ -196,10 +196,9 @@ LOOP:
select {
case cfg := <-f.cfgCh:
log.Info().Msg("Server configuration update")
if cfg.Inputs == nil {
// cfg only contains updated output retrieved from policy
if cfg.Inputs == nil && cfg.RevisionIdx != 0 { // cfg only contains updated output retrieved from policy
rev := cfg.RevisionIdx
esOutput := config.MergeElasticsearchFromPolicy(curCfg.Output.Elasticsearch, cfg.Output.Elasticsearch)
newCfg.RevisionIdx = cfg.RevisionIdx

// test config
cli, err := es.NewClient(ctx,
Expand All @@ -212,25 +211,37 @@ LOOP:
elasticsearchOptions(curCfg.Inputs[0].Server.Instrumentation.Enabled, f.bi)...,
)
if err != nil {
log.Warn().Err(err).Msg("unable to create elasticsearch client from policy output")
log.Warn().Int64(logger.RevisionIdx, rev).Err(err).Msg("unable to create elasticsearch client from policy output")
continue
}
remoteVersion, err := ver.CheckCompatibility(ctx, cli, f.bi.Version)
if err != nil {
// NOTE The error can indicate a bad network connection, bad TLS settings, etc.
// But if the error is an ErrElasticVersionConflict then something is very wrong
if errors.Is(err, es.ErrElasticVersionConflict) {
log.Error().Err(err).Interface("output", esOutput).Interface("bootstrap", curCfg.Output.Elasticsearch).Str("remote_version", remoteVersion).Msg("Elasticsearch version constraint failed for new output")
log.Error().Err(err).Int64(logger.RevisionIdx, rev).Interface("output", esOutput).Interface("bootstrap", curCfg.Output.Elasticsearch).Str("remote_version", remoteVersion).Msg("Elasticsearch version constraint failed for new output")
} else {
log.Warn().Err(err).Msg("Failed version compatibility check using output from policy")
log.Warn().Err(err).Int64(logger.RevisionIdx, rev).Msg("Failed version compatibility check using output from policy")
}
continue
}
log.Info().Int64(logger.RevisionIdx, cfg.RevisionIdx).Msg("Using output from policy")
newCfg.Output.Elasticsearch = esOutput
} else {
newCfg = cfg
// work around to get a new cfg object based off curCfg
// we override the output with esOutput and have a complete config with a new mutex
tmp, err := ucfg.NewFrom(curCfg, config.DefaultOptions...)
if err != nil {
log.Error().Err(err).Int64(logger.RevisionIdx, rev).Msg("Unable to convert config")
continue
}
err = tmp.Unpack(cfg, config.DefaultOptions...)
if err != nil {
log.Error().Err(err).Int64(logger.RevisionIdx, rev).Msg("Unable to unpack config")
continue
}
log.Info().Int64(logger.RevisionIdx, rev).Msg("Using output from policy")
cfg.Output.Elasticsearch = esOutput
cfg.RevisionIdx = rev
}
newCfg = cfg
case err := <-ech:
f.reporter.UpdateState(client.UnitStateFailed, fmt.Sprintf("Error - %s", err), nil) //nolint:errcheck // unclear on what should we do if updating the status fails?
log.Error().Err(err).Msg("Fleet Server failed")
Expand Down
10 changes: 8 additions & 2 deletions internal/pkg/server/fleet_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,9 +379,14 @@ func TestServerConfigErrorReload(t *testing.T) {
mReporter.On("UpdateState", client.UnitStateConfiguring, mock.Anything, mock.Anything).Return(nil)
mReporter.On("UpdateState", client.UnitStateHealthy, mock.Anything, mock.Anything).Run(func(_ mock.Arguments) {
// Call cancel to stop the server once it's healthy
cancel()
go func() {
time.Sleep(time.Millisecond * 100)
t.Log("Terminating integration test")
cancel()
}()
}).Return(nil)
mReporter.On("UpdateState", client.UnitStateStopping, mock.Anything, mock.Anything).Return(nil)
mReporter.On("UpdateState", client.UnitStateFailed, context.Canceled, mock.Anything).Return(nil).Maybe()

// set bad config
cfg.Output.Elasticsearch.ServiceToken = "incorrect"
Expand Down Expand Up @@ -463,8 +468,9 @@ func TestServerReloadOutputOnly(t *testing.T) {
err = srv.srv.Reload(ctx, &cfg)
require.NoError(t, err)

successes := successfulOutputMsg.Load()
for i := 0; i < 5; i++ {
if successfulOutputMsg.Load() > 0 {
if successfulOutputMsg.Load() > successes {
break
}
time.Sleep(time.Second)
Expand Down

0 comments on commit 9732cbb

Please sign in to comment.