[filebeat][gcs] Fix for concurrency issues and context timeouts in the GCS input #35605

Merged: 10 commits, Jun 3, 2023
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -124,6 +124,7 @@ https://github.com/elastic/beats/compare/v8.7.1\...main[Check the HEAD diff]
- Fix crash when loading azurewebstorage cursor with no partially processed data. {pull}35433[35433]
- RFC5424 syslog timestamps with offset 'Z' will be treated as UTC rather than using the default timezone. {pull}35360[35360]
- [system] sync system/auth dataset with system integration 1.29.0. {pull}35581[35581]
- [GCS Input] - Enhancement and bugfix for concurrency issues, flaky tests, and context timeouts. {pull}35605[35605]

*Heartbeat*

7 changes: 3 additions & 4 deletions x-pack/filebeat/input/gcs/input.go
@@ -9,7 +9,7 @@ import (
"time"

"cloud.google.com/go/storage"
"github.com/googleapis/gax-go/v2"
gax "github.com/googleapis/gax-go/v2"

v2 "github.com/elastic/beats/v7/filebeat/input/v2"
cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor"
@@ -127,7 +127,6 @@ func (input *gcsInput) Run(inputCtx v2.Context, src cursor.Source,

log := inputCtx.Logger.With("project_id", currentSource.ProjectId).With("bucket", currentSource.BucketName)
log.Infof("Running google cloud storage for project: %s", input.config.ProjectId)

var cp *Checkpoint
if !cursor.IsNew() {
if err := cursor.Unpack(&cp); err != nil {
@@ -156,7 +155,7 @@ func (input *gcsInput) Run(inputCtx v2.Context, src cursor.Source,
// Since we are only reading, the operation is always idempotent
storage.WithPolicy(storage.RetryAlways),
)
scheduler := newScheduler(ctx, publisher, bucket, currentSource, &input.config, st, log)
scheduler := newScheduler(publisher, bucket, currentSource, &input.config, st, log)

return scheduler.schedule()
return scheduler.schedule(ctx)
}
6 changes: 3 additions & 3 deletions x-pack/filebeat/input/gcs/input_stateless.go
@@ -9,7 +9,7 @@ import (
"time"

"cloud.google.com/go/storage"
"github.com/googleapis/gax-go/v2"
gax "github.com/googleapis/gax-go/v2"

v2 "github.com/elastic/beats/v7/filebeat/input/v2"
cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor"
@@ -79,9 +79,9 @@ func (in *statelessInput) Run(inputCtx v2.Context, publisher stateless.Publisher
storage.WithPolicy(storage.RetryAlways),
)

scheduler := newScheduler(ctx, pub, bkt, currentSource, &in.config, st, log)
scheduler := newScheduler(pub, bkt, currentSource, &in.config, st, log)

return scheduler.schedule()
return scheduler.schedule(ctx)
}
return nil
}
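
Both Run implementations above now build the scheduler without a stored context and instead pass ctx to schedule, so each run is bounded by the caller's cancellation and timeouts rather than by a context captured at construction time. A minimal sketch of that shape, using illustrative names (the real scheduler lives in scheduler.go, which is not part of this diff):

package gcssketch

import (
	"context"
	"time"
)

// config is an illustrative stand-in for the input configuration.
type config struct {
	PollInterval time.Duration
}

// scheduler deliberately holds no context.Context; cancellation and deadlines
// arrive through the argument of schedule on every run.
type scheduler struct {
	cfg *config
}

func newScheduler(cfg *config) *scheduler {
	return &scheduler{cfg: cfg}
}

func (s *scheduler) schedule(ctx context.Context) error {
	ticker := time.NewTicker(s.cfg.PollInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			// the caller cancelled or the context timed out; stop polling cleanly
			return ctx.Err()
		case <-ticker.C:
			// list the bucket and dispatch jobs here
		}
	}
}
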
60 changes: 21 additions & 39 deletions x-pack/filebeat/input/gcs/input_test.go
@@ -37,15 +37,13 @@ const (
)

func Test_StorageClient(t *testing.T) {
t.Skip("Flaky test: issue (could possibly affect this also) - https://github.com/elastic/beats/issues/34332")
tests := []struct {
name string
baseConfig map[string]interface{}
mockHandler func() http.Handler
expected map[string]bool
checkJSON bool
isError error
unexpectedError error
name string
baseConfig map[string]interface{}
mockHandler func() http.Handler
expected map[string]bool
checkJSON bool
isError error
}{
{
name: "SingleBucketWithPoll_NoErr",
@@ -67,7 +65,6 @@ func Test_StorageClient(t *testing.T) {
mock.Gcs_test_new_object_data3_json: true,
mock.Gcs_test_new_object_docs_ata_json: true,
},
unexpectedError: context.Canceled,
},
{
name: "SingleBucketWithoutPoll_NoErr",
@@ -89,7 +86,6 @@ func Test_StorageClient(t *testing.T) {
mock.Gcs_test_new_object_data3_json: true,
mock.Gcs_test_new_object_docs_ata_json: true,
},
unexpectedError: nil,
},
{
name: "TwoBucketsWithPoll_NoErr",
@@ -116,7 +112,6 @@ func Test_StorageClient(t *testing.T) {
mock.Gcs_test_latest_object_ata_json: true,
mock.Gcs_test_latest_object_data3_json: true,
},
unexpectedError: context.Canceled,
},
{
name: "TwoBucketsWithoutPoll_NoErr",
@@ -143,7 +138,6 @@ func Test_StorageClient(t *testing.T) {
mock.Gcs_test_latest_object_ata_json: true,
mock.Gcs_test_latest_object_data3_json: true,
},
unexpectedError: nil,
},
{
name: "SingleBucketWithPoll_InvalidBucketErr",
@@ -159,10 +153,9 @@ func Test_StorageClient(t *testing.T) {
},
},
},
mockHandler: mock.GCSServer,
expected: map[string]bool{},
isError: errors.New("storage: bucket doesn't exist"),
unexpectedError: nil,
mockHandler: mock.GCSServer,
expected: map[string]bool{},
isError: errors.New("storage: bucket doesn't exist"),
},
{
name: "SingleBucketWithoutPoll_InvalidBucketErr",
@@ -178,10 +171,9 @@ func Test_StorageClient(t *testing.T) {
},
},
},
mockHandler: mock.GCSServer,
expected: map[string]bool{},
isError: errors.New("storage: bucket doesn't exist"),
unexpectedError: nil,
mockHandler: mock.GCSServer,
expected: map[string]bool{},
isError: errors.New("storage: bucket doesn't exist"),
},
{
name: "TwoBucketsWithPoll_InvalidBucketErr",
@@ -200,10 +192,9 @@ func Test_StorageClient(t *testing.T) {
},
},
},
mockHandler: mock.GCSServer,
expected: map[string]bool{},
isError: errors.New("storage: bucket doesn't exist"),
unexpectedError: nil,
mockHandler: mock.GCSServer,
expected: map[string]bool{},
isError: errors.New("storage: bucket doesn't exist"),
},
{
name: "SingleBucketWithPoll_InvalidConfigValue",
@@ -219,10 +210,9 @@ func Test_StorageClient(t *testing.T) {
},
},
},
mockHandler: mock.GCSServer,
expected: map[string]bool{},
isError: errors.New("requires value <= 5000 accessing 'max_workers'"),
unexpectedError: nil,
mockHandler: mock.GCSServer,
expected: map[string]bool{},
isError: errors.New("requires value <= 5000 accessing 'max_workers'"),
},
{
name: "TwoBucketWithPoll_InvalidConfigValue",
@@ -241,10 +231,9 @@ func Test_StorageClient(t *testing.T) {
},
},
},
mockHandler: mock.GCSServer,
expected: map[string]bool{},
isError: errors.New("requires value <= 5000 accessing 'max_workers'"),
unexpectedError: nil,
mockHandler: mock.GCSServer,
expected: map[string]bool{},
isError: errors.New("requires value <= 5000 accessing 'max_workers'"),
},
{
name: "SingleBucketWithPoll_parseJSON",
@@ -267,7 +256,6 @@ func Test_StorageClient(t *testing.T) {
mock.Gcs_test_latest_object_ata_json_parsed: true,
mock.Gcs_test_latest_object_data3_json_parsed: true,
},
unexpectedError: context.Canceled,
},
{
name: "ReadJSON",
@@ -289,7 +277,6 @@ func Test_StorageClient(t *testing.T) {
mock.BeatsFilesBucket_log_json[1]: true,
mock.BeatsFilesBucket_log_json[2]: true,
},
unexpectedError: context.Canceled,
},
{
name: "ReadOctetStreamJSON",
@@ -310,7 +297,6 @@ func Test_StorageClient(t *testing.T) {
mock.BeatsFilesBucket_multiline_json[0]: true,
mock.BeatsFilesBucket_multiline_json[1]: true,
},
unexpectedError: context.Canceled,
},
{
name: "ReadNDJSON",
@@ -331,7 +317,6 @@ func Test_StorageClient(t *testing.T) {
mock.BeatsFilesBucket_log_ndjson[0]: true,
mock.BeatsFilesBucket_log_ndjson[1]: true,
},
unexpectedError: context.Canceled,
},
{
name: "ReadMultilineGzJSON",
@@ -352,7 +337,6 @@ func Test_StorageClient(t *testing.T) {
mock.BeatsFilesBucket_multiline_json_gz[0]: true,
mock.BeatsFilesBucket_multiline_json_gz[1]: true,
},
unexpectedError: context.Canceled,
},
{
name: "ReadJSONWithRootAsArray",
@@ -375,7 +359,6 @@ func Test_StorageClient(t *testing.T) {
mock.BeatsFilesBucket_json_array[2]: true,
mock.BeatsFilesBucket_json_array[3]: true,
},
unexpectedError: context.Canceled,
},
}
for _, tt := range tests {
@@ -462,7 +445,6 @@ func Test_StorageClient(t *testing.T) {
}
}
}
assert.ErrorIs(t, g.Wait(), tt.unexpectedError)
})
}
}
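
With the unexpectedError field and the final assert.ErrorIs on g.Wait() removed, the table rows no longer have to predict whether a run ends in context.Canceled or a clean return. A minimal sketch of one way to still guard against genuinely unexpected errors while tolerating expected cancellation (runInput here is a hypothetical stand-in for the test harness, not the input's real entry point):

package gcssketch

import (
	"context"
	"errors"
	"testing"
	"time"

	"golang.org/x/sync/errgroup"
)

// runInput is a hypothetical stand-in for starting the input under test; it
// simply blocks until the context is done.
func runInput(ctx context.Context) error {
	<-ctx.Done()
	return ctx.Err()
}

func TestRunToleratesCancellation(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	g, ctx := errgroup.WithContext(ctx)
	g.Go(func() error { return runInput(ctx) })

	// assertions on the published events would go here

	if err := g.Wait(); err != nil &&
		!errors.Is(err, context.Canceled) &&
		!errors.Is(err, context.DeadlineExceeded) {
		t.Fatalf("input returned unexpected error: %v", err)
	}
}
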
27 changes: 13 additions & 14 deletions x-pack/filebeat/input/gcs/job.go
@@ -15,7 +15,6 @@ import (
"errors"
"fmt"
"io"
"sync"
"time"
"unicode"

@@ -28,8 +27,6 @@ import (
)

type job struct {
// Mutex lock for concurrent publishes
mu sync.Mutex
// gcs bucket handle
bucket *storage.BucketHandle
// gcs object attribute struct
@@ -109,13 +106,13 @@ func (j *job) do(ctx context.Context, id string) {
Fields: fields,
}
event.SetID(objectID(j.hash, 0))
j.state.save(j.object.Name, j.object.Updated)
// locks while data is being published to avoid concurrent map read/writes
j.mu.Lock()
if err := j.publisher.Publish(event, j.state.checkpoint()); err != nil {
// locks while data is being saved and published to avoid concurrent map read/writes
cp, done := j.state.saveForTx(j.object.Name, j.object.Updated)
if err := j.publisher.Publish(event, cp); err != nil {
j.log.Errorw("job encountered an error", "gcs.jobId", id, "error", err)
}
j.mu.Unlock()
// unlocks after data is saved and published
done()
}
}
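
The hunk above replaces the per-job mutex with j.state.saveForTx, which hands back the checkpoint to publish together with a done callback that releases the state lock once the publish call has returned; the hunk below does the same for partial reads via savePartialForTx. A minimal sketch of what such a state API could look like (state.go is not part of this diff, so the type names and fields here are assumptions):

package gcssketch

import (
	"sync"
	"time"
)

// Checkpoint fields are illustrative; the real Checkpoint is defined in state.go.
type Checkpoint struct {
	ObjectName   string
	LatestUpdate time.Time
}

type state struct {
	mu sync.Mutex
	cp *Checkpoint
}

func newState() *state {
	return &state{cp: &Checkpoint{}}
}

// saveForTx updates the checkpoint under the lock and returns a snapshot for
// publishing plus a done func that releases the lock, so the caller can keep
// the save and the publish of that checkpoint serialized without its own mutex.
func (s *state) saveForTx(name string, updated time.Time) (*Checkpoint, func()) {
	s.mu.Lock()
	s.cp.ObjectName = name
	s.cp.LatestUpdate = updated
	snapshot := *s.cp
	return &snapshot, s.mu.Unlock
}
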

@@ -216,19 +213,21 @@ func (j *job) readJsonAndPublish(ctx context.Context, r io.Reader, id string) er
// updates the offset after reading the file
// this avoids duplicates for the last read when resuming operation
offset = dec.InputOffset()
// locks while data is being saved and published to avoid concurrent map read/writes
var done func()
var cp *Checkpoint
if !dec.More() {
// if this is the last object, then perform a complete state save
j.state.save(j.object.Name, j.object.Updated)
cp, done = j.state.saveForTx(j.object.Name, j.object.Updated)
} else {
// partially saves read state using offset
j.state.savePartial(j.object.Name, offset+relativeOffset)
cp, done = j.state.savePartialForTx(j.object.Name, offset+relativeOffset)
}
// locks while data is being published to avoid concurrent map read/writes
j.mu.Lock()
if err := j.publisher.Publish(evt, j.state.checkpoint()); err != nil {
if err := j.publisher.Publish(evt, cp); err != nil {
j.log.Errorw("job encountered an error", "gcs.jobId", id, "error", err)
}
j.mu.Unlock()
// unlocks after data is saved and published
Review comment from andrewkroh (Member):
What are the assumptions that you are making about the published state of the event after Publish returns? I see this comment about it being "saved and published" and that's not correct. So if that is important to your implementation you'll need changes.

When Publish returns, the event has been queued internally. It will have undergone processing via Beats processors and then been put onto the queue. If it needs to know when the event has been accepted by the destination output, you can register an EventListener to get a callback.

// Callbacks for when events are added / acknowledged
EventListener EventListener

As an example, the GCP pub/sub input uses an EventListener to defer ACK'ing the pubsub message until the event has been written to ES.

EventListener: acker.ConnectionOnly(
	acker.EventPrivateReporter(func(_ int, privates []interface{}) {
		for _, priv := range privates {
			if msg, ok := priv.(*pubsub.Message); ok {
				msg.Ack()
			}
		}
	}),
),

Reply from ShourieG (Contributor, Author):
@andrewkroh thanks for the feedback. I will revisit this when I do some planned feature enhancements for the gcs input in the near future; I'll have to separate out the cursor publish from the event publish to properly implement this feature. For now I'm merging the PR since it's related to an SDH.

done()
}
return nil
}