diff --git a/changelog/fragments/1732889737-deadline-bulk-flush.yaml b/changelog/fragments/1732889737-deadline-bulk-flush.yaml new file mode 100644 index 000000000..fcecdd134 --- /dev/null +++ b/changelog/fragments/1732889737-deadline-bulk-flush.yaml @@ -0,0 +1,32 @@ +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: bug-fix + +# Change summary; a 80ish characters long description of the change. +summary: Added context deadline around flush bulk queue + +# Long description; in case the summary is not enough to describe the change +# this field accommodate a description without length limits. +# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment. +#description: + +# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc. +component: + +# PR URL; optional; the PR number that added the changeset. +# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. +#pr: https://github.com/owner/repo/1234 + +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. +#issue: https://github.com/owner/repo/1234 diff --git a/internal/pkg/api/handleAck.go b/internal/pkg/api/handleAck.go index bf0a7af66..0e872c40f 100644 --- a/internal/pkg/api/handleAck.go +++ b/internal/pkg/api/handleAck.go @@ -365,14 +365,14 @@ func (ack *AckT) handleActionResult(ctx context.Context, zlog zerolog.Logger, ag // Save action result document if err := dl.CreateActionResult(ctx, ack.bulk, acr); err != nil { - zlog.Error().Err(err).Msg("create action result") + zlog.Error().Err(err).Str(logger.AgentID, agent.Agent.ID).Str(logger.ActionID, action.Id).Msg("create action result") return err } if action.Type == TypeUpgrade { event, _ := ev.AsUpgradeEvent() if err := ack.handleUpgrade(ctx, zlog, agent, event); err != nil { - zlog.Error().Err(err).Msg("handle upgrade event") + zlog.Error().Err(err).Str(logger.AgentID, agent.Agent.ID).Str(logger.ActionID, action.Id).Msg("handle upgrade event") return err } } @@ -634,6 +634,8 @@ func (ack *AckT) handleUpgrade(ctx context.Context, zlog zerolog.Logger, agent * zlog.Info(). Str("lastReportedVersion", agent.Agent.Version). Str("upgradedAt", now). + Str(logger.AgentID, agent.Agent.ID). + Str(logger.ActionID, event.ActionId). Msg("ack upgrade") return nil diff --git a/internal/pkg/api/handleCheckin.go b/internal/pkg/api/handleCheckin.go index 9adbc4791..28ee78b7c 100644 --- a/internal/pkg/api/handleCheckin.go +++ b/internal/pkg/api/handleCheckin.go @@ -254,6 +254,8 @@ func (ct *CheckinT) validateRequest(zlog zerolog.Logger, w http.ResponseWriter, } func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r *http.Request, start time.Time, agent *model.Agent, ver string) error { + zlog = zlog.With(). + Str(logger.AgentID, agent.Id).Logger() validated, err := ct.validateRequest(zlog, w, r, start, agent) if err != nil { return err @@ -338,6 +340,7 @@ func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r actions, ackToken = convertActions(zlog, agent.Id, pendingActions) span, ctx := apm.StartSpan(r.Context(), "longPoll", "process") + if len(actions) == 0 { LOOP: for { diff --git a/internal/pkg/bulk/engine.go b/internal/pkg/bulk/engine.go index 7f28d4451..11c021caf 100644 --- a/internal/pkg/bulk/engine.go +++ b/internal/pkg/bulk/engine.go @@ -97,13 +97,14 @@ type Bulker struct { } const ( - defaultFlushInterval = time.Second * 5 - defaultFlushThresholdCnt = 32768 - defaultFlushThresholdSz = 1024 * 1024 * 10 - defaultMaxPending = 32 - defaultBlockQueueSz = 32 // Small capacity to allow multiOp to spin fast - defaultAPIKeyMaxParallel = 32 - defaultApikeyMaxReqSize = 100 * 1024 * 1024 + defaultFlushInterval = time.Second * 5 + defaultFlushThresholdCnt = 32768 + defaultFlushThresholdSz = 1024 * 1024 * 10 + defaultMaxPending = 32 + defaultBlockQueueSz = 32 // Small capacity to allow multiOp to spin fast + defaultAPIKeyMaxParallel = 32 + defaultApikeyMaxReqSize = 100 * 1024 * 1024 + defaultFlushContextTimeout = time.Minute * 1 ) func NewBulker(es esapi.Transport, tracer *apm.Tracer, opts ...BulkOpt) *Bulker { @@ -416,6 +417,7 @@ func (b *Bulker) Run(ctx context.Context) error { Int("itemCnt", itemCnt). Int("byteCnt", byteCnt). Msg("Flush on timer") + err = doFlush() case <-ctx.Done(): @@ -443,7 +445,11 @@ func (b *Bulker) flushQueue(ctx context.Context, w *semaphore.Weighted, queue qu Str("queue", queue.Type()). Msg("flushQueue Wait") - if err := w.Acquire(ctx, 1); err != nil { + acquireCtx, cancel := context.WithTimeout(ctx, defaultFlushContextTimeout) + defer cancel() + + if err := w.Acquire(acquireCtx, 1); err != nil { + zerolog.Ctx(ctx).Error().Err(err).Msg("flushQueue Wait error") return err } @@ -458,6 +464,10 @@ func (b *Bulker) flushQueue(ctx context.Context, w *semaphore.Weighted, queue qu go func() { start := time.Now() + // deadline prevents bulker being blocked on flush + flushCtx, cancel := context.WithTimeout(ctx, defaultFlushContextTimeout) + defer cancel() + if b.tracer != nil { trans := b.tracer.StartTransaction(fmt.Sprintf("Flush queue %s", queue.Type()), "bulker") trans.Context.SetLabel("queue.size", queue.cnt) @@ -471,13 +481,13 @@ func (b *Bulker) flushQueue(ctx context.Context, w *semaphore.Weighted, queue qu var err error switch queue.ty { case kQueueRead, kQueueRefreshRead: - err = b.flushRead(ctx, queue) + err = b.flushRead(flushCtx, queue) case kQueueSearch, kQueueFleetSearch: - err = b.flushSearch(ctx, queue) + err = b.flushSearch(flushCtx, queue) case kQueueAPIKeyUpdate: - err = b.flushUpdateAPIKey(ctx, queue) + err = b.flushUpdateAPIKey(flushCtx, queue) default: - err = b.flushBulk(ctx, queue) + err = b.flushBulk(flushCtx, queue) } if err != nil { @@ -502,8 +512,12 @@ func (b *Bulker) flushQueue(ctx context.Context, w *semaphore.Weighted, queue qu func failQueue(queue queueT, err error) { for n := queue.head; n != nil; { next := n.next // 'n' is invalid immediately on channel send - n.ch <- respT{ + select { + case n.ch <- respT{ err: err, + }: + default: + panic("Unexpected blocked response channel on failQueue") } n = next }