From 2add88e48ec04a7746dfb6ae1ae6dd56091cee20 Mon Sep 17 00:00:00 2001 From: Julia Bardi Date: Tue, 8 Oct 2024 14:45:25 +0200 Subject: [PATCH 01/19] use poll timeout in es ctx --- internal/pkg/api/handleCheckin.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/internal/pkg/api/handleCheckin.go b/internal/pkg/api/handleCheckin.go index 3677ee438..0f415f8f5 100644 --- a/internal/pkg/api/handleCheckin.go +++ b/internal/pkg/api/handleCheckin.go @@ -337,6 +337,9 @@ func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r actions, ackToken = convertActions(zlog, agent.Id, pendingActions) span, ctx := apm.StartSpan(r.Context(), "longPoll", "process") + ctx, cancel := context.WithTimeout(ctx, pollDuration) + defer cancel() + if len(actions) == 0 { LOOP: for { @@ -368,7 +371,7 @@ func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r actions = append(actions, *actionResp) break LOOP case <-longPoll.C: - zlog.Trace().Msg("fire long poll") + zlog.Debug().Str(logger.AgentID, agent.Id).Msg("fire long poll") break LOOP case <-tick.C: err := ct.bc.CheckIn(agent.Id, string(req.Status), req.Message, nil, rawComponents, nil, ver, unhealthyReason, false) From 780a146e7d04d86ef65a1cbfe012b25340337a47 Mon Sep 17 00:00:00 2001 From: Jill Guyonnet Date: Wed, 9 Oct 2024 15:36:32 +0200 Subject: [PATCH 02/19] Add some SCALEDEBUG logs --- internal/pkg/api/handleCheckin.go | 13 +++++++++++-- internal/pkg/policy/policy_output.go | 8 ++++++++ 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/internal/pkg/api/handleCheckin.go b/internal/pkg/api/handleCheckin.go index 0f415f8f5..b17a0265d 100644 --- a/internal/pkg/api/handleCheckin.go +++ b/internal/pkg/api/handleCheckin.go @@ -337,18 +337,20 @@ func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r actions, ackToken = convertActions(zlog, agent.Id, pendingActions) span, ctx := apm.StartSpan(r.Context(), "longPoll", "process") - ctx, cancel := context.WithTimeout(ctx, pollDuration) - defer cancel() + // ctx, cancel := context.WithTimeout(ctx, pollDuration) + // defer cancel() if len(actions) == 0 { LOOP: for { select { case <-ctx.Done(): + zlog.Debug().Str(logger.AgentID, agent.Id).Msg("SCALEDEBUG checkin context done") defer span.End() // If the request context is canceled, the API server is shutting down. // We want to immediately stop the long-poll and return a 200 with the ackToken and no actions. if errors.Is(ctx.Err(), context.Canceled) { + zlog.Debug().Str(logger.AgentID, agent.Id).Msg("SCALEDEBUG checkin context canceled") resp := CheckinResponse{ AckToken: &ackToken, Action: "checkin", @@ -363,6 +365,7 @@ func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r actions = append(actions, acs...) break LOOP case policy := <-sub.Output(): + zlog.Debug().Str(logger.AgentID, agent.Id).Msg("SCALEDEBUG new policy") actionResp, err := processPolicy(ctx, zlog, ct.bulker, agent.Id, policy) if err != nil { span.End() @@ -809,6 +812,7 @@ func processPolicy(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk, a // Repull and decode the agent object. Do not trust the cache. 
bSpan, bCtx := apm.StartSpan(ctx, "findAgent", "search") + zlog.Debug().Str("agentID", agentID).Msg("SCALEDEBUG start findAgent") agent, err := dl.FindAgent(bCtx, bulker, dl.QueryAgentByID, dl.FieldID, agentID) bSpan.End() if err != nil { @@ -830,6 +834,11 @@ func processPolicy(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk, a } // Iterate through the policy outputs and prepare them for _, policyOutput := range pp.Outputs { + zlog.Debug(). + Str("agentID", agentID). + Str("output_name", policyOutput.Name). + Str("output_type", policyOutput.Type). + Msg("SCALEDEBUG start prepare output") err = policyOutput.Prepare(ctx, zlog, bulker, &agent, data.Outputs) if err != nil { return nil, fmt.Errorf("failed to prepare output %q: %w", diff --git a/internal/pkg/policy/policy_output.go b/internal/pkg/policy/policy_output.go index 1d099cd8b..1f9ad618c 100644 --- a/internal/pkg/policy/policy_output.go +++ b/internal/pkg/policy/policy_output.go @@ -53,6 +53,7 @@ func (p *Output) Prepare(ctx context.Context, zlog zerolog.Logger, bulker bulk.B Str(logger.AgentID, agent.Id). Str(logger.PolicyOutputName, p.Name).Logger() + zlog.Debug().Str("output_type", p.Type).Msg("SCALEDEBUG preparing output") switch p.Type { case OutputTypeElasticsearch: zlog.Debug().Msg("preparing elasticsearch output") @@ -90,6 +91,7 @@ func (p *Output) prepareElasticsearch( agent *model.Agent, outputMap map[string]map[string]interface{}, hasConfigChanged bool) error { + zlog.Debug().Msg("SCALEDEBUG [prepare elasticsearch output] start") // The role is required to do api key management if p.Role == nil { zlog.Error(). @@ -198,6 +200,7 @@ func (p *Output) prepareElasticsearch( } if needUpdateKey { + zlog.Debug().Msg("SCALEDEBUG [prepare elasticsearch output] needUpdateKey") zlog.Debug(). RawJSON("roles", p.Role.Raw). Str("oldHash", output.PermissionsHash). @@ -246,12 +249,15 @@ func (p *Output) prepareElasticsearch( return err } + zlog.Debug().Msg("SCALEDEBUG [prepare elasticsearch output] needUpdateKey before agent update") if err = bulker.Update(ctx, dl.FleetAgents, agent.Id, body, bulk.WithRefresh(), bulk.WithRetryOnConflict(3)); err != nil { zlog.Error().Err(err).Msg("fail update agent record") return err } + zlog.Debug().Msg("SCALEDEBUG [prepare elasticsearch output] needUpdateKey after agent update") } else if needNewKey { + zlog.Debug().Msg("SCALEDEBUG [prepare elasticsearch output] needNewKey") zlog.Debug(). RawJSON("fleet.policy.roles", p.Role.Raw). Str("fleet.policy.default.oldHash", output.PermissionsHash). 
@@ -327,10 +333,12 @@ func (p *Output) prepareElasticsearch( return fmt.Errorf("could not update painless script: %w", err) } + zlog.Debug().Msg("SCALEDEBUG [prepare elasticsearch output] needNewKey before agent update") if err = bulker.Update(ctx, dl.FleetAgents, agent.Id, body, bulk.WithRefresh(), bulk.WithRetryOnConflict(3)); err != nil { zlog.Error().Err(err).Msg("fail update agent record") return fmt.Errorf("fail update agent record: %w", err) } + zlog.Debug().Msg("SCALEDEBUG [prepare elasticsearch output] needNewKey after agent update") // Now that all is done, we can update the output on the agent variable // Right not it's more for consistency and to ensure the in-memory agent From 5c83def44baee165fd46d18fc0535237266393db Mon Sep 17 00:00:00 2001 From: Julia Bardi Date: Tue, 22 Oct 2024 11:19:10 +0200 Subject: [PATCH 03/19] add agent id to logs --- internal/pkg/api/handleCheckin.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/pkg/api/handleCheckin.go b/internal/pkg/api/handleCheckin.go index b17a0265d..f4deffb47 100644 --- a/internal/pkg/api/handleCheckin.go +++ b/internal/pkg/api/handleCheckin.go @@ -253,6 +253,8 @@ func (ct *CheckinT) validateRequest(zlog zerolog.Logger, w http.ResponseWriter, } func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r *http.Request, start time.Time, agent *model.Agent, ver string) error { + zlog = zlog.With(). + Str(logger.AgentID, agent.Id).Logger() validated, err := ct.validateRequest(zlog, w, r, start, agent) if err != nil { return err From ed72b1c9961d6780e65d8515ff7c884f6d23eaef Mon Sep 17 00:00:00 2001 From: Julia Bardi Date: Mon, 28 Oct 2024 15:33:24 +0100 Subject: [PATCH 04/19] debug logs in fleet.go --- internal/pkg/server/fleet.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/internal/pkg/server/fleet.go b/internal/pkg/server/fleet.go index 677c8027a..6db43f034 100644 --- a/internal/pkg/server/fleet.go +++ b/internal/pkg/server/fleet.go @@ -90,6 +90,7 @@ func (f *Fleet) Run(ctx context.Context, initCfg *config.Config) error { if err != nil { return fmt.Errorf("encountered error while loading server limits: %w", err) } + log.Info().Msg("DEBUG Fleet Server run") cacheCfg := config.CopyCache(initCfg) log.Info().Interface("cfg", cacheCfg).Msg("Setting cache config options") cache, err := cache.New(cacheCfg) @@ -213,6 +214,7 @@ LOOP: return err case <-ctx.Done(): f.reporter.UpdateState(client.UnitStateStopping, "Stopping", nil) //nolint:errcheck // unclear on what should we do if updating the status fails? + log.Info().Err(ctx.Err()).Msg("DEBUG Fleet Server stopping") break LOOP } } @@ -338,6 +340,7 @@ func initRuntime(cfg *config.Config) { } func (f *Fleet) initBulker(ctx context.Context, tracer *apm.Tracer, cfg *config.Config) (*bulk.Bulker, error) { + zerolog.Ctx(ctx).Info().Msg("DEBUG init bulker") es, err := es.NewClient(ctx, cfg, false, elasticsearchOptions( cfg.Inputs[0].Server.Instrumentation.Enabled, f.bi, )...) 
@@ -412,6 +415,7 @@ func (f *Fleet) runServer(ctx context.Context, cfg *config.Config) (err error) { case err = <-errCh: case <-ctx.Done(): err = ctx.Err() + zerolog.Ctx(ctx).Info().Err(err).Msg("DEBUG bulker error") } return }) @@ -426,6 +430,7 @@ func (f *Fleet) runServer(ctx context.Context, cfg *config.Config) (err error) { } if err = f.runSubsystems(ctx, cfg, g, bulker, tracer); err != nil { + zerolog.Ctx(ctx).Info().Err(err).Msg("DEBUG runSubsystems error") return err } @@ -525,6 +530,7 @@ func (f *Fleet) runSubsystems(ctx context.Context, cfg *config.Config, g *errgro return err } + zerolog.Ctx(ctx).Info().Msg("DEBUG create bulker") bc := checkin.NewBulk(bulker) g.Go(loggedRunFunc(ctx, "Bulk checkin", bc.Run)) From 7352efe021ea64700952a4958f4ae795b37f3ff8 Mon Sep 17 00:00:00 2001 From: Julia Bardi Date: Mon, 4 Nov 2024 10:48:13 +0100 Subject: [PATCH 05/19] add default case, log deadline --- internal/pkg/bulk/engine.go | 16 +++++++++++----- internal/pkg/bulk/opApiKey.go | 1 + internal/pkg/bulk/opBulk.go | 4 +++- internal/pkg/bulk/opRead.go | 4 +++- internal/pkg/bulk/opSearch.go | 4 +++- 5 files changed, 21 insertions(+), 8 deletions(-) diff --git a/internal/pkg/bulk/engine.go b/internal/pkg/bulk/engine.go index 7f28d4451..f628fdfb6 100644 --- a/internal/pkg/bulk/engine.go +++ b/internal/pkg/bulk/engine.go @@ -411,7 +411,7 @@ func (b *Bulker) Run(ctx context.Context) error { } case <-timer.C: - zerolog.Ctx(ctx).Trace(). + zerolog.Ctx(ctx).Debug(). Str("mod", kModBulk). Int("itemCnt", itemCnt). Int("byteCnt", byteCnt). @@ -436,18 +436,20 @@ func (b *Bulker) Run(ctx context.Context) error { func (b *Bulker) flushQueue(ctx context.Context, w *semaphore.Weighted, queue queueT) error { start := time.Now() - zerolog.Ctx(ctx).Trace(). + deadline, _ := ctx.Deadline() + zerolog.Ctx(ctx).Debug(). Str("mod", kModBulk). Int("cnt", queue.cnt). Int("szPending", queue.pending). Str("queue", queue.Type()). + Time("deadline", deadline). Msg("flushQueue Wait") if err := w.Acquire(ctx, 1); err != nil { return err } - zerolog.Ctx(ctx).Trace(). + zerolog.Ctx(ctx).Debug(). Str("mod", kModBulk). Int("cnt", queue.cnt). Dur("tdiff", time.Since(start)). @@ -485,7 +487,7 @@ func (b *Bulker) flushQueue(ctx context.Context, w *semaphore.Weighted, queue qu apm.CaptureError(ctx, err).Send() } - zerolog.Ctx(ctx).Trace(). + zerolog.Ctx(ctx).Debug(). Err(err). Str("mod", kModBulk). Int("cnt", queue.cnt). 
@@ -502,8 +504,12 @@ func (b *Bulker) flushQueue(ctx context.Context, w *semaphore.Weighted, queue qu func failQueue(queue queueT, err error) { for n := queue.head; n != nil; { next := n.next // 'n' is invalid immediately on channel send - n.ch <- respT{ + select { + case n.ch <- respT{ err: err, + }: + default: + panic("Unexpected blocked response channel on failQueue") } n = next } diff --git a/internal/pkg/bulk/opApiKey.go b/internal/pkg/bulk/opApiKey.go index ca3a50d16..c41abe7c1 100644 --- a/internal/pkg/bulk/opApiKey.go +++ b/internal/pkg/bulk/opApiKey.go @@ -214,6 +214,7 @@ func (b *Bulker) flushUpdateAPIKey(ctx context.Context, queue queueT) error { req := &esapi.SecurityBulkUpdateAPIKeysRequest{ Body: bytes.NewReader(payload), } + zerolog.Ctx(ctx).Debug().Msg("flushUpdateAPIKey: Sending request to Elasticsearch") res, err := req.Do(ctx, b.es) if err != nil { diff --git a/internal/pkg/bulk/opBulk.go b/internal/pkg/bulk/opBulk.go index 9f28d5103..4ea76593c 100644 --- a/internal/pkg/bulk/opBulk.go +++ b/internal/pkg/bulk/opBulk.go @@ -207,6 +207,8 @@ func (b *Bulker) flushBulk(ctx context.Context, queue queueT) error { req.Refresh = "true" } + zerolog.Ctx(ctx).Debug().Msg("flushBulk: Sending request to Elasticsearch") + res, err := req.Do(ctx, b.es) if err != nil { zerolog.Ctx(ctx).Error().Err(err).Str("mod", kModBulk).Msg("Fail BulkRequest req.Do") @@ -251,7 +253,7 @@ func (b *Bulker) flushBulk(ctx context.Context, queue queueT) error { zerolog.Ctx(ctx).Debug().Err(errors.New(buf.String())).Msg("Bulk call: Es returned an error") } - zerolog.Ctx(ctx).Trace(). + zerolog.Ctx(ctx).Debug(). Err(err). Bool("refresh", queue.ty == kQueueRefreshBulk). Str("mod", kModBulk). diff --git a/internal/pkg/bulk/opRead.go b/internal/pkg/bulk/opRead.go index 657e7f085..d5ca279a3 100644 --- a/internal/pkg/bulk/opRead.go +++ b/internal/pkg/bulk/opRead.go @@ -109,6 +109,8 @@ func (b *Bulker) flushRead(ctx context.Context, queue queueT) error { req.Refresh = &refresh } + zerolog.Ctx(ctx).Debug().Msg("flushRead: Sending mget request to Elasticsearch") + res, err := req.Do(ctx, b.es) if err != nil { @@ -142,7 +144,7 @@ func (b *Bulker) flushRead(ctx context.Context, queue queueT) error { return err } - zerolog.Ctx(ctx).Trace(). + zerolog.Ctx(ctx).Debug(). Err(err). Bool("refresh", refresh). Str("mod", kModBulk). diff --git a/internal/pkg/bulk/opSearch.go b/internal/pkg/bulk/opSearch.go index 14d5252e9..bdf9a234d 100644 --- a/internal/pkg/bulk/opSearch.go +++ b/internal/pkg/bulk/opSearch.go @@ -156,6 +156,8 @@ func (b *Bulker) flushSearch(ctx context.Context, queue queueT) error { err error ) + zerolog.Ctx(ctx).Debug().Msg("flushSearch: Sending request to Elasticsearch") + if queue.ty == kQueueFleetSearch { req := esapi.FleetMsearchRequest{ Body: bytes.NewReader(buf.Bytes()), @@ -199,7 +201,7 @@ func (b *Bulker) flushSearch(ctx context.Context, queue queueT) error { return err } - zerolog.Ctx(ctx).Trace(). + zerolog.Ctx(ctx).Debug(). Err(err). Str("mod", kModBulk). Dur("rtt", time.Since(start)). 
From a9a82eaae87b7c2d4b54d714d34943a970c8f3cd Mon Sep 17 00:00:00 2001 From: Julia Bardi Date: Mon, 4 Nov 2024 14:23:03 +0100 Subject: [PATCH 06/19] 5m timeout --- internal/pkg/bulk/engine.go | 6 +++++- internal/pkg/bulk/opApiKey.go | 2 +- internal/pkg/bulk/opRead.go | 2 +- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/internal/pkg/bulk/engine.go b/internal/pkg/bulk/engine.go index f628fdfb6..4ec827adf 100644 --- a/internal/pkg/bulk/engine.go +++ b/internal/pkg/bulk/engine.go @@ -336,6 +336,9 @@ func (b *Bulker) Run(ctx context.Context) error { w := semaphore.NewWeighted(int64(b.opts.maxPending)) + ctx, cancel := context.WithTimeout(ctx, 5*time.Minute) + defer cancel() + var queues [kNumQueues]queueT var i queueType @@ -436,13 +439,14 @@ func (b *Bulker) Run(ctx context.Context) error { func (b *Bulker) flushQueue(ctx context.Context, w *semaphore.Weighted, queue queueT) error { start := time.Now() - deadline, _ := ctx.Deadline() + deadline, ok := ctx.Deadline() zerolog.Ctx(ctx).Debug(). Str("mod", kModBulk). Int("cnt", queue.cnt). Int("szPending", queue.pending). Str("queue", queue.Type()). Time("deadline", deadline). + Bool("hasDeadline", ok). Msg("flushQueue Wait") if err := w.Acquire(ctx, 1); err != nil { diff --git a/internal/pkg/bulk/opApiKey.go b/internal/pkg/bulk/opApiKey.go index c41abe7c1..6258632de 100644 --- a/internal/pkg/bulk/opApiKey.go +++ b/internal/pkg/bulk/opApiKey.go @@ -229,7 +229,7 @@ func (b *Bulker) flushUpdateAPIKey(ctx context.Context, queue queueT) error { return parseError(res, zerolog.Ctx(ctx)) } - zerolog.Ctx(ctx).Debug().Strs("IDs", bulkReq.IDs).RawJSON("role", role).Msg("API Keys updated.") + zerolog.Ctx(ctx).Debug().Strs("IDs", bulkReq.IDs).RawJSON("role", role).Msg("flushUpdateAPIKey") responses[responseIdx] = res.StatusCode for _, id := range idsInBatch { diff --git a/internal/pkg/bulk/opRead.go b/internal/pkg/bulk/opRead.go index d5ca279a3..dc9aa9ace 100644 --- a/internal/pkg/bulk/opRead.go +++ b/internal/pkg/bulk/opRead.go @@ -109,7 +109,7 @@ func (b *Bulker) flushRead(ctx context.Context, queue queueT) error { req.Refresh = &refresh } - zerolog.Ctx(ctx).Debug().Msg("flushRead: Sending mget request to Elasticsearch") + zerolog.Ctx(ctx).Debug().Msg("flushRead: Sending request to Elasticsearch") res, err := req.Do(ctx, b.es) From 6152b0a48cddb03dbca8e6ba13cf2fdd3f95cc09 Mon Sep 17 00:00:00 2001 From: Julia Bardi Date: Wed, 6 Nov 2024 10:13:06 +0100 Subject: [PATCH 07/19] cleanup logs, move deadline to doFlush --- internal/pkg/api/handleCheckin.go | 13 +------------ internal/pkg/bulk/engine.go | 11 +++++------ internal/pkg/policy/policy_output.go | 8 -------- internal/pkg/server/fleet.go | 6 ------ 4 files changed, 6 insertions(+), 32 deletions(-) diff --git a/internal/pkg/api/handleCheckin.go b/internal/pkg/api/handleCheckin.go index 7727b9d07..5021d7107 100644 --- a/internal/pkg/api/handleCheckin.go +++ b/internal/pkg/api/handleCheckin.go @@ -340,20 +340,16 @@ func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r actions, ackToken = convertActions(zlog, agent.Id, pendingActions) span, ctx := apm.StartSpan(r.Context(), "longPoll", "process") - // ctx, cancel := context.WithTimeout(ctx, pollDuration) - // defer cancel() if len(actions) == 0 { LOOP: for { select { case <-ctx.Done(): - zlog.Debug().Str(logger.AgentID, agent.Id).Msg("SCALEDEBUG checkin context done") defer span.End() // If the request context is canceled, the API server is shutting down. 
// We want to immediately stop the long-poll and return a 200 with the ackToken and no actions. if errors.Is(ctx.Err(), context.Canceled) { - zlog.Debug().Str(logger.AgentID, agent.Id).Msg("SCALEDEBUG checkin context canceled") resp := CheckinResponse{ AckToken: &ackToken, Action: "checkin", @@ -368,7 +364,6 @@ func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r actions = append(actions, acs...) break LOOP case policy := <-sub.Output(): - zlog.Debug().Str(logger.AgentID, agent.Id).Msg("SCALEDEBUG new policy") actionResp, err := processPolicy(ctx, zlog, ct.bulker, agent.Id, policy) if err != nil { span.End() @@ -377,7 +372,7 @@ func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r actions = append(actions, *actionResp) break LOOP case <-longPoll.C: - zlog.Debug().Str(logger.AgentID, agent.Id).Msg("fire long poll") + zlog.Trace().Str(logger.AgentID, agent.Id).Msg("fire long poll") break LOOP case <-tick.C: err := ct.bc.CheckIn(agent.Id, string(req.Status), req.Message, nil, rawComponents, nil, ver, unhealthyReason, false) @@ -815,7 +810,6 @@ func processPolicy(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk, a // Repull and decode the agent object. Do not trust the cache. bSpan, bCtx := apm.StartSpan(ctx, "findAgent", "search") - zlog.Debug().Str("agentID", agentID).Msg("SCALEDEBUG start findAgent") agent, err := dl.FindAgent(bCtx, bulker, dl.QueryAgentByID, dl.FieldID, agentID) bSpan.End() if err != nil { @@ -839,11 +833,6 @@ func processPolicy(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk, a } // Iterate through the policy outputs and prepare them for _, policyOutput := range pp.Outputs { - zlog.Debug(). - Str("agentID", agentID). - Str("output_name", policyOutput.Name). - Str("output_type", policyOutput.Type). - Msg("SCALEDEBUG start prepare output") err = policyOutput.Prepare(ctx, zlog, bulker, &agent, data.Outputs) if err != nil { return nil, fmt.Errorf("failed to prepare output %q: %w", diff --git a/internal/pkg/bulk/engine.go b/internal/pkg/bulk/engine.go index 4ec827adf..2a2fb1c69 100644 --- a/internal/pkg/bulk/engine.go +++ b/internal/pkg/bulk/engine.go @@ -336,9 +336,6 @@ func (b *Bulker) Run(ctx context.Context) error { w := semaphore.NewWeighted(int64(b.opts.maxPending)) - ctx, cancel := context.WithTimeout(ctx, 5*time.Minute) - defer cancel() - var queues [kNumQueues]queueT var i queueType @@ -351,6 +348,10 @@ func (b *Bulker) Run(ctx context.Context) error { doFlush := func() error { + // deadline prevents bulker being blocked on flush + ctx, cancel := context.WithTimeout(ctx, 5*time.Minute) + defer cancel() + for i := range queues { q := &queues[i] if q.pending > 0 { @@ -439,17 +440,15 @@ func (b *Bulker) Run(ctx context.Context) error { func (b *Bulker) flushQueue(ctx context.Context, w *semaphore.Weighted, queue queueT) error { start := time.Now() - deadline, ok := ctx.Deadline() zerolog.Ctx(ctx).Debug(). Str("mod", kModBulk). Int("cnt", queue.cnt). Int("szPending", queue.pending). Str("queue", queue.Type()). - Time("deadline", deadline). - Bool("hasDeadline", ok). 
Msg("flushQueue Wait") if err := w.Acquire(ctx, 1); err != nil { + zerolog.Ctx(ctx).Error().Err(err).Msg("flushQueue Wait error") return err } diff --git a/internal/pkg/policy/policy_output.go b/internal/pkg/policy/policy_output.go index 1f9ad618c..1d099cd8b 100644 --- a/internal/pkg/policy/policy_output.go +++ b/internal/pkg/policy/policy_output.go @@ -53,7 +53,6 @@ func (p *Output) Prepare(ctx context.Context, zlog zerolog.Logger, bulker bulk.B Str(logger.AgentID, agent.Id). Str(logger.PolicyOutputName, p.Name).Logger() - zlog.Debug().Str("output_type", p.Type).Msg("SCALEDEBUG preparing output") switch p.Type { case OutputTypeElasticsearch: zlog.Debug().Msg("preparing elasticsearch output") @@ -91,7 +90,6 @@ func (p *Output) prepareElasticsearch( agent *model.Agent, outputMap map[string]map[string]interface{}, hasConfigChanged bool) error { - zlog.Debug().Msg("SCALEDEBUG [prepare elasticsearch output] start") // The role is required to do api key management if p.Role == nil { zlog.Error(). @@ -200,7 +198,6 @@ func (p *Output) prepareElasticsearch( } if needUpdateKey { - zlog.Debug().Msg("SCALEDEBUG [prepare elasticsearch output] needUpdateKey") zlog.Debug(). RawJSON("roles", p.Role.Raw). Str("oldHash", output.PermissionsHash). @@ -249,15 +246,12 @@ func (p *Output) prepareElasticsearch( return err } - zlog.Debug().Msg("SCALEDEBUG [prepare elasticsearch output] needUpdateKey before agent update") if err = bulker.Update(ctx, dl.FleetAgents, agent.Id, body, bulk.WithRefresh(), bulk.WithRetryOnConflict(3)); err != nil { zlog.Error().Err(err).Msg("fail update agent record") return err } - zlog.Debug().Msg("SCALEDEBUG [prepare elasticsearch output] needUpdateKey after agent update") } else if needNewKey { - zlog.Debug().Msg("SCALEDEBUG [prepare elasticsearch output] needNewKey") zlog.Debug(). RawJSON("fleet.policy.roles", p.Role.Raw). Str("fleet.policy.default.oldHash", output.PermissionsHash). @@ -333,12 +327,10 @@ func (p *Output) prepareElasticsearch( return fmt.Errorf("could not update painless script: %w", err) } - zlog.Debug().Msg("SCALEDEBUG [prepare elasticsearch output] needNewKey before agent update") if err = bulker.Update(ctx, dl.FleetAgents, agent.Id, body, bulk.WithRefresh(), bulk.WithRetryOnConflict(3)); err != nil { zlog.Error().Err(err).Msg("fail update agent record") return fmt.Errorf("fail update agent record: %w", err) } - zlog.Debug().Msg("SCALEDEBUG [prepare elasticsearch output] needNewKey after agent update") // Now that all is done, we can update the output on the agent variable // Right not it's more for consistency and to ensure the in-memory agent diff --git a/internal/pkg/server/fleet.go b/internal/pkg/server/fleet.go index 6db43f034..677c8027a 100644 --- a/internal/pkg/server/fleet.go +++ b/internal/pkg/server/fleet.go @@ -90,7 +90,6 @@ func (f *Fleet) Run(ctx context.Context, initCfg *config.Config) error { if err != nil { return fmt.Errorf("encountered error while loading server limits: %w", err) } - log.Info().Msg("DEBUG Fleet Server run") cacheCfg := config.CopyCache(initCfg) log.Info().Interface("cfg", cacheCfg).Msg("Setting cache config options") cache, err := cache.New(cacheCfg) @@ -214,7 +213,6 @@ LOOP: return err case <-ctx.Done(): f.reporter.UpdateState(client.UnitStateStopping, "Stopping", nil) //nolint:errcheck // unclear on what should we do if updating the status fails? 
- log.Info().Err(ctx.Err()).Msg("DEBUG Fleet Server stopping") break LOOP } } @@ -340,7 +338,6 @@ func initRuntime(cfg *config.Config) { } func (f *Fleet) initBulker(ctx context.Context, tracer *apm.Tracer, cfg *config.Config) (*bulk.Bulker, error) { - zerolog.Ctx(ctx).Info().Msg("DEBUG init bulker") es, err := es.NewClient(ctx, cfg, false, elasticsearchOptions( cfg.Inputs[0].Server.Instrumentation.Enabled, f.bi, )...) @@ -415,7 +412,6 @@ func (f *Fleet) runServer(ctx context.Context, cfg *config.Config) (err error) { case err = <-errCh: case <-ctx.Done(): err = ctx.Err() - zerolog.Ctx(ctx).Info().Err(err).Msg("DEBUG bulker error") } return }) @@ -430,7 +426,6 @@ func (f *Fleet) runServer(ctx context.Context, cfg *config.Config) (err error) { } if err = f.runSubsystems(ctx, cfg, g, bulker, tracer); err != nil { - zerolog.Ctx(ctx).Info().Err(err).Msg("DEBUG runSubsystems error") return err } @@ -530,7 +525,6 @@ func (f *Fleet) runSubsystems(ctx context.Context, cfg *config.Config, g *errgro return err } - zerolog.Ctx(ctx).Info().Msg("DEBUG create bulker") bc := checkin.NewBulk(bulker) g.Go(loggedRunFunc(ctx, "Bulk checkin", bc.Run)) From 17de4026a287344f634dfb36e100ff2c0c064de8 Mon Sep 17 00:00:00 2001 From: Julia Bardi Date: Wed, 6 Nov 2024 14:23:52 +0100 Subject: [PATCH 08/19] move timeout before doFlush --- internal/pkg/api/handleAck.go | 7 +++++-- internal/pkg/bulk/engine.go | 19 +++++++++++-------- internal/pkg/bulk/opApiKey.go | 4 ++-- internal/pkg/bulk/opBulk.go | 4 ++-- 4 files changed, 20 insertions(+), 14 deletions(-) diff --git a/internal/pkg/api/handleAck.go b/internal/pkg/api/handleAck.go index bf0a7af66..69e9e85b4 100644 --- a/internal/pkg/api/handleAck.go +++ b/internal/pkg/api/handleAck.go @@ -265,6 +265,7 @@ func (ack *AckT) handleAckEvents(ctx context.Context, zlog zerolog.Logger, agent policyAcks = append(policyAcks, event.ActionId) policyIdxs = append(policyIdxs, n) } + log.Debug().Msg("ack policy change") // Set OK status, this can be overwritten in case of the errors later when the policy change events acked setResult(n, http.StatusOK) span.End() @@ -365,14 +366,14 @@ func (ack *AckT) handleActionResult(ctx context.Context, zlog zerolog.Logger, ag // Save action result document if err := dl.CreateActionResult(ctx, ack.bulk, acr); err != nil { - zlog.Error().Err(err).Msg("create action result") + zlog.Error().Err(err).Str(logger.AgentID, agent.Agent.ID).Str(logger.ActionID, action.Id).Msg("create action result") return err } if action.Type == TypeUpgrade { event, _ := ev.AsUpgradeEvent() if err := ack.handleUpgrade(ctx, zlog, agent, event); err != nil { - zlog.Error().Err(err).Msg("handle upgrade event") + zlog.Error().Err(err).Str(logger.AgentID, agent.Agent.ID).Str(logger.ActionID, action.Id).Msg("handle upgrade event") return err } } @@ -634,6 +635,8 @@ func (ack *AckT) handleUpgrade(ctx context.Context, zlog zerolog.Logger, agent * zlog.Info(). Str("lastReportedVersion", agent.Agent.Version). Str("upgradedAt", now). + Str(logger.AgentID, agent.Agent.ID). + Str(logger.ActionID, event.ActionId). 
Msg("ack upgrade") return nil diff --git a/internal/pkg/bulk/engine.go b/internal/pkg/bulk/engine.go index 2a2fb1c69..db6790d3a 100644 --- a/internal/pkg/bulk/engine.go +++ b/internal/pkg/bulk/engine.go @@ -346,18 +346,14 @@ func (b *Bulker) Run(ctx context.Context) error { var itemCnt int var byteCnt int - doFlush := func() error { - - // deadline prevents bulker being blocked on flush - ctx, cancel := context.WithTimeout(ctx, 5*time.Minute) - defer cancel() + doFlush := func(flushCtx context.Context) error { for i := range queues { q := &queues[i] if q.pending > 0 { // Pass queue structure by value - if err := b.flushQueue(ctx, w, *q); err != nil { + if err := b.flushQueue(flushCtx, w, *q); err != nil { return err } @@ -409,7 +405,10 @@ func (b *Bulker) Run(ctx context.Context) error { Int("byteCnt", byteCnt). Msg("Flush on threshold") - err = doFlush() + // deadline prevents bulker being blocked on flush + flushCtx, cancel := context.WithTimeout(ctx, 5*time.Minute) + defer cancel() + err = doFlush(flushCtx) stopTimer(timer) } @@ -420,7 +419,11 @@ func (b *Bulker) Run(ctx context.Context) error { Int("itemCnt", itemCnt). Int("byteCnt", byteCnt). Msg("Flush on timer") - err = doFlush() + + // deadline prevents bulker being blocked on flush + flushCtx, cancel := context.WithTimeout(ctx, 5*time.Minute) + defer cancel() + err = doFlush(flushCtx) case <-ctx.Done(): err = ctx.Err() diff --git a/internal/pkg/bulk/opApiKey.go b/internal/pkg/bulk/opApiKey.go index 6258632de..0d64e2536 100644 --- a/internal/pkg/bulk/opApiKey.go +++ b/internal/pkg/bulk/opApiKey.go @@ -218,7 +218,7 @@ func (b *Bulker) flushUpdateAPIKey(ctx context.Context, queue queueT) error { res, err := req.Do(ctx, b.es) if err != nil { - zerolog.Ctx(ctx).Error().Err(err).Msg("Error sending bulk API Key update request to Elasticsearch") + zerolog.Ctx(ctx).Error().Err(err).Msg("flushUpdateAPIKey: Error sending bulk API Key update request to Elasticsearch") return err } if res.Body != nil { @@ -229,7 +229,7 @@ func (b *Bulker) flushUpdateAPIKey(ctx context.Context, queue queueT) error { return parseError(res, zerolog.Ctx(ctx)) } - zerolog.Ctx(ctx).Debug().Strs("IDs", bulkReq.IDs).RawJSON("role", role).Msg("flushUpdateAPIKey") + zerolog.Ctx(ctx).Debug().Strs("IDs", bulkReq.IDs).RawJSON("role", role).Msg("flushUpdateAPIKey: API Keys updated.") responses[responseIdx] = res.StatusCode for _, id := range idsInBatch { diff --git a/internal/pkg/bulk/opBulk.go b/internal/pkg/bulk/opBulk.go index 4ea76593c..881eebb49 100644 --- a/internal/pkg/bulk/opBulk.go +++ b/internal/pkg/bulk/opBulk.go @@ -211,7 +211,7 @@ func (b *Bulker) flushBulk(ctx context.Context, queue queueT) error { res, err := req.Do(ctx, b.es) if err != nil { - zerolog.Ctx(ctx).Error().Err(err).Str("mod", kModBulk).Msg("Fail BulkRequest req.Do") + zerolog.Ctx(ctx).Error().Err(err).Str("mod", kModBulk).Msg("flushBulk: Fail BulkRequest req.Do") return err } @@ -220,7 +220,7 @@ func (b *Bulker) flushBulk(ctx context.Context, queue queueT) error { } if res.IsError() { - zerolog.Ctx(ctx).Error().Str("mod", kModBulk).Str("error.message", res.String()).Msg("Fail BulkRequest result") + zerolog.Ctx(ctx).Error().Str("mod", kModBulk).Str("error.message", res.String()).Msg("flushBulk: Fail BulkRequest result") return parseError(res, zerolog.Ctx(ctx)) } From dcad89b9fe659d547481868e0d1ca55ad7b86a56 Mon Sep 17 00:00:00 2001 From: Julia Bardi Date: Wed, 6 Nov 2024 16:51:26 +0100 Subject: [PATCH 09/19] cleanup logs --- internal/pkg/api/handleCheckin.go | 2 +- internal/pkg/bulk/engine.go | 8 
++++---- internal/pkg/bulk/opApiKey.go | 5 ++--- internal/pkg/bulk/opBulk.go | 8 +++----- internal/pkg/bulk/opRead.go | 4 +--- internal/pkg/bulk/opSearch.go | 4 +--- 6 files changed, 12 insertions(+), 19 deletions(-) diff --git a/internal/pkg/api/handleCheckin.go b/internal/pkg/api/handleCheckin.go index 5021d7107..28ee78b7c 100644 --- a/internal/pkg/api/handleCheckin.go +++ b/internal/pkg/api/handleCheckin.go @@ -372,7 +372,7 @@ func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r actions = append(actions, *actionResp) break LOOP case <-longPoll.C: - zlog.Trace().Str(logger.AgentID, agent.Id).Msg("fire long poll") + zlog.Trace().Msg("fire long poll") break LOOP case <-tick.C: err := ct.bc.CheckIn(agent.Id, string(req.Status), req.Message, nil, rawComponents, nil, ver, unhealthyReason, false) diff --git a/internal/pkg/bulk/engine.go b/internal/pkg/bulk/engine.go index db6790d3a..b255534d9 100644 --- a/internal/pkg/bulk/engine.go +++ b/internal/pkg/bulk/engine.go @@ -414,7 +414,7 @@ func (b *Bulker) Run(ctx context.Context) error { } case <-timer.C: - zerolog.Ctx(ctx).Debug(). + zerolog.Ctx(ctx).Trace(). Str("mod", kModBulk). Int("itemCnt", itemCnt). Int("byteCnt", byteCnt). @@ -443,7 +443,7 @@ func (b *Bulker) Run(ctx context.Context) error { func (b *Bulker) flushQueue(ctx context.Context, w *semaphore.Weighted, queue queueT) error { start := time.Now() - zerolog.Ctx(ctx).Debug(). + zerolog.Ctx(ctx).Trace(). Str("mod", kModBulk). Int("cnt", queue.cnt). Int("szPending", queue.pending). @@ -455,7 +455,7 @@ func (b *Bulker) flushQueue(ctx context.Context, w *semaphore.Weighted, queue qu return err } - zerolog.Ctx(ctx).Debug(). + zerolog.Ctx(ctx).Trace(). Str("mod", kModBulk). Int("cnt", queue.cnt). Dur("tdiff", time.Since(start)). @@ -493,7 +493,7 @@ func (b *Bulker) flushQueue(ctx context.Context, w *semaphore.Weighted, queue qu apm.CaptureError(ctx, err).Send() } - zerolog.Ctx(ctx).Debug(). + zerolog.Ctx(ctx).Trace(). Err(err). Str("mod", kModBulk). Int("cnt", queue.cnt). 
diff --git a/internal/pkg/bulk/opApiKey.go b/internal/pkg/bulk/opApiKey.go index 0d64e2536..ca3a50d16 100644 --- a/internal/pkg/bulk/opApiKey.go +++ b/internal/pkg/bulk/opApiKey.go @@ -214,11 +214,10 @@ func (b *Bulker) flushUpdateAPIKey(ctx context.Context, queue queueT) error { req := &esapi.SecurityBulkUpdateAPIKeysRequest{ Body: bytes.NewReader(payload), } - zerolog.Ctx(ctx).Debug().Msg("flushUpdateAPIKey: Sending request to Elasticsearch") res, err := req.Do(ctx, b.es) if err != nil { - zerolog.Ctx(ctx).Error().Err(err).Msg("flushUpdateAPIKey: Error sending bulk API Key update request to Elasticsearch") + zerolog.Ctx(ctx).Error().Err(err).Msg("Error sending bulk API Key update request to Elasticsearch") return err } if res.Body != nil { @@ -229,7 +228,7 @@ func (b *Bulker) flushUpdateAPIKey(ctx context.Context, queue queueT) error { return parseError(res, zerolog.Ctx(ctx)) } - zerolog.Ctx(ctx).Debug().Strs("IDs", bulkReq.IDs).RawJSON("role", role).Msg("flushUpdateAPIKey: API Keys updated.") + zerolog.Ctx(ctx).Debug().Strs("IDs", bulkReq.IDs).RawJSON("role", role).Msg("API Keys updated.") responses[responseIdx] = res.StatusCode for _, id := range idsInBatch { diff --git a/internal/pkg/bulk/opBulk.go b/internal/pkg/bulk/opBulk.go index 881eebb49..9f28d5103 100644 --- a/internal/pkg/bulk/opBulk.go +++ b/internal/pkg/bulk/opBulk.go @@ -207,11 +207,9 @@ func (b *Bulker) flushBulk(ctx context.Context, queue queueT) error { req.Refresh = "true" } - zerolog.Ctx(ctx).Debug().Msg("flushBulk: Sending request to Elasticsearch") - res, err := req.Do(ctx, b.es) if err != nil { - zerolog.Ctx(ctx).Error().Err(err).Str("mod", kModBulk).Msg("flushBulk: Fail BulkRequest req.Do") + zerolog.Ctx(ctx).Error().Err(err).Str("mod", kModBulk).Msg("Fail BulkRequest req.Do") return err } @@ -220,7 +218,7 @@ func (b *Bulker) flushBulk(ctx context.Context, queue queueT) error { } if res.IsError() { - zerolog.Ctx(ctx).Error().Str("mod", kModBulk).Str("error.message", res.String()).Msg("flushBulk: Fail BulkRequest result") + zerolog.Ctx(ctx).Error().Str("mod", kModBulk).Str("error.message", res.String()).Msg("Fail BulkRequest result") return parseError(res, zerolog.Ctx(ctx)) } @@ -253,7 +251,7 @@ func (b *Bulker) flushBulk(ctx context.Context, queue queueT) error { zerolog.Ctx(ctx).Debug().Err(errors.New(buf.String())).Msg("Bulk call: Es returned an error") } - zerolog.Ctx(ctx).Debug(). + zerolog.Ctx(ctx).Trace(). Err(err). Bool("refresh", queue.ty == kQueueRefreshBulk). Str("mod", kModBulk). diff --git a/internal/pkg/bulk/opRead.go b/internal/pkg/bulk/opRead.go index dc9aa9ace..657e7f085 100644 --- a/internal/pkg/bulk/opRead.go +++ b/internal/pkg/bulk/opRead.go @@ -109,8 +109,6 @@ func (b *Bulker) flushRead(ctx context.Context, queue queueT) error { req.Refresh = &refresh } - zerolog.Ctx(ctx).Debug().Msg("flushRead: Sending request to Elasticsearch") - res, err := req.Do(ctx, b.es) if err != nil { @@ -144,7 +142,7 @@ func (b *Bulker) flushRead(ctx context.Context, queue queueT) error { return err } - zerolog.Ctx(ctx).Debug(). + zerolog.Ctx(ctx).Trace(). Err(err). Bool("refresh", refresh). Str("mod", kModBulk). 
diff --git a/internal/pkg/bulk/opSearch.go b/internal/pkg/bulk/opSearch.go index bdf9a234d..14d5252e9 100644 --- a/internal/pkg/bulk/opSearch.go +++ b/internal/pkg/bulk/opSearch.go @@ -156,8 +156,6 @@ func (b *Bulker) flushSearch(ctx context.Context, queue queueT) error { err error ) - zerolog.Ctx(ctx).Debug().Msg("flushSearch: Sending request to Elasticsearch") - if queue.ty == kQueueFleetSearch { req := esapi.FleetMsearchRequest{ Body: bytes.NewReader(buf.Bytes()), @@ -201,7 +199,7 @@ func (b *Bulker) flushSearch(ctx context.Context, queue queueT) error { return err } - zerolog.Ctx(ctx).Debug(). + zerolog.Ctx(ctx).Trace(). Err(err). Str("mod", kModBulk). Dur("rtt", time.Since(start)). From 6f82ac4ef77d2ba37e45d8da6743aa54a6f4e159 Mon Sep 17 00:00:00 2001 From: Julia Bardi Date: Thu, 7 Nov 2024 11:55:28 +0100 Subject: [PATCH 10/19] extracted const --- internal/pkg/bulk/engine.go | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/internal/pkg/bulk/engine.go b/internal/pkg/bulk/engine.go index b255534d9..4059deb48 100644 --- a/internal/pkg/bulk/engine.go +++ b/internal/pkg/bulk/engine.go @@ -97,13 +97,14 @@ type Bulker struct { } const ( - defaultFlushInterval = time.Second * 5 - defaultFlushThresholdCnt = 32768 - defaultFlushThresholdSz = 1024 * 1024 * 10 - defaultMaxPending = 32 - defaultBlockQueueSz = 32 // Small capacity to allow multiOp to spin fast - defaultAPIKeyMaxParallel = 32 - defaultApikeyMaxReqSize = 100 * 1024 * 1024 + defaultFlushInterval = time.Second * 5 + defaultFlushThresholdCnt = 32768 + defaultFlushThresholdSz = 1024 * 1024 * 10 + defaultMaxPending = 32 + defaultBlockQueueSz = 32 // Small capacity to allow multiOp to spin fast + defaultAPIKeyMaxParallel = 32 + defaultApikeyMaxReqSize = 100 * 1024 * 1024 + defaultFlushContextTimeout = time.Minute * 1 ) func NewBulker(es esapi.Transport, tracer *apm.Tracer, opts ...BulkOpt) *Bulker { @@ -406,7 +407,7 @@ func (b *Bulker) Run(ctx context.Context) error { Msg("Flush on threshold") // deadline prevents bulker being blocked on flush - flushCtx, cancel := context.WithTimeout(ctx, 5*time.Minute) + flushCtx, cancel := context.WithTimeout(ctx, defaultFlushContextTimeout) defer cancel() err = doFlush(flushCtx) @@ -421,7 +422,7 @@ func (b *Bulker) Run(ctx context.Context) error { Msg("Flush on timer") // deadline prevents bulker being blocked on flush - flushCtx, cancel := context.WithTimeout(ctx, 5*time.Minute) + flushCtx, cancel := context.WithTimeout(ctx, defaultFlushContextTimeout) defer cancel() err = doFlush(flushCtx) From 2fcfcd0aee8cf33cc04f6ec98061f697f2b95f7a Mon Sep 17 00:00:00 2001 From: Julia Bardi Date: Fri, 8 Nov 2024 10:07:08 +0100 Subject: [PATCH 11/19] remove log --- internal/pkg/api/handleAck.go | 1 - 1 file changed, 1 deletion(-) diff --git a/internal/pkg/api/handleAck.go b/internal/pkg/api/handleAck.go index 69e9e85b4..0e872c40f 100644 --- a/internal/pkg/api/handleAck.go +++ b/internal/pkg/api/handleAck.go @@ -265,7 +265,6 @@ func (ack *AckT) handleAckEvents(ctx context.Context, zlog zerolog.Logger, agent policyAcks = append(policyAcks, event.ActionId) policyIdxs = append(policyIdxs, n) } - log.Debug().Msg("ack policy change") // Set OK status, this can be overwritten in case of the errors later when the policy change events acked setResult(n, http.StatusOK) span.End() From 7bb949e5963b5b183dbdb486045158cafda087a9 Mon Sep 17 00:00:00 2001 From: Julia Bardi Date: Fri, 8 Nov 2024 15:16:13 +0100 Subject: [PATCH 12/19] exit bulker on checkin error --- internal/pkg/checkin/bulk.go 
| 1 + 1 file changed, 1 insertion(+) diff --git a/internal/pkg/checkin/bulk.go b/internal/pkg/checkin/bulk.go index 1cc27f201..cd7345a2c 100644 --- a/internal/pkg/checkin/bulk.go +++ b/internal/pkg/checkin/bulk.go @@ -153,6 +153,7 @@ LOOP: case <-tick.C: if err = bc.flush(ctx); err != nil { zerolog.Ctx(ctx).Error().Err(err).Msg("Eat bulk checkin error; Keep on truckin'") + break LOOP } case <-ctx.Done(): From e54562706c2133c253213b988ce7eb7cc33ede63 Mon Sep 17 00:00:00 2001 From: Julia Bardi Date: Fri, 8 Nov 2024 15:57:35 +0100 Subject: [PATCH 13/19] update to latest stack snapshot --- dev-tools/integration/.env | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dev-tools/integration/.env b/dev-tools/integration/.env index 1b8aa273d..48ec2e991 100644 --- a/dev-tools/integration/.env +++ b/dev-tools/integration/.env @@ -1,6 +1,6 @@ # If you use change this version without a pinned one, please update # .ci/bump-elastic-stack-snapshot.yml or .github/workflows/bump-golang.yml -ELASTICSEARCH_VERSION=9.0.0-32892611-SNAPSHOT +ELASTICSEARCH_VERSION=9.0.0-00f0da10-SNAPSHOT ELASTICSEARCH_USERNAME=elastic ELASTICSEARCH_PASSWORD=changeme TEST_ELASTICSEARCH_HOSTS=localhost:9200 From 3d5f55760aad65708c4d0ac1e3c1cd07855936cf Mon Sep 17 00:00:00 2001 From: Julia Bardi Date: Mon, 11 Nov 2024 09:12:08 +0100 Subject: [PATCH 14/19] revert break LOOP --- internal/pkg/checkin/bulk.go | 1 - 1 file changed, 1 deletion(-) diff --git a/internal/pkg/checkin/bulk.go b/internal/pkg/checkin/bulk.go index cd7345a2c..1cc27f201 100644 --- a/internal/pkg/checkin/bulk.go +++ b/internal/pkg/checkin/bulk.go @@ -153,7 +153,6 @@ LOOP: case <-tick.C: if err = bc.flush(ctx); err != nil { zerolog.Ctx(ctx).Error().Err(err).Msg("Eat bulk checkin error; Keep on truckin'") - break LOOP } case <-ctx.Done(): From 11b16cc90a581522e419e6137069bc056243ff27 Mon Sep 17 00:00:00 2001 From: Julia Bardi Date: Tue, 26 Nov 2024 13:24:47 +0100 Subject: [PATCH 15/19] move deadline inside doFlush --- internal/pkg/bulk/engine.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/internal/pkg/bulk/engine.go b/internal/pkg/bulk/engine.go index 4059deb48..9d3c0ff55 100644 --- a/internal/pkg/bulk/engine.go +++ b/internal/pkg/bulk/engine.go @@ -421,10 +421,7 @@ func (b *Bulker) Run(ctx context.Context) error { Int("byteCnt", byteCnt). Msg("Flush on timer") - // deadline prevents bulker being blocked on flush - flushCtx, cancel := context.WithTimeout(ctx, defaultFlushContextTimeout) - defer cancel() - err = doFlush(flushCtx) + err = doFlush(ctx) case <-ctx.Done(): err = ctx.Err() @@ -451,6 +448,9 @@ func (b *Bulker) flushQueue(ctx context.Context, w *semaphore.Weighted, queue qu Str("queue", queue.Type()). 
Msg("flushQueue Wait") + ctx, cancel = context.WithTimeout(ctx, defaultFlushContextTimeout) + defer cancel() + if err := w.Acquire(ctx, 1); err != nil { zerolog.Ctx(ctx).Error().Err(err).Msg("flushQueue Wait error") return err @@ -467,6 +467,10 @@ func (b *Bulker) flushQueue(ctx context.Context, w *semaphore.Weighted, queue qu go func() { start := time.Now() + // deadline prevents bulker being blocked on flush + ctx, cancel = context.WithTimeout(ctx, defaultFlushContextTimeout) + defer cancel() + if b.tracer != nil { trans := b.tracer.StartTransaction(fmt.Sprintf("Flush queue %s", queue.Type()), "bulker") trans.Context.SetLabel("queue.size", queue.cnt) From a4525ed6b1c23e1b87f7ea775e77ba7e3a30a1b7 Mon Sep 17 00:00:00 2001 From: Julia Bardi Date: Tue, 26 Nov 2024 14:05:20 +0100 Subject: [PATCH 16/19] fix cancel --- internal/pkg/bulk/engine.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/pkg/bulk/engine.go b/internal/pkg/bulk/engine.go index 9d3c0ff55..01b405f97 100644 --- a/internal/pkg/bulk/engine.go +++ b/internal/pkg/bulk/engine.go @@ -448,7 +448,7 @@ func (b *Bulker) flushQueue(ctx context.Context, w *semaphore.Weighted, queue qu Str("queue", queue.Type()). Msg("flushQueue Wait") - ctx, cancel = context.WithTimeout(ctx, defaultFlushContextTimeout) + ctx, cancel := context.WithTimeout(ctx, defaultFlushContextTimeout) defer cancel() if err := w.Acquire(ctx, 1); err != nil { @@ -468,7 +468,7 @@ func (b *Bulker) flushQueue(ctx context.Context, w *semaphore.Weighted, queue qu start := time.Now() // deadline prevents bulker being blocked on flush - ctx, cancel = context.WithTimeout(ctx, defaultFlushContextTimeout) + ctx, cancel := context.WithTimeout(ctx, defaultFlushContextTimeout) defer cancel() if b.tracer != nil { From 713cb2579cb48c73342a41c2d1e3b9c08652d1f2 Mon Sep 17 00:00:00 2001 From: Julia Bardi Date: Tue, 26 Nov 2024 14:12:11 +0100 Subject: [PATCH 17/19] remove doFlush param --- internal/pkg/bulk/engine.go | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/internal/pkg/bulk/engine.go b/internal/pkg/bulk/engine.go index 01b405f97..b79558085 100644 --- a/internal/pkg/bulk/engine.go +++ b/internal/pkg/bulk/engine.go @@ -347,14 +347,14 @@ func (b *Bulker) Run(ctx context.Context) error { var itemCnt int var byteCnt int - doFlush := func(flushCtx context.Context) error { + doFlush := func() error { for i := range queues { q := &queues[i] if q.pending > 0 { // Pass queue structure by value - if err := b.flushQueue(flushCtx, w, *q); err != nil { + if err := b.flushQueue(ctx, w, *q); err != nil { return err } @@ -406,10 +406,7 @@ func (b *Bulker) Run(ctx context.Context) error { Int("byteCnt", byteCnt). Msg("Flush on threshold") - // deadline prevents bulker being blocked on flush - flushCtx, cancel := context.WithTimeout(ctx, defaultFlushContextTimeout) - defer cancel() - err = doFlush(flushCtx) + err = doFlush() stopTimer(timer) } @@ -421,7 +418,7 @@ func (b *Bulker) Run(ctx context.Context) error { Int("byteCnt", byteCnt). 
Msg("Flush on timer") - err = doFlush(ctx) + err = doFlush() case <-ctx.Done(): err = ctx.Err() From b18c952442f26ff408c9ef7c8bf9c79c9e5a0ee6 Mon Sep 17 00:00:00 2001 From: Julia Bardi Date: Fri, 29 Nov 2024 10:18:09 +0100 Subject: [PATCH 18/19] separate context --- internal/pkg/bulk/engine.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/internal/pkg/bulk/engine.go b/internal/pkg/bulk/engine.go index b79558085..11c021caf 100644 --- a/internal/pkg/bulk/engine.go +++ b/internal/pkg/bulk/engine.go @@ -445,10 +445,10 @@ func (b *Bulker) flushQueue(ctx context.Context, w *semaphore.Weighted, queue qu Str("queue", queue.Type()). Msg("flushQueue Wait") - ctx, cancel := context.WithTimeout(ctx, defaultFlushContextTimeout) + acquireCtx, cancel := context.WithTimeout(ctx, defaultFlushContextTimeout) defer cancel() - if err := w.Acquire(ctx, 1); err != nil { + if err := w.Acquire(acquireCtx, 1); err != nil { zerolog.Ctx(ctx).Error().Err(err).Msg("flushQueue Wait error") return err } @@ -465,7 +465,7 @@ func (b *Bulker) flushQueue(ctx context.Context, w *semaphore.Weighted, queue qu start := time.Now() // deadline prevents bulker being blocked on flush - ctx, cancel := context.WithTimeout(ctx, defaultFlushContextTimeout) + flushCtx, cancel := context.WithTimeout(ctx, defaultFlushContextTimeout) defer cancel() if b.tracer != nil { @@ -481,13 +481,13 @@ func (b *Bulker) flushQueue(ctx context.Context, w *semaphore.Weighted, queue qu var err error switch queue.ty { case kQueueRead, kQueueRefreshRead: - err = b.flushRead(ctx, queue) + err = b.flushRead(flushCtx, queue) case kQueueSearch, kQueueFleetSearch: - err = b.flushSearch(ctx, queue) + err = b.flushSearch(flushCtx, queue) case kQueueAPIKeyUpdate: - err = b.flushUpdateAPIKey(ctx, queue) + err = b.flushUpdateAPIKey(flushCtx, queue) default: - err = b.flushBulk(ctx, queue) + err = b.flushBulk(flushCtx, queue) } if err != nil { From 2bcf456402fc8c78e94e4e8d4dadd43b7eb34ac8 Mon Sep 17 00:00:00 2001 From: Julia Bardi Date: Fri, 29 Nov 2024 15:17:39 +0100 Subject: [PATCH 19/19] added changelog --- .../1732889737-deadline-bulk-flush.yaml | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) create mode 100644 changelog/fragments/1732889737-deadline-bulk-flush.yaml diff --git a/changelog/fragments/1732889737-deadline-bulk-flush.yaml b/changelog/fragments/1732889737-deadline-bulk-flush.yaml new file mode 100644 index 000000000..fcecdd134 --- /dev/null +++ b/changelog/fragments/1732889737-deadline-bulk-flush.yaml @@ -0,0 +1,32 @@ +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: bug-fix + +# Change summary; a 80ish characters long description of the change. +summary: Added context deadline around flush bulk queue + +# Long description; in case the summary is not enough to describe the change +# this field accommodate a description without length limits. +# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment. 
+#description:
+
+# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
+component:
+
+# PR URL; optional; the PR number that added the changeset.
+# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
+# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
+# Please provide it if you are adding a fragment for a different PR.
+#pr: https://github.com/owner/repo/1234
+
+# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
+# If not present is automatically filled by the tooling with the issue linked to the PR number.
+#issue: https://github.com/owner/repo/1234