Skip to content

Commit

Permalink
[filebeat][gcs] - Added missing locks for safe concurrency (#34914)
Browse files Browse the repository at this point in the history
* added missing locks for safe concurrency

* updated asciidoc

* updated asciidoc

* updated locks for publishing events

* added nil state fail safe mechanism

---------

Co-authored-by: Denis <denis.rechkunov@elastic.co>
(cherry picked from commit 62a2700)
  • Loading branch information
ShourieG authored and mergify[bot] committed Mar 31, 2023
1 parent 9be62b9 commit 97a510a
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 3 deletions.
36 changes: 36 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,42 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]


*Filebeat*
- [Auditbeat System Package] Added support for Apple Silicon chips. {pull}34433[34433]
- [Azure blob storage] Changed logger field name from `container` to `container_name` so that it does not clash
with the ecs field name `container`. {pull}34403[34403]
- [GCS] Added support for more mime types & introduced offset tracking via cursor state. Also added support for
automatic splitting at root level, if root level element is an array. {pull}34155[34155]
- [httpsjon] Improved error handling during pagination with chaining & split processor {pull}34127[34127]
- [Azure blob storage] Added support for more mime types & introduced offset tracking via cursor state. {pull}33981[33981]
- Fix EOF on single line not producing any event. {issue}30436[30436] {pull}33568[33568]
- Fix handling of error in states in direct aws-s3 listing input {issue}33513[33513] {pull}33722[33722]
- Fix `httpjson` input page number initialization and documentation. {pull}33400[33400]
- Add handling of AAA operations for Cisco ASA module. {issue}32257[32257] {pull}32789[32789]
- Fix gc.log always shipped even if gc fileset is disabled {issue}30995[30995]
- Fix handling of empty array in httpjson input. {pull}32001[32001]
- Fix reporting of `filebeat.events.active` in log events such that the current value is always reported instead of the difference from the last value. {pull}33597[33597]
- Fix splitting array of strings/arrays in httpjson input {issue}30345[30345] {pull}33609[33609]
- Fix Google workspace pagination and document ID generation. {pull}33666[33666]
- Fix PANW handling of messages with event.original already set. {issue}33829[33829] {pull}33830[33830]
- Rename identity as identity_name when the value is a string in Azure Platform Logs. {pull}33654[33654]
- Fix 'requires pointer' error while getting cursor metadata. {pull}33956[33956]
- Fix input cancellation handling when HTTP client does not support contexts. {issue}33962[33962] {pull}33968[33968]
- Update mito CEL extension library to v0.0.0-20221207004749-2f0f2875e464 {pull}33974[33974]
- Fix CEL result deserialisation when evaluation fails. {issue}33992[33992] {pull}33996[33996]
- Fix handling of non-200/non-429 status codes. {issue}33999[33999] {pull}34002[34002]
- [azure-eventhub input] Switch the run EPH run mode to non-blocking {pull}34075[34075]
- [google_workspace] Fix pagination and cursor value update. {pull}34274[34274]
- Fix handling of quoted values in auditd module. {issue}22587[22587] {pull}34069[34069]
- Fixing system tests not returning expected content encoding for azure blob storage input. {pull}34412[34412]
- [Azure Logs] Fix authentication_processing_details parsing in sign-in logs. {issue}34330[34330] {pull}34478[34478]
- Prevent Elasticsearch from spewing log warnings about redundant wildcard when setting up ingest pipelines. {issue}34249[34249] {pull}34550[34550]
- Gracefully handle Windows event channel not found errors in winlog input. {issue}30201[30201] {pull}34605[34605]
- Fix the issue of `cometd` input worker getting closed in case of a network connection issue and an EOF error. {issue}34326[34326] {pull}34327[34327]
- Fix for httpjson first_response object throwing false positive errors by making it a flag based object {issue}34747[34747] {pull}34748[34748]
- Fix errors and panics due to re-used processors {pull}34761[34761]
- Add missing Basic Authentication support to CEL input {issue}34609[34609] {pull}34689[34689]
- [Gcs Input] - Added missing locks for safe concurrency {pull}34914[34914]
- Fix the ignore_inactive option being ignored in Filebeat's filestream input {pull}34770[34770]

*Heartbeat*

Expand Down
9 changes: 9 additions & 0 deletions x-pack/filebeat/input/gcs/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"errors"
"fmt"
"io"
"sync"
"time"
"unicode"

Expand All @@ -27,6 +28,8 @@ import (
)

type job struct {
// Mutex lock for concurrent publishes
mu sync.Mutex
// gcs bucket handle
bucket *storage.BucketHandle
// gcs object attribute struct
Expand Down Expand Up @@ -107,9 +110,12 @@ func (j *job) do(ctx context.Context, id string) {
}
event.SetID(objectID(j.hash, 0))
j.state.save(j.object.Name, j.object.Updated)
// locks while data is being published to avoid concurrent map read/writes
j.mu.Lock()
if err := j.publisher.Publish(event, j.state.checkpoint()); err != nil {
j.log.Errorw("job encountered an error", "gcs.jobId", id, "error", err)
}
j.mu.Unlock()
}
}

Expand Down Expand Up @@ -217,9 +223,12 @@ func (j *job) readJsonAndPublish(ctx context.Context, r io.Reader, id string) er
// partially saves read state using offset
j.state.savePartial(j.object.Name, offset+relativeOffset)
}
// locks while data is being published to avoid concurrent map read/writes
j.mu.Lock()
if err := j.publisher.Publish(evt, j.state.checkpoint()); err != nil {
j.log.Errorw("job encountered an error", "gcs.jobId", id, "error", err)
}
j.mu.Unlock()
}
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions x-pack/filebeat/input/gcs/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (l *limiter) wait() {
l.wg.Wait()
}

// release puts pack a worker thread.
// release puts back a worker thread.
func (l *limiter) release() {
<-l.limit
l.wg.Done()
Expand Down Expand Up @@ -167,7 +167,7 @@ func (s *scheduler) fetchObjectPager(ctx context.Context, pageSize int) *iterato
}

// moveToLastSeenJob, moves to the latest job position past the last seen job
// Jobs are stored in lexicographical order always , hence the latest position can be found either on the basis of job name or timestamp
// Jobs are stored in lexicographical order always, hence the latest position can be found either on the basis of job name or timestamp
func (s *scheduler) moveToLastSeenJob(jobs []*job) []*job {
var latestJobs []*job
jobsToReturn := make([]*job, 0)
Expand Down
17 changes: 16 additions & 1 deletion x-pack/filebeat/input/gcs/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,16 @@ func (s *state) save(name string, lastModifiedOn time.Time) {

// setRootArray, sets boolean true for objects that have their roots defined as an array type
func (s *state) setRootArray(name string) {
s.mu.Lock()
s.cp.IsRootArray[name] = true
s.mu.Unlock()
}

// savePartial, partially saves/updates the current state for cursor checkpoint
func (s *state) savePartial(name string, offset int64) {
s.mu.Lock()
s.cp.LastProcessedOffset[name] = offset
s.mu.Unlock()
}

// updateFailedJobs, adds a job name to a failedJobs map, which helps
Expand All @@ -87,11 +91,11 @@ func (s *state) savePartial(name string, offset int64) {
// A failed job will be re-tried a maximum of 3 times after which the
// entry is removed from the map
func (s *state) updateFailedJobs(jobName string) {
s.mu.Lock()
// we do not store partially processed jobs as failed jobs
if _, ok := s.cp.LastProcessedOffset[jobName]; ok {
return
}
s.mu.Lock()
s.cp.FailedJobs[jobName]++
if s.cp.FailedJobs[jobName] > maxFailedJobRetries {
delete(s.cp.FailedJobs, jobName)
Expand All @@ -100,7 +104,18 @@ func (s *state) updateFailedJobs(jobName string) {
}

// setCheckpoint, sets checkpoint from source to current state instance
// If for some reason the current state is empty, assigns new states as
// a fail safe mechanism
func (s *state) setCheckpoint(chkpt *Checkpoint) {
if chkpt.FailedJobs == nil {
chkpt.FailedJobs = make(map[string]int)
}
if chkpt.IsRootArray == nil {
chkpt.IsRootArray = make(map[string]bool)
}
if chkpt.LastProcessedOffset == nil {
chkpt.LastProcessedOffset = make(map[string]int64)
}
s.cp = chkpt
}

Expand Down

0 comments on commit 97a510a

Please sign in to comment.