From d75e73516c3709ea6d3bf06939bffcffc41bb330 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Tue, 8 Oct 2024 07:52:28 +0530 Subject: [PATCH] [filbeat][azure-blob-storage] - Adding support for Microsoft Entra ID RBAC authentication (#40879) (#41060) --- CHANGELOG.next.asciidoc | 1 + .../inputs/input-azure-blob-storage.asciidoc | 54 ++++- .../input/azureblobstorage/auth_test.go | 189 ++++++++++++++++++ .../filebeat/input/azureblobstorage/client.go | 41 +++- .../filebeat/input/azureblobstorage/config.go | 25 ++- .../input/azureblobstorage/config_test.go | 80 ++++++++ .../input/azureblobstorage/mock/data.go | 2 +- .../input/azureblobstorage/mock/mock.go | 6 +- .../input/azureblobstorage/scheduler.go | 2 +- .../filebeat/input/azureblobstorage/types.go | 3 + 10 files changed, 381 insertions(+), 22 deletions(-) create mode 100644 x-pack/filebeat/input/azureblobstorage/auth_test.go create mode 100644 x-pack/filebeat/input/azureblobstorage/config_test.go diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index ecfa4edd67c..4f8ad4deddb 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -308,6 +308,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Disable event normalization for netflow input {pull}40635[40635] - Allow attribute selection in the Active Directory entity analytics provider. {issue}40482[40482] {pull}40662[40662] - Improve error quality when CEL program does not correctly return an events array. {pull}40580[40580] +- Added support for Microsoft Entra ID RBAC authentication. {issue}40434[40434] {pull}40879[40879] - Add `use_kubeadm` config option for filebeat (both filbeat.input and autodiscovery) in order to toggle kubeadm-config api requests {pull}40301[40301] - Make HTTP library function inclusion non-conditional in CEL input. {pull}40912[40912] - Add support for Crowdstrike streaming API to the streaming input. {issue}40264[40264] {pull}40838[40838] diff --git a/x-pack/filebeat/docs/inputs/input-azure-blob-storage.asciidoc b/x-pack/filebeat/docs/inputs/input-azure-blob-storage.asciidoc index c7e9c2892bd..d6c1b4c9050 100644 --- a/x-pack/filebeat/docs/inputs/input-azure-blob-storage.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-azure-blob-storage.asciidoc @@ -128,17 +128,18 @@ Now let's explore the configuration attributes a bit more elaborately. *Supported Attributes :-* 1. <> - 2. <> - 3. <> - 4. <> - 5. <> - 6. <> - 7. <> - 8. <> - 9. <> - 10. <> - 11. <> - 12. <> + 2. <> + 3. <> + 4. <> + 5. <> + 6. <> + 7. <> + 8. <> + 9. <> + 10. <> + 11. <> + 12. <> + 13. <> [id="attrib-account-name"] @@ -148,6 +149,37 @@ Now let's explore the configuration attributes a bit more elaborately. This attribute is required for various internal operations with respect to authentication, creating service clients and blob clients which are used internally for various processing purposes. +[id="attrib-auth-oauth2"] +[float] +==== `auth.oauth2` + +This attribute contains the Microsoft Entra ID RBAC authentication credentials for a secure connection to the Azure Blob Storage. The `auth.oauth2` attribute contains the following sub-attributes: + + 1. `client_id`: The client ID of the Azure Entra ID application. + 2. `client_secret`: The client secret of the Azure Entra ID application. + 3. `tenant_id`: The tenant ID of the Azure Entra ID application. + +A sample configuration with `auth.oauth2` is given below: + +["source","yaml"] +---- +filebeat.inputs: +- type: azure-blob-storage + account_name: some_account + auth.oauth2: + client_id: "some_client_id" + client_secret: "some_client_secret" + tenant_id: "some_tenant_id" + containers: + - name: container_1 + max_workers: 3 + poll: true + poll_interval: 10s +---- +How to setup the `auth.oauth2` credentials can be found in the Azure documentation https://docs.microsoft.com/en-us/azure/active-directory/develop/quickstart-register-app[here] + +NOTE: According to our internal testing it seems that we require at least an access level of **blobOwner** for the service principle to be able to read the blobs. If you are facing any issues with the access level, ensure that the access level is set to **blobOwner**. + [id="attrib-auth-shared-account-key"] [float] ==== `auth.shared_credentials.account_key` diff --git a/x-pack/filebeat/input/azureblobstorage/auth_test.go b/x-pack/filebeat/input/azureblobstorage/auth_test.go new file mode 100644 index 00000000000..2c1ece73739 --- /dev/null +++ b/x-pack/filebeat/input/azureblobstorage/auth_test.go @@ -0,0 +1,189 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package azureblobstorage + +import ( + "bytes" + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "regexp" + "testing" + "time" + + "github.com/Azure/azure-sdk-for-go/sdk/azcore" + "github.com/stretchr/testify/assert" + "golang.org/x/sync/errgroup" + + v2 "github.com/elastic/beats/v7/filebeat/input/v2" + beattest "github.com/elastic/beats/v7/libbeat/publisher/testing" + "github.com/elastic/beats/v7/x-pack/filebeat/input/azureblobstorage/mock" + conf "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/logp" +) + +// customTransporter implements the Transporter interface with a custom Do & RoundTrip method +type customTransporter struct { + rt http.RoundTripper + servURL string +} + +func (t *customTransporter) RoundTrip(req *http.Request) (*http.Response, error) { + return t.rt.RoundTrip(req) +} + +// Do is responsible for the routing of the request to the appropriate handler based on the request URL +func (t *customTransporter) Do(req *http.Request) (*http.Response, error) { + logp.L().Named("azure-blob-storage-test").Debug("request URL: ", req.URL) + re := regexp.MustCompile(`^/([0-9a-fA-F-]+)/?(oauth2/v2\.0/token|v2\.0/\.well-known/openid-configuration)`) + matches := re.FindStringSubmatch(req.URL.Path) + + if len(matches) == 3 { + tenant_id := matches[1] + action := matches[2] + + switch action { + case "v2.0/.well-known/openid-configuration": + return createJSONResponse(map[string]interface{}{ + "token_endpoint": t.servURL + "/" + tenant_id + "/oauth2/v2.0/token", + "authorization_endpoint": t.servURL + "/" + tenant_id + "/oauth2/v2.0/authorize", + "issuer": t.servURL + "/" + tenant_id + "/oauth2/v2.0/issuer", + }, 200) + + case "oauth2/v2.0/token": + return createJSONResponse(map[string]interface{}{ + "token_type": "Bearer", + "expires_in": 3600, + "access_token": "mock_access_token_123", + }, 200) + } + } + return t.rt.RoundTrip(req) +} + +func createJSONResponse(data interface{}, statusCode int) (*http.Response, error) { + jsonData, err := json.Marshal(data) + if err != nil { + return nil, err + } + + resp := &http.Response{ + StatusCode: statusCode, + Body: io.NopCloser(bytes.NewBuffer(jsonData)), + Header: make(http.Header), + } + + resp.Header.Set("Content-Type", "application/json") + return resp, nil +} + +func Test_OAuth2(t *testing.T) { + tests := []struct { + name string + baseConfig map[string]interface{} + mockHandler func() http.Handler + expected map[string]bool + }{ + { + name: "OAuth2TConfig", + baseConfig: map[string]interface{}{ + "account_name": "beatsblobnew", + "auth.oauth2": map[string]interface{}{ + "client_id": "12345678-90ab-cdef-1234-567890abcdef", + "client_secret": "abcdefg1234567890!@#$%^&*()-_=+", + "tenant_id": "87654321-abcd-ef90-1234-fedcba098765", + }, + "max_workers": 2, + "poll": true, + "poll_interval": "30s", + "containers": []map[string]interface{}{ + { + "name": beatsContainer, + }, + }, + }, + mockHandler: mock.AzureStorageServer, + expected: map[string]bool{ + mock.Beatscontainer_blob_ata_json: true, + mock.Beatscontainer_blob_data3_json: true, + mock.Beatscontainer_blob_docs_ata_json: true, + }, + }, + } + + logp.TestingSetup() + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + serv := httptest.NewServer(tt.mockHandler()) + t.Cleanup(serv.Close) + + httpClient := &http.Client{ + Transport: &customTransporter{ + rt: http.DefaultTransport, + servURL: serv.URL, + }, + } + + cfg := conf.MustNewConfigFrom(tt.baseConfig) + conf := config{} + err := cfg.Unpack(&conf) + assert.NoError(t, err) + + // inject custom transport & client options + conf.Auth.OAuth2.clientOptions = azcore.ClientOptions{ + InsecureAllowCredentialWithHTTP: true, + Transport: httpClient.Transport.(*customTransporter), + } + + input := newStatelessInput(conf, serv.URL+"/") + + assert.Equal(t, "azure-blob-storage-stateless", input.Name()) + assert.NoError(t, input.Test(v2.TestContext{})) + + chanClient := beattest.NewChanClient(len(tt.expected)) + t.Cleanup(func() { _ = chanClient.Close() }) + + ctx, cancel := newV2Context() + t.Cleanup(cancel) + ctx.ID += tt.name + + var g errgroup.Group + g.Go(func() error { + return input.Run(ctx, chanClient) + }) + + var timeout *time.Timer + if conf.PollInterval != nil { + timeout = time.NewTimer(1*time.Second + *conf.PollInterval) + } else { + timeout = time.NewTimer(10 * time.Second) + } + t.Cleanup(func() { timeout.Stop() }) + + var receivedCount int + wait: + for { + select { + case <-timeout.C: + t.Errorf("timed out waiting for %d events", len(tt.expected)) + cancel() + return + case got := <-chanClient.Channel: + var val interface{} + var err error + val, err = got.Fields.GetValue("message") + assert.NoError(t, err) + assert.True(t, tt.expected[val.(string)]) + receivedCount += 1 + if receivedCount == len(tt.expected) { + cancel() + break wait + } + } + } + }) + } +} diff --git a/x-pack/filebeat/input/azureblobstorage/client.go b/x-pack/filebeat/input/azureblobstorage/client.go index fd30498e852..e23374dcea8 100644 --- a/x-pack/filebeat/input/azureblobstorage/client.go +++ b/x-pack/filebeat/input/azureblobstorage/client.go @@ -7,6 +7,7 @@ package azureblobstorage import ( "fmt" + "github.com/Azure/azure-sdk-for-go/sdk/azidentity" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob" azcontainer "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container" @@ -16,10 +17,13 @@ import ( ) func fetchServiceClientAndCreds(cfg config, url string, log *logp.Logger) (*service.Client, *serviceCredentials, error) { - if cfg.Auth.SharedCredentials != nil { + switch { + case cfg.Auth.SharedCredentials != nil: return fetchServiceClientWithSharedKeyCreds(url, cfg.AccountName, cfg.Auth.SharedCredentials, log) - } else if cfg.Auth.ConnectionString != nil { + case cfg.Auth.ConnectionString != nil: return fetchServiceClientWithConnectionString(cfg.Auth.ConnectionString, log) + case cfg.Auth.OAuth2 != nil: + return fetchServiceClientWithOAuth2(url, cfg.Auth.OAuth2) } return nil, nil, fmt.Errorf("no valid auth specified") @@ -52,8 +56,26 @@ func fetchServiceClientWithConnectionString(connectionString *connectionStringCo return serviceClient, &serviceCredentials{connectionStrCreds: connectionString.URI, cType: connectionStringType}, nil } +func fetchServiceClientWithOAuth2(url string, cfg *OAuth2Config) (*service.Client, *serviceCredentials, error) { + creds, err := azidentity.NewClientSecretCredential(cfg.TenantID, cfg.ClientID, cfg.ClientSecret, &azidentity.ClientSecretCredentialOptions{ + ClientOptions: cfg.clientOptions, + }) + if err != nil { + return nil, nil, fmt.Errorf("failed to create client secret credential with oauth2 config: %w", err) + } + + client, err := azblob.NewClient(url, creds, &azblob.ClientOptions{ + ClientOptions: cfg.clientOptions, + }) + if err != nil { + return nil, nil, fmt.Errorf("failed to create azblob service client: %w", err) + } + + return client.ServiceClient(), &serviceCredentials{oauth2Creds: creds, cType: oauth2Type}, nil +} + // fetchBlobClient, generic function that returns a BlobClient based on the credential type -func fetchBlobClient(url string, credential *blobCredentials, log *logp.Logger) (*blob.Client, error) { +func fetchBlobClient(url string, credential *blobCredentials, cfg config, log *logp.Logger) (*blob.Client, error) { if credential == nil { return nil, fmt.Errorf("no valid blob credentials found") } @@ -63,6 +85,8 @@ func fetchBlobClient(url string, credential *blobCredentials, log *logp.Logger) return fetchBlobClientWithSharedKey(url, credential.serviceCreds.sharedKeyCreds, log) case connectionStringType: return fetchBlobClientWithConnectionString(credential.serviceCreds.connectionStrCreds, credential.containerName, credential.blobName, log) + case oauth2Type: + return fetchBlobClientWithOAuth2(url, credential.serviceCreds.oauth2Creds, cfg.Auth.OAuth2) default: return nil, fmt.Errorf("no valid service credential 'type' found: %s", credential.serviceCreds.cType) } @@ -88,6 +112,17 @@ func fetchBlobClientWithConnectionString(connectionString string, containerName return blobClient, nil } +func fetchBlobClientWithOAuth2(url string, credential *azidentity.ClientSecretCredential, oauth2Cfg *OAuth2Config) (*blob.Client, error) { + blobClient, err := blob.NewClient(url, credential, &blob.ClientOptions{ + ClientOptions: oauth2Cfg.clientOptions, + }) + if err != nil { + return nil, fmt.Errorf("failed to fetch blob client for %s: %w", url, err) + } + + return blobClient, nil +} + func fetchContainerClient(serviceClient *service.Client, containerName string, log *logp.Logger) (*azcontainer.Client, error) { return serviceClient.NewContainerClient(containerName), nil } diff --git a/x-pack/filebeat/input/azureblobstorage/config.go b/x-pack/filebeat/input/azureblobstorage/config.go index a780e60216d..5367596935a 100644 --- a/x-pack/filebeat/input/azureblobstorage/config.go +++ b/x-pack/filebeat/input/azureblobstorage/config.go @@ -5,8 +5,11 @@ package azureblobstorage import ( + "errors" "time" + "github.com/Azure/azure-sdk-for-go/sdk/azcore" + "github.com/elastic/beats/v7/libbeat/common/match" ) @@ -44,19 +47,35 @@ type fileSelectorConfig struct { } type authConfig struct { - SharedCredentials *sharedKeyConfig `config:"shared_credentials,omitempty"` - ConnectionString *connectionStringConfig `config:"connection_string,omitempty"` + SharedCredentials *sharedKeyConfig `config:"shared_credentials"` + ConnectionString *connectionStringConfig `config:"connection_string"` + OAuth2 *OAuth2Config `config:"oauth2"` } type connectionStringConfig struct { - URI string `config:"uri,omitempty"` + URI string `config:"uri"` } type sharedKeyConfig struct { AccountKey string `config:"account_key"` } +type OAuth2Config struct { + ClientID string `config:"client_id"` + ClientSecret string `config:"client_secret"` + TenantID string `config:"tenant_id"` + // clientOptions is used internally for testing purposes only + clientOptions azcore.ClientOptions +} + func defaultConfig() config { return config{ AccountName: "some_account", } } + +func (c config) Validate() error { + if c.Auth.OAuth2 != nil && (c.Auth.OAuth2.ClientID == "" || c.Auth.OAuth2.ClientSecret == "" || c.Auth.OAuth2.TenantID == "") { + return errors.New("client_id, client_secret and tenant_id are required for OAuth2 auth") + } + return nil +} diff --git a/x-pack/filebeat/input/azureblobstorage/config_test.go b/x-pack/filebeat/input/azureblobstorage/config_test.go new file mode 100644 index 00000000000..adef15d95f7 --- /dev/null +++ b/x-pack/filebeat/input/azureblobstorage/config_test.go @@ -0,0 +1,80 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package azureblobstorage + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + + conf "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/logp" +) + +var configTests = []struct { + name string + config map[string]interface{} + wantErr error +}{ + { + name: "invalid_oauth2_config", + config: map[string]interface{}{ + "account_name": "beatsblobnew", + "auth.oauth2": map[string]interface{}{ + "client_id": "12345678-90ab-cdef-1234-567890abcdef", + "client_secret": "abcdefg1234567890!@#$%^&*()-_=+", + }, + "max_workers": 2, + "poll": true, + "poll_interval": "10s", + "containers": []map[string]interface{}{ + { + "name": beatsContainer, + }, + }, + }, + wantErr: fmt.Errorf("client_id, client_secret and tenant_id are required for OAuth2 auth accessing config"), + }, + { + name: "valid_oauth2_config", + config: map[string]interface{}{ + "account_name": "beatsblobnew", + "auth.oauth2": map[string]interface{}{ + "client_id": "12345678-90ab-cdef-1234-567890abcdef", + "client_secret": "abcdefg1234567890!@#$%^&*()-_=+", + "tenant_id": "87654321-abcd-ef90-1234-fedcba098765", + }, + "max_workers": 2, + "poll": true, + "poll_interval": "10s", + "containers": []map[string]interface{}{ + { + "name": beatsContainer, + }, + }, + }, + }, +} + +func TestConfig(t *testing.T) { + logp.TestingSetup() + for _, test := range configTests { + t.Run(test.name, func(t *testing.T) { + cfg := conf.MustNewConfigFrom(test.config) + conf := config{} + err := cfg.Unpack(&conf) + + switch { + case err == nil && test.wantErr != nil: + t.Fatalf("expected error unpacking config: %v", test.wantErr) + case err != nil && test.wantErr == nil: + t.Fatalf("unexpected error unpacking config: %v", err) + case err != nil && test.wantErr != nil: + assert.EqualError(t, err, test.wantErr.Error()) + } + }) + } +} diff --git a/x-pack/filebeat/input/azureblobstorage/mock/data.go b/x-pack/filebeat/input/azureblobstorage/mock/data.go index 1132df16900..ed44d4b37ec 100644 --- a/x-pack/filebeat/input/azureblobstorage/mock/data.go +++ b/x-pack/filebeat/input/azureblobstorage/mock/data.go @@ -11,7 +11,7 @@ const ( beatsContainer2 = "beatscontainer2" ) -var containers = map[string]bool{ +var Containers = map[string]bool{ beatsContainer: true, beatsContainer2: true, } diff --git a/x-pack/filebeat/input/azureblobstorage/mock/mock.go b/x-pack/filebeat/input/azureblobstorage/mock/mock.go index 32d65cdb2e3..78c70b28df8 100644 --- a/x-pack/filebeat/input/azureblobstorage/mock/mock.go +++ b/x-pack/filebeat/input/azureblobstorage/mock/mock.go @@ -25,18 +25,18 @@ func AzureStorageServer() http.Handler { if r.Method == http.MethodGet { switch len(path) { case 1: - if containers[path[0]] { + if Containers[path[0]] { w.Header().Set(contentType, xmlType) w.Write([]byte(fetchContainer[path[0]])) return } case 2: - if containers[path[0]] && availableBlobs[path[0]][path[1]] { + if Containers[path[0]] && availableBlobs[path[0]][path[1]] { w.Write([]byte(blobs[path[0]][path[1]])) return } case 3: - if containers[path[0]] { + if Containers[path[0]] { objName := strings.Join(path[1:], "/") if availableBlobs[path[0]][objName] { w.Write([]byte(blobs[path[0]][objName])) diff --git a/x-pack/filebeat/input/azureblobstorage/scheduler.go b/x-pack/filebeat/input/azureblobstorage/scheduler.go index ba433b78f41..781514d5271 100644 --- a/x-pack/filebeat/input/azureblobstorage/scheduler.go +++ b/x-pack/filebeat/input/azureblobstorage/scheduler.go @@ -123,7 +123,7 @@ func (s *scheduler) scheduleOnce(ctx context.Context) error { containerName: s.src.ContainerName, } - blobClient, err := fetchBlobClient(blobURL, blobCreds, s.log) + blobClient, err := fetchBlobClient(blobURL, blobCreds, *s.cfg, s.log) if err != nil { s.log.Errorf("Job creation failed for container %s with error %v", s.src.ContainerName, err) return err diff --git a/x-pack/filebeat/input/azureblobstorage/types.go b/x-pack/filebeat/input/azureblobstorage/types.go index 67012198423..9b2b0a4eca7 100644 --- a/x-pack/filebeat/input/azureblobstorage/types.go +++ b/x-pack/filebeat/input/azureblobstorage/types.go @@ -8,6 +8,7 @@ package azureblobstorage import ( "time" + "github.com/Azure/azure-sdk-for-go/sdk/azidentity" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob" ) @@ -30,6 +31,7 @@ func (s *Source) Name() string { const ( sharedKeyType = "sharedKeyType" connectionStringType = "connectionStringType" + oauth2Type = "oauth2Type" jsonType = "application/json" octetType = "application/octet-stream" ndJsonType = "application/x-ndjson" @@ -39,6 +41,7 @@ const ( // currently only shared key & connection string types of credentials are supported type serviceCredentials struct { + oauth2Creds *azidentity.ClientSecretCredential sharedKeyCreds *azblob.SharedKeyCredential connectionStrCreds string cType string