[8.8](backport #35605) [filebeat][gcs] Fix for concurrency issues and context timeouts in the GCS input #35666

Merged: 3 commits, Jun 9, 2023
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -68,6 +68,7 @@ automatic splitting at root level, if root level element is an array. {pull}3415
- Fix handling of MySQL audit logs with strict JSON parser. {issue}35158[35158] {pull}35160[35160]
- Fix accidental error overwrite in defer statement in entityanalytics Azure AD input. {issue}35153[35153] {pull}35169[35169]
- Fixing the grok expression outputs of log files {pull}35221[35221]
+ - [GCS Input] - Fixed an issue where bucket_timeout was being applied to the entire bucket poll interval and not individual bucket object read operations. Fixed a map write concurrency issue arising from data races when using a high number of workers. Fixed the flaky tests that were present in the GCS test suite. {pull}35605[35605]
- Fix handling of IPv6 unspecified addresses in TCP input. {issue}35064[35064] {pull}35637[35637]

*Heartbeat*
3 changes: 1 addition & 2 deletions x-pack/filebeat/input/gcs/input.go
@@ -9,7 +9,7 @@ import (
"time"

"cloud.google.com/go/storage"
- "github.com/googleapis/gax-go/v2"
+ gax "github.com/googleapis/gax-go/v2"

v2 "github.com/elastic/beats/v7/filebeat/input/v2"
cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor"
@@ -127,7 +127,6 @@ func (input *gcsInput) Run(inputCtx v2.Context, src cursor.Source,

log := inputCtx.Logger.With("project_id", currentSource.ProjectId).With("bucket", currentSource.BucketName)
log.Infof("Running google cloud storage for project: %s", input.config.ProjectId)

var cp *Checkpoint
if !cursor.IsNew() {
if err := cursor.Unpack(&cp); err != nil {
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/gcs/input_stateless.go
@@ -9,7 +9,7 @@ import (
"time"

"cloud.google.com/go/storage"
- "github.com/googleapis/gax-go/v2"
+ gax "github.com/googleapis/gax-go/v2"

v2 "github.com/elastic/beats/v7/filebeat/input/v2"
cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor"
60 changes: 21 additions & 39 deletions x-pack/filebeat/input/gcs/input_test.go
@@ -37,15 +37,13 @@ const (
)

func Test_StorageClient(t *testing.T) {
- t.Skip("Flaky test: issue (could possibly affect this also) - https://github.com/elastic/beats/issues/34332")
tests := []struct {
- name            string
- baseConfig      map[string]interface{}
- mockHandler     func() http.Handler
- expected        map[string]bool
- checkJSON       bool
- isError         error
- unexpectedError error
+ name        string
+ baseConfig  map[string]interface{}
+ mockHandler func() http.Handler
+ expected    map[string]bool
+ checkJSON   bool
+ isError     error
}{
{
name: "SingleBucketWithPoll_NoErr",
@@ -67,7 +65,6 @@ func Test_StorageClient(t *testing.T) {
mock.Gcs_test_new_object_data3_json: true,
mock.Gcs_test_new_object_docs_ata_json: true,
},
- unexpectedError: context.Canceled,
},
{
name: "SingleBucketWithoutPoll_NoErr",
@@ -89,7 +86,6 @@ func Test_StorageClient(t *testing.T) {
mock.Gcs_test_new_object_data3_json: true,
mock.Gcs_test_new_object_docs_ata_json: true,
},
- unexpectedError: nil,
},
{
name: "TwoBucketsWithPoll_NoErr",
@@ -116,7 +112,6 @@ func Test_StorageClient(t *testing.T) {
mock.Gcs_test_latest_object_ata_json: true,
mock.Gcs_test_latest_object_data3_json: true,
},
- unexpectedError: context.Canceled,
},
{
name: "TwoBucketsWithoutPoll_NoErr",
@@ -143,7 +138,6 @@ func Test_StorageClient(t *testing.T) {
mock.Gcs_test_latest_object_ata_json: true,
mock.Gcs_test_latest_object_data3_json: true,
},
- unexpectedError: nil,
},
{
name: "SingleBucketWithPoll_InvalidBucketErr",
@@ -159,10 +153,9 @@ func Test_StorageClient(t *testing.T) {
},
},
},
- mockHandler:     mock.GCSServer,
- expected:        map[string]bool{},
- isError:         errors.New("storage: bucket doesn't exist"),
- unexpectedError: nil,
+ mockHandler: mock.GCSServer,
+ expected:    map[string]bool{},
+ isError:     errors.New("storage: bucket doesn't exist"),
},
{
name: "SingleBucketWithoutPoll_InvalidBucketErr",
@@ -178,10 +171,9 @@ func Test_StorageClient(t *testing.T) {
},
},
},
- mockHandler:     mock.GCSServer,
- expected:        map[string]bool{},
- isError:         errors.New("storage: bucket doesn't exist"),
- unexpectedError: nil,
+ mockHandler: mock.GCSServer,
+ expected:    map[string]bool{},
+ isError:     errors.New("storage: bucket doesn't exist"),
},
{
name: "TwoBucketsWithPoll_InvalidBucketErr",
@@ -200,10 +192,9 @@ func Test_StorageClient(t *testing.T) {
},
},
},
- mockHandler:     mock.GCSServer,
- expected:        map[string]bool{},
- isError:         errors.New("storage: bucket doesn't exist"),
- unexpectedError: nil,
+ mockHandler: mock.GCSServer,
+ expected:    map[string]bool{},
+ isError:     errors.New("storage: bucket doesn't exist"),
},
{
name: "SingleBucketWithPoll_InvalidConfigValue",
@@ -219,10 +210,9 @@ func Test_StorageClient(t *testing.T) {
},
},
},
- mockHandler:     mock.GCSServer,
- expected:        map[string]bool{},
- isError:         errors.New("requires value <= 5000 accessing 'max_workers'"),
- unexpectedError: nil,
+ mockHandler: mock.GCSServer,
+ expected:    map[string]bool{},
+ isError:     errors.New("requires value <= 5000 accessing 'max_workers'"),
},
{
name: "TwoBucketWithPoll_InvalidConfigValue",
@@ -241,10 +231,9 @@ func Test_StorageClient(t *testing.T) {
},
},
},
- mockHandler:     mock.GCSServer,
- expected:        map[string]bool{},
- isError:         errors.New("requires value <= 5000 accessing 'max_workers'"),
- unexpectedError: nil,
+ mockHandler: mock.GCSServer,
+ expected:    map[string]bool{},
+ isError:     errors.New("requires value <= 5000 accessing 'max_workers'"),
},
{
name: "SingleBucketWithPoll_parseJSON",
@@ -267,7 +256,6 @@ func Test_StorageClient(t *testing.T) {
mock.Gcs_test_latest_object_ata_json_parsed: true,
mock.Gcs_test_latest_object_data3_json_parsed: true,
},
- unexpectedError: context.Canceled,
},
{
name: "ReadJSON",
@@ -289,7 +277,6 @@ func Test_StorageClient(t *testing.T) {
mock.BeatsFilesBucket_log_json[1]: true,
mock.BeatsFilesBucket_log_json[2]: true,
},
- unexpectedError: context.Canceled,
},
{
name: "ReadOctetStreamJSON",
@@ -310,7 +297,6 @@ func Test_StorageClient(t *testing.T) {
mock.BeatsFilesBucket_multiline_json[0]: true,
mock.BeatsFilesBucket_multiline_json[1]: true,
},
- unexpectedError: context.Canceled,
},
{
name: "ReadNDJSON",
@@ -331,7 +317,6 @@ func Test_StorageClient(t *testing.T) {
mock.BeatsFilesBucket_log_ndjson[0]: true,
mock.BeatsFilesBucket_log_ndjson[1]: true,
},
- unexpectedError: context.Canceled,
},
{
name: "ReadMultilineGzJSON",
@@ -352,7 +337,6 @@ func Test_StorageClient(t *testing.T) {
mock.BeatsFilesBucket_multiline_json_gz[0]: true,
mock.BeatsFilesBucket_multiline_json_gz[1]: true,
},
- unexpectedError: context.Canceled,
},
{
name: "ReadJSONWithRootAsArray",
@@ -375,7 +359,6 @@ func Test_StorageClient(t *testing.T) {
mock.BeatsFilesBucket_json_array[2]: true,
mock.BeatsFilesBucket_json_array[3]: true,
},
- unexpectedError: context.Canceled,
},
}
for _, tt := range tests {
@@ -462,7 +445,6 @@ func Test_StorageClient(t *testing.T) {
}
}
}
- assert.ErrorIs(t, g.Wait(), tt.unexpectedError)
})
}
}
29 changes: 15 additions & 14 deletions x-pack/filebeat/input/gcs/job.go
@@ -15,7 +15,6 @@ import (
"errors"
"fmt"
"io"
- "sync"
"time"
"unicode"

@@ -28,8 +27,6 @@ import (
)

type job struct {
- // Mutex lock for concurrent publishes
- mu sync.Mutex
// gcs bucket handle
bucket *storage.BucketHandle
// gcs object attribute struct
@@ -109,13 +106,13 @@ func (j *job) do(ctx context.Context, id string) {
Fields: fields,
}
event.SetID(objectID(j.hash, 0))
- j.state.save(j.object.Name, j.object.Updated)
- // locks while data is being published to avoid concurrent map read/writes
- j.mu.Lock()
- if err := j.publisher.Publish(event, j.state.checkpoint()); err != nil {
+ // locks while data is being saved and published to avoid concurrent map read/writes
+ cp, done := j.state.saveForTx(j.object.Name, j.object.Updated)
+ if err := j.publisher.Publish(event, cp); err != nil {
j.log.Errorw("job encountered an error", "gcs.jobId", id, "error", err)
}
- j.mu.Unlock()
+ // unlocks after data is saved and published
+ done()
}
}

@@ -216,19 +213,23 @@ func (j *job) readJsonAndPublish(ctx context.Context, r io.Reader, id string) er
// updates the offset after reading the file
// this avoids duplicates for the last read when resuming operation
offset = dec.InputOffset()
+ // locks while data is being saved and published to avoid concurrent map read/writes
+ var (
+     done func()
+     cp   *Checkpoint
+ )
if !dec.More() {
// if this is the last object, then peform a complete state save
- j.state.save(j.object.Name, j.object.Updated)
+ cp, done = j.state.saveForTx(j.object.Name, j.object.Updated)
} else {
// partially saves read state using offset
- j.state.savePartial(j.object.Name, offset+relativeOffset)
+ cp, done = j.state.savePartialForTx(j.object.Name, offset+relativeOffset)
}
- // locks while data is being published to avoid concurrent map read/writes
- j.mu.Lock()
- if err := j.publisher.Publish(evt, j.state.checkpoint()); err != nil {
+ if err := j.publisher.Publish(evt, cp); err != nil {
j.log.Errorw("job encountered an error", "gcs.jobId", id, "error", err)
}
- j.mu.Unlock()
+ // unlocks after data is saved and published
+ done()
}
return nil
}
18 changes: 7 additions & 11 deletions x-pack/filebeat/input/gcs/scheduler.go
@@ -55,16 +55,11 @@ func newScheduler(ctx context.Context, publisher cursor.Publisher, bucket *stora
// Schedule, is responsible for fetching & scheduling jobs using the workerpool model
func (s *scheduler) schedule() error {
if !s.src.Poll {
- ctxWithTimeout, cancel := context.WithTimeout(s.parentCtx, s.src.BucketTimeOut)
- defer cancel()
- return s.scheduleOnce(ctxWithTimeout)
+ return s.scheduleOnce(s.parentCtx)
}

for {
- ctxWithTimeout, cancel := context.WithTimeout(s.parentCtx, s.src.BucketTimeOut)
- defer cancel()
-
- err := s.scheduleOnce(ctxWithTimeout)
+ err := s.scheduleOnce(s.parentCtx)
if err != nil {
return err
}
@@ -92,9 +87,9 @@ func (l *limiter) release() {
l.wg.Done()
}

- func (s *scheduler) scheduleOnce(ctxWithTimeout context.Context) error {
+ func (s *scheduler) scheduleOnce(ctx context.Context) error {
defer s.limiter.wait()
- pager := s.fetchObjectPager(ctxWithTimeout, s.src.MaxWorkers)
+ pager := s.fetchObjectPager(ctx, *s.cfg.MaxWorkers)
for {
var objects []*storage.ObjectAttrs
nextPageToken, err := pager.NextPage(&objects)
@@ -107,7 +102,7 @@ func (s *scheduler) scheduleOnce(ctxWithTimeout context.Context) error {
if !s.state.checkpoint().LatestEntryTime.IsZero() {
jobs = s.moveToLastSeenJob(jobs)
if len(s.state.checkpoint().FailedJobs) > 0 {
- jobs = s.addFailedJobs(ctxWithTimeout, jobs)
+ jobs = s.addFailedJobs(ctx, jobs)
}
}

@@ -118,14 +113,15 @@ func (s *scheduler) scheduleOnce(ctxWithTimeout context.Context) error {
s.limiter.acquire()
go func() {
defer s.limiter.release()
- job.do(s.parentCtx, id)
+ job.do(ctx, id)
}()
}

if nextPageToken == "" {
break
}
}

return nil
}

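Note on the timeout change: the scheduler above no longer wraps each poll cycle in a context.WithTimeout derived from bucket_timeout. Per the changelog entry in this PR, the timeout is instead meant to bound each individual object read. That read-path code is not part of this diff, so the snippet below is only a minimal, self-contained sketch of the pattern under that assumption — readObject, the object names, and the 2-second value are illustrative, not the input's real API:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// readObject stands in for a single GCS object read (hypothetical helper).
func readObject(ctx context.Context, name string) error {
	select {
	case <-time.After(50 * time.Millisecond): // simulated read latency
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	parent := context.Background()
	bucketTimeout := 2 * time.Second // stands in for the input's bucket_timeout setting

	for _, name := range []string{"obj-1", "obj-2", "obj-3"} {
		// Each object read gets its own deadline, so one slow object no
		// longer consumes the time budget of the entire poll cycle.
		ctx, cancel := context.WithTimeout(parent, bucketTimeout)
		err := readObject(ctx, name)
		cancel()
		if err != nil {
			fmt.Printf("read %s failed: %v\n", name, err)
			continue
		}
		fmt.Printf("read %s ok\n", name)
	}
}
```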
26 changes: 16 additions & 10 deletions x-pack/filebeat/input/gcs/state.go
@@ -47,8 +47,11 @@ func newState() *state {
}
}

- // save, saves/updates the current state for cursor checkpoint
- func (s *state) save(name string, lastModifiedOn time.Time) {
+ // saveForTx updates and returns the current state checkpoint, locks the state
+ // and returns an unlock function, done. The caller must call done when
+ // s and cp are no longer needed in a locked state. done may not be called
+ // more than once.
+ func (s *state) saveForTx(name string, lastModifiedOn time.Time) (cp *Checkpoint, done func()) {
s.mu.Lock()
delete(s.cp.LastProcessedOffset, name)
delete(s.cp.IsRootArray, name)
@@ -68,20 +71,23 @@ func (s *state) save(name string, lastModifiedOn time.Time) {
// clear entry if this is a failed job
delete(s.cp.FailedJobs, name)
}
- s.mu.Unlock()
+ return s.cp, func() { s.mu.Unlock() }
}

- // setRootArray, sets boolean true for objects that have their roots defined as an array type
- func (s *state) setRootArray(name string) {
+ // savePartialForTx partially updates and returns the current state checkpoint, locks the state
+ // and returns an unlock function, done. The caller must call done when
+ // s and cp are no longer needed in a locked state. done may not be called
+ // more than once.
+ func (s *state) savePartialForTx(name string, offset int64) (cp *Checkpoint, done func()) {
s.mu.Lock()
- s.cp.IsRootArray[name] = true
- s.mu.Unlock()
+ s.cp.LastProcessedOffset[name] = offset
+ return s.cp, func() { s.mu.Unlock() }
}

- // savePartial, partially saves/updates the current state for cursor checkpoint
- func (s *state) savePartial(name string, offset int64) {
+ // setRootArray, sets boolean true for objects that have their roots defined as an array type
+ func (s *state) setRootArray(name string) {
s.mu.Lock()
- s.cp.LastProcessedOffset[name] = offset
+ s.cp.IsRootArray[name] = true
s.mu.Unlock()
}

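Taken together, the job.go and state.go changes replace the removed per-job mutex with a lock owned by the shared state: saveForTx/savePartialForTx mutate the checkpoint maps while holding the lock and hand the caller both the checkpoint and a done function, so the publish happens before any other worker can touch the maps. Below is a minimal, self-contained sketch of that pattern with generic names (the real Checkpoint carries more fields, and publishing goes through the cursor publisher rather than a print):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Checkpoint mirrors the shape of the input's cursor state: maps keyed by object name.
type Checkpoint struct {
	LatestEntryTime     time.Time
	LastProcessedOffset map[string]int64
}

type state struct {
	mu sync.Mutex
	cp *Checkpoint
}

// saveForTx mutates the checkpoint under the lock and hands back an unlock
// function; the caller publishes the returned checkpoint before calling done.
func (s *state) saveForTx(name string, lastModified time.Time) (cp *Checkpoint, done func()) {
	s.mu.Lock()
	delete(s.cp.LastProcessedOffset, name) // object fully read; drop its partial offset
	if s.cp.LatestEntryTime.Before(lastModified) {
		s.cp.LatestEntryTime = lastModified
	}
	return s.cp, func() { s.mu.Unlock() }
}

func main() {
	s := &state{cp: &Checkpoint{LastProcessedOffset: map[string]int64{}}}

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ { // concurrent workers, as with max_workers > 1
		wg.Add(1)
		go func(worker int) {
			defer wg.Done()
			name := fmt.Sprintf("obj-%d", worker)
			cp, done := s.saveForTx(name, time.Now())
			// "Publish" while the lock is held so no other worker can
			// change the checkpoint maps mid-serialization.
			fmt.Printf("worker %d publishing checkpoint up to %v\n", worker, cp.LatestEntryTime)
			done()
		}(i)
	}
	wg.Wait()
}
```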