From 97a510aa824baf2f1a8cc31181a01236c77a673e Mon Sep 17 00:00:00 2001 From: ShourieG <105607378+ShourieG@users.noreply.github.com> Date: Fri, 31 Mar 2023 13:57:24 +0530 Subject: [PATCH] [filebeat][gcs] - Added missing locks for safe concurrency (#34914) * added missing locks for safe concurrency * updated asciidoc * updated asciidoc * updated locks for publishing events * added nil state fail safe mechanism --------- Co-authored-by: Denis (cherry picked from commit 62a27007e67c337b3d9e6fd78acffad1799a7c2e) --- CHANGELOG.next.asciidoc | 36 ++++++++++++++++++++++++++ x-pack/filebeat/input/gcs/job.go | 9 +++++++ x-pack/filebeat/input/gcs/scheduler.go | 4 +-- x-pack/filebeat/input/gcs/state.go | 17 +++++++++++- 4 files changed, 63 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 522a05a68f6..1f8344b4bc9 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -42,6 +42,42 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff] *Filebeat* +- [Auditbeat System Package] Added support for Apple Silicon chips. {pull}34433[34433] +- [Azure blob storage] Changed logger field name from `container` to `container_name` so that it does not clash + with the ecs field name `container`. {pull}34403[34403] +- [GCS] Added support for more mime types & introduced offset tracking via cursor state. Also added support for + automatic splitting at root level, if root level element is an array. {pull}34155[34155] +- [httpjson] Improved error handling during pagination with chaining & split processor {pull}34127[34127] +- [Azure blob storage] Added support for more mime types & introduced offset tracking via cursor state. {pull}33981[33981] +- Fix EOF on single line not producing any event. {issue}30436[30436] {pull}33568[33568] +- Fix handling of error in states in direct aws-s3 listing input {issue}33513[33513] {pull}33722[33722] +- Fix `httpjson` input page number initialization and documentation.
{pull}33400[33400] +- Add handling of AAA operations for Cisco ASA module. {issue}32257[32257] {pull}32789[32789] +- Fix gc.log always shipped even if gc fileset is disabled {issue}30995[30995] +- Fix handling of empty array in httpjson input. {pull}32001[32001] +- Fix reporting of `filebeat.events.active` in log events such that the current value is always reported instead of the difference from the last value. {pull}33597[33597] +- Fix splitting array of strings/arrays in httpjson input {issue}30345[30345] {pull}33609[33609] +- Fix Google workspace pagination and document ID generation. {pull}33666[33666] +- Fix PANW handling of messages with event.original already set. {issue}33829[33829] {pull}33830[33830] +- Rename identity as identity_name when the value is a string in Azure Platform Logs. {pull}33654[33654] +- Fix 'requires pointer' error while getting cursor metadata. {pull}33956[33956] +- Fix input cancellation handling when HTTP client does not support contexts. {issue}33962[33962] {pull}33968[33968] +- Update mito CEL extension library to v0.0.0-20221207004749-2f0f2875e464 {pull}33974[33974] +- Fix CEL result deserialisation when evaluation fails. {issue}33992[33992] {pull}33996[33996] +- Fix handling of non-200/non-429 status codes. {issue}33999[33999] {pull}34002[34002] +- [azure-eventhub input] Switch the run EPH run mode to non-blocking {pull}34075[34075] +- [google_workspace] Fix pagination and cursor value update. {pull}34274[34274] +- Fix handling of quoted values in auditd module. {issue}22587[22587] {pull}34069[34069] +- Fixing system tests not returning expected content encoding for azure blob storage input. {pull}34412[34412] +- [Azure Logs] Fix authentication_processing_details parsing in sign-in logs. {issue}34330[34330] {pull}34478[34478] +- Prevent Elasticsearch from spewing log warnings about redundant wildcard when setting up ingest pipelines. 
{issue}34249[34249] {pull}34550[34550] +- Gracefully handle Windows event channel not found errors in winlog input. {issue}30201[30201] {pull}34605[34605] +- Fix the issue of `cometd` input worker getting closed in case of a network connection issue and an EOF error. {issue}34326[34326] {pull}34327[34327] +- Fix for httpjson first_response object throwing false positive errors by making it a flag based object {issue}34747[34747] {pull}34748[34748] +- Fix errors and panics due to re-used processors {pull}34761[34761] +- Add missing Basic Authentication support to CEL input {issue}34609[34609] {pull}34689[34689] +- [Gcs Input] - Added missing locks for safe concurrency {pull}34914[34914] +- Fix the ignore_inactive option being ignored in Filebeat's filestream input {pull}34770[34770] *Heartbeat* diff --git a/x-pack/filebeat/input/gcs/job.go b/x-pack/filebeat/input/gcs/job.go index 3ca5b48a2d7..118e89287ac 100644 --- a/x-pack/filebeat/input/gcs/job.go +++ b/x-pack/filebeat/input/gcs/job.go @@ -15,6 +15,7 @@ import ( "errors" "fmt" "io" + "sync" "time" "unicode" @@ -27,6 +28,8 @@ import ( ) type job struct { + // Mutex lock for concurrent publishes + mu sync.Mutex // gcs bucket handle bucket *storage.BucketHandle // gcs object attribute struct @@ -107,9 +110,12 @@ func (j *job) do(ctx context.Context, id string) { } event.SetID(objectID(j.hash, 0)) j.state.save(j.object.Name, j.object.Updated) + // locks while data is being published to avoid concurrent map read/writes + j.mu.Lock() if err := j.publisher.Publish(event, j.state.checkpoint()); err != nil { j.log.Errorw("job encountered an error", "gcs.jobId", id, "error", err) } + j.mu.Unlock() } } @@ -217,9 +223,12 @@ func (j *job) readJsonAndPublish(ctx context.Context, r io.Reader, id string) er // partially saves read state using offset j.state.savePartial(j.object.Name, offset+relativeOffset) } + // locks while data is being published to avoid concurrent map read/writes + j.mu.Lock() if err := 
j.publisher.Publish(evt, j.state.checkpoint()); err != nil { j.log.Errorw("job encountered an error", "gcs.jobId", id, "error", err) } + j.mu.Unlock() } return nil } diff --git a/x-pack/filebeat/input/gcs/scheduler.go b/x-pack/filebeat/input/gcs/scheduler.go index 880771247ff..7feb57f7c1e 100644 --- a/x-pack/filebeat/input/gcs/scheduler.go +++ b/x-pack/filebeat/input/gcs/scheduler.go @@ -86,7 +86,7 @@ func (l *limiter) wait() { l.wg.Wait() } -// release puts pack a worker thread. +// release puts back a worker thread. func (l *limiter) release() { <-l.limit l.wg.Done() @@ -167,7 +167,7 @@ func (s *scheduler) fetchObjectPager(ctx context.Context, pageSize int) *iterato } // moveToLastSeenJob, moves to the latest job position past the last seen job -// Jobs are stored in lexicographical order always , hence the latest position can be found either on the basis of job name or timestamp +// Jobs are stored in lexicographical order always, hence the latest position can be found either on the basis of job name or timestamp func (s *scheduler) moveToLastSeenJob(jobs []*job) []*job { var latestJobs []*job jobsToReturn := make([]*job, 0) diff --git a/x-pack/filebeat/input/gcs/state.go b/x-pack/filebeat/input/gcs/state.go index 6b26dfa2dfa..6b2a269481f 100644 --- a/x-pack/filebeat/input/gcs/state.go +++ b/x-pack/filebeat/input/gcs/state.go @@ -73,12 +73,16 @@ func (s *state) save(name string, lastModifiedOn time.Time) { // setRootArray, sets boolean true for objects that have their roots defined as an array type func (s *state) setRootArray(name string) { + s.mu.Lock() s.cp.IsRootArray[name] = true + s.mu.Unlock() } // savePartial, partially saves/updates the current state for cursor checkpoint func (s *state) savePartial(name string, offset int64) { + s.mu.Lock() s.cp.LastProcessedOffset[name] = offset + s.mu.Unlock() } // updateFailedJobs, adds a job name to a failedJobs map, which helps @@ -87,11 +91,11 @@ func (s *state) savePartial(name string, offset int64) { // A 
failed job will be re-tried a maximum of 3 times after which the // entry is removed from the map func (s *state) updateFailedJobs(jobName string) { + s.mu.Lock() // we do not store partially processed jobs as failed jobs if _, ok := s.cp.LastProcessedOffset[jobName]; ok { return } - s.mu.Lock() s.cp.FailedJobs[jobName]++ if s.cp.FailedJobs[jobName] > maxFailedJobRetries { delete(s.cp.FailedJobs, jobName) @@ -100,7 +104,18 @@ func (s *state) updateFailedJobs(jobName string) { } // setCheckpoint, sets checkpoint from source to current state instance +// If for some reason the current state is empty, assigns new states as +// a fail safe mechanism func (s *state) setCheckpoint(chkpt *Checkpoint) { + if chkpt.FailedJobs == nil { + chkpt.FailedJobs = make(map[string]int) + } + if chkpt.IsRootArray == nil { + chkpt.IsRootArray = make(map[string]bool) + } + if chkpt.LastProcessedOffset == nil { + chkpt.LastProcessedOffset = make(map[string]int64) + } s.cp = chkpt }