diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc
index 6bd5f1f92b3..0264269b5a9 100644
--- a/CHANGELOG.next.asciidoc
+++ b/CHANGELOG.next.asciidoc
@@ -104,6 +104,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
 - Fix handling of NUL-terminated log lines in Fortinet Firewall module. {issue}36026[36026] {pull}36027[36027]
 - Make redact field configuration recommended in CEL input and log warning if missing. {pull}36008[36008]
 - Fix handling of region name configuration in awss3 input {pull}36034[36034]
+- Fix concurrency and flaky test issues in the azure blob storage input. {issue}35983[35983] {pull}36124[36124]
 - Fix panic when sqs input metrics getter is invoked {pull}36101[36101] {issue}36077[36077]
 - Make CEL input's `now` global variable static for evaluation lifetime. {pull}36107[36107]
 - Update mito CEL extension library to v1.5.0. {pull}36146[36146]
diff --git a/x-pack/filebeat/input/azureblobstorage/input.go b/x-pack/filebeat/input/azureblobstorage/input.go
index 8abfd6c810c..3cca206174d 100644
--- a/x-pack/filebeat/input/azureblobstorage/input.go
+++ b/x-pack/filebeat/input/azureblobstorage/input.go
@@ -12,6 +12,7 @@ import (
 
 	v2 "github.com/elastic/beats/v7/filebeat/input/v2"
 	cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor"
+	"github.com/elastic/beats/v7/libbeat/feature"
 	conf "github.com/elastic/elastic-agent-libs/config"
 	"github.com/elastic/elastic-agent-libs/logp"
 
@@ -48,6 +49,7 @@ func configure(cfg *conf.C) ([]cursor.Source, cursor.Input, error) {
 		return nil, nil, err
 	}
 
+	//nolint:prealloc // No need to preallocate the slice here
 	var sources []cursor.Source
 	for _, c := range config.Containers {
 		container := tryOverrideOrDefault(config, c)
@@ -111,20 +113,22 @@ func (input *azurebsInput) Test(src cursor.Source, ctx v2.TestContext) error {
 }
 
 func (input *azurebsInput) Run(inputCtx v2.Context, src cursor.Source, cursor cursor.Cursor, publisher cursor.Publisher) error {
-	currentSource := src.(*Source)
-
-	log := inputCtx.Logger.With("account_name", currentSource.AccountName).With("container_name", currentSource.ContainerName)
-	log.Infof("Running azure blob storage for account: %s", input.config.AccountName)
-
 	var cp *Checkpoint
 	st := newState()
 	if !cursor.IsNew() {
 		if err := cursor.Unpack(&cp); err != nil {
 			return err
 		}
-
 		st.setCheckpoint(cp)
 	}
+	return input.run(inputCtx, src, st, publisher)
+}
+
+func (input *azurebsInput) run(inputCtx v2.Context, src cursor.Source, st *state, publisher cursor.Publisher) error {
+	currentSource := src.(*Source)
+
+	log := inputCtx.Logger.With("account_name", currentSource.AccountName).With("container_name", currentSource.ContainerName)
+	log.Infof("Running azure blob storage for account: %s", input.config.AccountName)
 
 	ctx, cancel := context.WithCancel(context.Background())
 	go func() {
diff --git a/x-pack/filebeat/input/azureblobstorage/input_test.go b/x-pack/filebeat/input/azureblobstorage/input_test.go
index 5ad9921adee..9ce948719b0 100644
--- a/x-pack/filebeat/input/azureblobstorage/input_test.go
+++ b/x-pack/filebeat/input/azureblobstorage/input_test.go
@@ -6,10 +6,13 @@ package azureblobstorage
 
 import (
 	"context"
+	"encoding/json"
 	"errors"
+	"fmt"
 	"net/http"
 	"net/http/httptest"
 	"strings"
+	"sync"
 	"testing"
 	"time"
 
@@ -17,6 +20,8 @@ import (
 	"golang.org/x/sync/errgroup"
 
 	v2 "github.com/elastic/beats/v7/filebeat/input/v2"
+	cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor"
+	"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/publisher/testing" "github.com/elastic/beats/v7/x-pack/filebeat/input/azureblobstorage/mock" conf "github.com/elastic/elastic-agent-libs/config" @@ -33,14 +38,12 @@ const ( ) func Test_StorageClient(t *testing.T) { - t.Skip("Flaky test: issue - https://github.com/elastic/beats/issues/34332") tests := []struct { - name string - baseConfig map[string]interface{} - mockHandler func() http.Handler - expected map[string]bool - isError error - unexpectedError error + name string + baseConfig map[string]interface{} + mockHandler func() http.Handler + expected map[string]bool + expectedError error }{ { name: "SingleContainerWithPoll_NoErr", @@ -62,7 +65,6 @@ func Test_StorageClient(t *testing.T) { mock.Beatscontainer_blob_data3_json: true, mock.Beatscontainer_blob_docs_ata_json: true, }, - unexpectedError: context.Canceled, }, { name: "SingleContainerWithoutPoll_NoErr", @@ -84,7 +86,6 @@ func Test_StorageClient(t *testing.T) { mock.Beatscontainer_blob_data3_json: true, mock.Beatscontainer_blob_docs_ata_json: true, }, - unexpectedError: nil, }, { name: "TwoContainersWithPoll_NoErr", @@ -111,7 +112,6 @@ func Test_StorageClient(t *testing.T) { mock.Beatscontainer_2_blob_ata_json: true, mock.Beatscontainer_2_blob_data3_json: true, }, - unexpectedError: context.Canceled, }, { name: "TwoContainersWithoutPoll_NoErr", @@ -138,7 +138,6 @@ func Test_StorageClient(t *testing.T) { mock.Beatscontainer_2_blob_ata_json: true, mock.Beatscontainer_2_blob_data3_json: true, }, - unexpectedError: context.Canceled, }, { name: "SingleContainerPoll_InvalidContainerErr", @@ -154,10 +153,9 @@ func Test_StorageClient(t *testing.T) { }, }, }, - mockHandler: mock.AzureStorageServer, - expected: map[string]bool{}, - isError: mock.NotFoundErr, - unexpectedError: nil, + mockHandler: mock.AzureStorageServer, + expected: map[string]bool{}, + expectedError: mock.NotFoundErr, }, { name: "SingleContainerWithoutPoll_InvalidBucketErr", @@ -173,10 +171,9 @@ func Test_StorageClient(t *testing.T) { }, }, }, - mockHandler: mock.AzureStorageServer, - expected: map[string]bool{}, - isError: mock.NotFoundErr, - unexpectedError: nil, + mockHandler: mock.AzureStorageServer, + expected: map[string]bool{}, + expectedError: mock.NotFoundErr, }, { name: "TwoContainersWithPoll_InvalidBucketErr", @@ -195,10 +192,9 @@ func Test_StorageClient(t *testing.T) { }, }, }, - mockHandler: mock.AzureStorageServer, - expected: map[string]bool{}, - isError: mock.NotFoundErr, - unexpectedError: nil, + mockHandler: mock.AzureStorageServer, + expected: map[string]bool{}, + expectedError: mock.NotFoundErr, }, { name: "SingleBucketWithPoll_InvalidConfigValue", @@ -214,10 +210,9 @@ func Test_StorageClient(t *testing.T) { }, }, }, - mockHandler: mock.AzureStorageServer, - expected: map[string]bool{}, - isError: errors.New("requires value <= 5000 accessing 'max_workers'"), - unexpectedError: nil, + mockHandler: mock.AzureStorageServer, + expected: map[string]bool{}, + expectedError: errors.New("requires value <= 5000 accessing 'max_workers'"), }, { name: "TwoBucketWithPoll_InvalidConfigValue", @@ -236,10 +231,9 @@ func Test_StorageClient(t *testing.T) { }, }, }, - mockHandler: mock.AzureStorageServer, - expected: map[string]bool{}, - isError: errors.New("requires value <= 5000 accessing 'max_workers'"), - unexpectedError: nil, + mockHandler: mock.AzureStorageServer, + expected: map[string]bool{}, + expectedError: errors.New("requires value <= 5000 accessing 'max_workers'"), }, { name: "ReadJSON", @@ -261,7 +255,6 @@ func 
 				mock.BeatsFilesContainer_log_json[1]: true,
 				mock.BeatsFilesContainer_log_json[2]: true,
 			},
-			unexpectedError: context.Canceled,
 		},
 		{
 			name: "ReadOctetStreamJSON",
@@ -282,7 +275,6 @@ func Test_StorageClient(t *testing.T) {
 				mock.BeatsFilesContainer_multiline_json[0]: true,
 				mock.BeatsFilesContainer_multiline_json[1]: true,
 			},
-			unexpectedError: context.Canceled,
 		},
 		{
 			name: "ReadNdJSON",
@@ -303,7 +295,6 @@ func Test_StorageClient(t *testing.T) {
 				mock.BeatsFilesContainer_log_ndjson[0]: true,
 				mock.BeatsFilesContainer_log_ndjson[1]: true,
 			},
-			unexpectedError: context.Canceled,
 		},
 		{
 			name: "ReadMultilineGzJSON",
@@ -324,7 +315,6 @@ func Test_StorageClient(t *testing.T) {
 				mock.BeatsFilesContainer_multiline_json_gz[0]: true,
 				mock.BeatsFilesContainer_multiline_json_gz[1]: true,
 			},
-			unexpectedError: context.Canceled,
 		},
 	}
 	for _, tt := range tests {
@@ -336,7 +326,7 @@ func Test_StorageClient(t *testing.T) {
 			conf := config{}
 			err := cfg.Unpack(&conf)
 			if err != nil {
-				assert.EqualError(t, err, tt.isError.Error())
+				assert.EqualError(t, err, tt.expectedError.Error())
 				return
 			}
 			input := newStatelessInput(conf, serv.URL+"/")
@@ -349,6 +339,7 @@ func Test_StorageClient(t *testing.T) {
 
 			ctx, cancel := newV2Context()
 			t.Cleanup(cancel)
+			ctx.ID += tt.name
 
 			var g errgroup.Group
 			g.Go(func() error {
@@ -364,14 +355,14 @@ func Test_StorageClient(t *testing.T) {
 			t.Cleanup(func() { timeout.Stop() })
 
 			if len(tt.expected) == 0 {
-				if tt.isError != nil && g.Wait() != nil {
+				if tt.expectedError != nil && g.Wait() != nil {
 					//nolint:errorlint // This will never be a wrapped error
-					if tt.isError == mock.NotFoundErr {
+					if tt.expectedError == mock.NotFoundErr {
 						arr := strings.Split(g.Wait().Error(), "\n")
 						errStr := strings.Join(arr[1:], "\n")
-						assert.Equal(t, tt.isError.Error(), errStr)
+						assert.Equal(t, tt.expectedError.Error(), errStr)
 					} else {
-						assert.EqualError(t, g.Wait(), tt.isError.Error())
+						assert.EqualError(t, g.Wait(), tt.expectedError.Error())
 					}
 					cancel()
 				} else {
@@ -395,7 +386,7 @@ func Test_StorageClient(t *testing.T) {
 					val, err = got.Fields.GetValue("message")
 					assert.NoError(t, err)
 					assert.True(t, tt.expected[val.(string)])
-					assert.Equal(t, tt.isError, err)
+					assert.Equal(t, tt.expectedError, err)
 					receivedCount += 1
 					if receivedCount == len(tt.expected) {
 						cancel()
@@ -403,16 +394,122 @@ func Test_StorageClient(t *testing.T) {
 					}
 				}
 			}
-			assert.ErrorIs(t, g.Wait(), tt.unexpectedError)
 		})
 	}
 }
 
+func Test_Concurrency(t *testing.T) {
+	for _, workers := range []int{100, 1000, 2000, 3000} {
+		t.Run(fmt.Sprintf("TestConcurrency_%d_Workers", workers), func(t *testing.T) {
+			const expectedLen = mock.TotalRandomDataSets
+			serv := httptest.NewServer(mock.AzureConcurrencyServer())
+			t.Cleanup(serv.Close)
+
+			cfg := conf.MustNewConfigFrom(map[string]interface{}{
+				"account_name":                        "beatsblobnew",
+				"auth.shared_credentials.account_key": "7pfLm1betGiRyyABEM/RFrLYlafLZHbLtGhB52LkWVeBxE7la9mIvk6YYAbQKYE/f0GdhiaOZeV8+AStsAdr/Q==",
+				"max_workers":                         workers,
+				"poll":                                true,
+				"poll_interval":                       "10s",
+				"containers": []map[string]interface{}{
+					{
+						"name": mock.ConcurrencyContainer,
+					},
+				},
+			})
+			conf := config{}
+			err := cfg.Unpack(&conf)
+			assert.NoError(t, err)
+			input := azurebsInput{
+				config:     conf,
+				serviceURL: serv.URL + "/",
+			}
+			name := input.Name()
+			if name != "azure-blob-storage" {
+				t.Errorf(`unexpected input name: got:%q want:"azure-blob-storage"`, name)
+			}
+
+			var src cursor.Source
+			// This test will always have only one container
+			for _, c := range input.config.Containers {
+				container := tryOverrideOrDefault(input.config, c)
+				src = &Source{
+					AccountName:   input.config.AccountName,
+					ContainerName: c.Name,
+					MaxWorkers:    *container.MaxWorkers,
+					Poll:          *container.Poll,
+					PollInterval:  *container.PollInterval,
+				}
+			}
+
+			v2Ctx, cancel := newV2Context()
+			t.Cleanup(cancel)
+			v2Ctx.ID += t.Name()
+
+			client := publisher{
+				stop: func(e []beat.Event) {
+					if len(e) >= expectedLen {
+						cancel()
+					}
+				},
+			}
+			st := newState()
+			var g errgroup.Group
+			g.Go(func() error {
+				return input.run(v2Ctx, src, st, &client)
+			})
+			timeout := time.NewTimer(100 * time.Second)
+			t.Cleanup(func() { timeout.Stop() })
+			select {
+			case <-timeout.C:
+				t.Errorf("timed out waiting for %d events", expectedLen)
+				cancel()
+			case <-v2Ctx.Cancelation.Done():
+			}
+			//nolint:errcheck // We can ignore as the error will always be context canceled, which is expected in this case
+			g.Wait()
+			if len(client.events) < expectedLen {
+				t.Errorf("failed to get all events: got:%d want:%d", len(client.events), expectedLen)
+			}
+		})
+	}
+}
+
+type publisher struct {
+	stop    func([]beat.Event)
+	events  []beat.Event
+	mu      sync.Mutex
+	cursors []map[string]interface{}
+}
+
+func (p *publisher) Publish(e beat.Event, cursor interface{}) error {
+	p.mu.Lock()
+	p.events = append(p.events, e)
+	if cursor != nil {
+		var c map[string]interface{}
+		chkpt, ok := cursor.(*Checkpoint)
+		if !ok {
+			return fmt.Errorf("invalid cursor type for testing: %T", cursor)
+		}
+		cursorBytes, err := json.Marshal(chkpt)
+		if err != nil {
+			return fmt.Errorf("error marshaling cursor data: %w", err)
+		}
+		err = json.Unmarshal(cursorBytes, &c)
+		if err != nil {
+			return fmt.Errorf("error converting checkpoint struct to cursor map: %w", err)
+		}
+
+		p.cursors = append(p.cursors, c)
+	}
+	p.stop(p.events)
+	p.mu.Unlock()
+	return nil
+}
+
 func newV2Context() (v2.Context, func()) {
 	ctx, cancel := context.WithCancel(context.Background())
 	return v2.Context{
 		Logger:      logp.NewLogger("azure-blob-storage_test"),
-		ID:          "test_id",
+		ID:          "test_id:",
 		Cancelation: ctx,
 	}, cancel
 }
diff --git a/x-pack/filebeat/input/azureblobstorage/job.go b/x-pack/filebeat/input/azureblobstorage/job.go
index d44da42abfb..e59d52be986 100644
--- a/x-pack/filebeat/input/azureblobstorage/job.go
+++ b/x-pack/filebeat/input/azureblobstorage/job.go
@@ -86,10 +86,13 @@ func (j *job) do(ctx context.Context, id string) {
 			Fields: fields,
 		}
 		event.SetID(objectID(j.hash, 0))
-		j.state.save(*j.blob.Name, *j.blob.Properties.LastModified)
-		if err := j.publisher.Publish(event, j.state.checkpoint()); err != nil {
+		// locks while data is being saved to avoid concurrent map read/writes
+		cp, done := j.state.saveForTx(*j.blob.Name, *j.blob.Properties.LastModified)
+		if err := j.publisher.Publish(event, cp); err != nil {
 			j.log.Errorf(jobErrString, id, err)
 		}
+		// unlocks after data is saved
+		done()
 	}
 }
 
@@ -186,16 +189,21 @@ func (j *job) readJsonAndPublish(ctx context.Context, r io.Reader, id string) error {
 		// updates the offset after reading the file
 		// this avoids duplicates for the last read when resuming operation
 		offset = dec.InputOffset()
+		var (
+			cp   *Checkpoint
+			done func()
+		)
 		if !dec.More() {
 			// if this is the last object, then peform a complete state save
-			j.state.save(*j.blob.Name, *j.blob.Properties.LastModified)
+			cp, done = j.state.saveForTx(*j.blob.Name, *j.blob.Properties.LastModified)
 		} else {
 			// partially saves read state using offset
-			j.state.savePartial(*j.blob.Name, offset+relativeOffset, j.blob.Properties.LastModified)
+			cp, done = j.state.savePartialForTx(*j.blob.Name, offset+relativeOffset)
 		}
-		if err := j.publisher.Publish(evt, j.state.checkpoint()); err != nil {
+		if err := j.publisher.Publish(evt, cp); err != nil {
 			j.log.Errorf(jobErrString, id, err)
 		}
+		done()
 	}
 	return nil
 }
diff --git a/x-pack/filebeat/input/azureblobstorage/mock/data_random.go b/x-pack/filebeat/input/azureblobstorage/mock/data_random.go
new file mode 100644
index 00000000000..f52fe2fdbdb
--- /dev/null
+++ b/x-pack/filebeat/input/azureblobstorage/mock/data_random.go
@@ -0,0 +1,124 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package mock
+
+import (
+	"encoding/json"
+	"encoding/xml"
+	"fmt"
+	"math/rand"
+	"time"
+)
+
+const (
+	TotalRandomDataSets  = 10000
+	ConcurrencyContainer = "concurrency_container"
+)
+
+// Generates random Azure blob storage container metadata in XML format
+type EnumerationResults struct {
+	XMLName         xml.Name `xml:"EnumerationResults"`
+	ServiceEndpoint string   `xml:"ServiceEndpoint,attr"`
+	ContainerName   string   `xml:"ContainerName,attr"`
+	Blobs           []Blob   `xml:"Blobs>Blob"`
+	NextMarker      string   `xml:"NextMarker"`
+}
+
+type Blob struct {
+	Name       string     `xml:"Name"`
+	Properties Properties `xml:"Properties"`
+	Metadata   string     `xml:"Metadata"`
+}
+
+type Properties struct {
+	LastModified  string `xml:"Last-Modified"`
+	Etag          string `xml:"Etag"`
+	ContentLength int    `xml:"Content-Length"`
+	ContentType   string `xml:"Content-Type"`
+}
+
+func generateMetadata() []byte {
+	// Generate random data for x data sets defined by TotalRandomDataSets
+	const numDataSets = TotalRandomDataSets
+	dataSets := make([]Blob, numDataSets)
+
+	for i := 0; i < numDataSets; i++ {
+		dataSets[i] = createRandomBlob(i)
+	}
+
+	// Fill in the root XML structure
+	xmlData := EnumerationResults{
+		ServiceEndpoint: "https://127.0.0.1/",
+		ContainerName:   "concurrency_container",
+		Blobs:           dataSets,
+		NextMarker:      "",
+	}
+
+	// Marshal the data into XML format
+	xmlBytes, err := xml.MarshalIndent(xmlData, "", "\t")
+	if err != nil {
+		panic(fmt.Sprintf("Error marshaling data: %v", err))
+	}
+	return []byte(xml.Header + string(xmlBytes))
+}
+
+// Helper function to create a random Blob
+func createRandomBlob(i int) Blob {
+	rand.Seed(time.Now().UnixNano())
+
+	return Blob{
+		Name: fmt.Sprintf("data_%d.json", i),
+		Properties: Properties{
+			LastModified: time.Now().Format(time.RFC1123),
+			Etag:         fmt.Sprintf("0x%X", rand.Int63()),
+			ContentType:  "application/json",
+		},
+		Metadata: "",
+	}
+}
+
+// Generate Random Blob data in JSON format
+type MyData struct {
+	ID          int    `json:"id"`
+	Name        string `json:"name"`
+	Age         int    `json:"age"`
+	Email       string `json:"email"`
+	Description string `json:"description"`
+}
+
+func generateRandomBlob() []byte {
+	const numObjects = 10
+	dataObjects := make([]MyData, numObjects)
+
+	for i := 0; i < numObjects; i++ {
+		dataObjects[i] = createRandomData()
+	}
+
+	jsonBytes, err := json.MarshalIndent(dataObjects, "", "\t")
+	if err != nil {
+		panic(fmt.Sprintf("Error marshaling data: %v", err))
+	}
+	return jsonBytes
+}
+
+func createRandomData() MyData {
+	rand.Seed(time.Now().UnixNano())
+
+	return MyData{
+		ID:    rand.Intn(1000) + 1,
+		Name:  getRandomString([]string{"John", "Alice", "Bob", "Eve"}),
+		Age:   rand.Intn(80) + 18,
+		Email: getRandomString([]string{"john@example.com", "alice@example.com", "bob@example.com"}),
+		Description: getRandomString([]string{"Student", "Engineer", "Artist", "Doctor"}),
+	}
+}
+
+func getRandomString(options []string) string {
+	if len(options) == 0 {
+		return ""
+	}
+	rand.Seed(time.Now().UnixNano())
+	return options[rand.Intn(len(options))]
+}
diff --git a/x-pack/filebeat/input/azureblobstorage/mock/mock.go b/x-pack/filebeat/input/azureblobstorage/mock/mock.go
index aadbf4ffd28..32d65cdb2e3 100644
--- a/x-pack/filebeat/input/azureblobstorage/mock/mock.go
+++ b/x-pack/filebeat/input/azureblobstorage/mock/mock.go
@@ -92,3 +92,29 @@ func AzureStorageFileServer() http.Handler {
 		w.Write([]byte("resource not found"))
 	})
 }
+
+//nolint:errcheck // We can ignore as response writer errors cannot be handled in this scenario
+func AzureConcurrencyServer() http.Handler {
+	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		path := strings.Split(strings.TrimLeft(r.URL.Path, "/"), "/")
+		w.Header().Set(contentType, jsonType)
+		if r.Method == http.MethodGet {
+			switch len(path) {
+			case 1:
+				if path[0] == ConcurrencyContainer {
+					w.Header().Set(contentType, xmlType)
+					w.Write(generateMetadata())
+					return
+				}
+			case 2:
+				w.Write(generateRandomBlob())
+				return
+			default:
+				w.WriteHeader(http.StatusNotFound)
+				return
+			}
+		}
+		w.WriteHeader(http.StatusNotFound)
+		w.Write([]byte("resource not found"))
+	})
+}
diff --git a/x-pack/filebeat/input/azureblobstorage/state.go b/x-pack/filebeat/input/azureblobstorage/state.go
index f0a6e6cbce9..de1873e9042 100644
--- a/x-pack/filebeat/input/azureblobstorage/state.go
+++ b/x-pack/filebeat/input/azureblobstorage/state.go
@@ -37,8 +37,11 @@ func newState() *state {
 	}
 }
 
-// Save, saves/updates the current state for cursor checkpoint
-func (s *state) save(name string, lastModifiedOn time.Time) {
+// saveForTx locks the state, updates the current checkpoint, and returns it
+// along with an unlock function, done(). The caller must call done when
+// s and cp are no longer needed in a locked state. done must not be called
+// more than once.
+func (s *state) saveForTx(name string, lastModifiedOn time.Time) (cp *Checkpoint, done func()) {
 	s.mu.Lock()
 	delete(s.cp.PartiallyProcessed, name)
 	if len(s.cp.BlobName) == 0 {
@@ -51,17 +54,20 @@ func (s *state) save(name string, lastModifiedOn time.Time) {
 	} else if lastModifiedOn.After(s.cp.LatestEntryTime) {
 		s.cp.LatestEntryTime = lastModifiedOn
 	}
-	s.mu.Unlock()
+	return s.cp, func() { s.mu.Unlock() }
 }
 
-// savePartial, partially saves/updates the current state for cursor checkpoint
-func (s *state) savePartial(name string, offset int64, lastModifiedOn *time.Time) {
+// savePartialForTx locks the state, partially updates the current checkpoint
+// using the read offset, and returns it along with an unlock function, done().
+// The caller must call done when s and cp are no longer needed in a locked
+// state. done must not be called more than once.
+func (s *state) savePartialForTx(name string, offset int64) (cp *Checkpoint, done func()) {
 	s.mu.Lock()
 	s.cp.PartiallyProcessed[name] = offset
-	s.mu.Unlock()
+	return s.cp, func() { s.mu.Unlock() }
 }
 
-// setCheckpoint, sets checkpoint from source to current state instance
+// setCheckpoint sets checkpoint from source to current state instance
 func (s *state) setCheckpoint(chkpt *Checkpoint) {
 	if chkpt.PartiallyProcessed == nil {
 		chkpt.PartiallyProcessed = make(map[string]int64)
@@ -69,7 +75,7 @@ func (s *state) setCheckpoint(chkpt *Checkpoint) {
 	}
 	s.cp = chkpt
 }
-// checkpoint, returns the current state checkpoint
+// checkpoint returns the current state checkpoint
 func (s *state) checkpoint() *Checkpoint {
 	return s.cp
 }
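
Note: below is a minimal, self-contained sketch of the locked-checkpoint ("save for transaction") pattern that the job.go and state.go changes above introduce: saveForTx locks the state, hands back the checkpoint plus an unlock function, the caller publishes while the lock is held, and then calls done() exactly once. The state and Checkpoint shapes are trimmed to the fields needed here, and the worker/main wiring is hypothetical, added only for illustration — it is not part of the input.

package main

import (
	"fmt"
	"sync"
	"time"
)

// Checkpoint is a trimmed-down stand-in for the input's cursor checkpoint.
type Checkpoint struct {
	BlobName           string
	LatestEntryTime    time.Time
	PartiallyProcessed map[string]int64
}

// state guards the checkpoint that concurrent jobs update and publish.
type state struct {
	mu sync.Mutex
	cp *Checkpoint
}

// saveForTx locks the state, updates the checkpoint for a fully processed blob,
// and returns the checkpoint together with an unlock function. The caller
// publishes while the lock is held and then calls done() exactly once.
func (s *state) saveForTx(name string, lastModifiedOn time.Time) (cp *Checkpoint, done func()) {
	s.mu.Lock()
	delete(s.cp.PartiallyProcessed, name)
	s.cp.BlobName = name
	if lastModifiedOn.After(s.cp.LatestEntryTime) {
		s.cp.LatestEntryTime = lastModifiedOn
	}
	return s.cp, func() { s.mu.Unlock() }
}

// worker is a hypothetical stand-in for a blob-processing job: it saves state,
// "publishes" the event while the checkpoint is still locked, then unlocks.
func worker(s *state, name string) {
	cp, done := s.saveForTx(name, time.Now())
	fmt.Printf("published %s, checkpoint now at %q\n", name, cp.BlobName) // publish under the lock
	done()                                                                // release the lock once the cursor data has been consumed
}

func main() {
	s := &state{cp: &Checkpoint{PartiallyProcessed: map[string]int64{}}}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			worker(s, fmt.Sprintf("data_%d.json", i))
		}(i)
	}
	wg.Wait()
}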