diff --git a/tests/rptest/archival/s3_client.py b/tests/rptest/archival/s3_client.py index a47deb5f2f380..ce6738bf513b0 100644 --- a/tests/rptest/archival/s3_client.py +++ b/tests/rptest/archival/s3_client.py @@ -3,6 +3,8 @@ from rptest.archival.shared_client_utils import key_to_topic import boto3 + +from botocore import UNSIGNED from botocore.config import Config from botocore.exceptions import ClientError @@ -55,7 +57,9 @@ def __init__(self, secret_key, logger, endpoint=None, - disable_ssl=True): + disable_ssl=True, + signature_version='s3v4', + before_call_headers=None): logger.debug( f"Constructed S3Client in region {region}, endpoint {endpoint}, key is set = {access_key is not None}" @@ -66,22 +70,31 @@ def __init__(self, self._secret_key = secret_key self._endpoint = endpoint self._disable_ssl = disable_ssl + if signature_version.lower() == "unsigned": + self._signature_version = UNSIGNED + else: + self._signature_version = signature_version + self._before_call_headers = before_call_headers self._cli = self.make_client() self.logger = logger def make_client(self): cfg = Config(region_name=self._region, - signature_version='s3v4', + signature_version=self._signature_version, retries={ 'max_attempts': 10, 'mode': 'adaptive' }) - return boto3.client('s3', - config=cfg, - aws_access_key_id=self._access_key, - aws_secret_access_key=self._secret_key, - endpoint_url=self._endpoint, - use_ssl=not self._disable_ssl) + cl = boto3.client('s3', + config=cfg, + aws_access_key_id=self._access_key, + aws_secret_access_key=self._secret_key, + endpoint_url=self._endpoint, + use_ssl=not self._disable_ssl) + if self._before_call_headers is not None: + event_system = cl.meta.events + event_system.register('before-call.s3.*', self._add_header) + return cl def create_bucket(self, name): """Create bucket in S3""" @@ -458,3 +471,7 @@ def list_buckets(self, client=None) -> dict[str, Union[list, dict]]: except Exception as ex: self.logger.error(f'Error listing buckets: {ex}') raise + + def _add_header(self, model, params, request_signer, **kwargs): + for k, v in self._before_call_headers.items(): + params['headers'][k] = v diff --git a/tests/rptest/services/redpanda.py b/tests/rptest/services/redpanda.py index 0035f54ea0850..38dfa2b8c576b 100644 --- a/tests/rptest/services/redpanda.py +++ b/tests/rptest/services/redpanda.py @@ -356,6 +356,7 @@ class SISettings: GLOBAL_S3_ACCESS_KEY = "s3_access_key" GLOBAL_S3_SECRET_KEY = "s3_secret_key" GLOBAL_S3_REGION_KEY = "s3_region" + GLOBAL_GCP_PROJECT_ID_KEY = "gcp_project_id" GLOBAL_ABS_STORAGE_ACCOUNT = "abs_storage_account" GLOBAL_ABS_SHARED_KEY = "abs_shared_key" @@ -396,7 +397,9 @@ def __init__(self, int] = None, fast_uploads=False, retention_local_strict=True, - cloud_storage_max_throughput_per_shard: Optional[int] = None): + cloud_storage_max_throughput_per_shard: Optional[int] = None, + cloud_storage_signature_version="s3v4", + before_call_headers=None): """ :param fast_uploads: if true, set low upload intervals to help tests run quickly when they wait for uploads to complete. @@ -491,6 +494,8 @@ def _load_s3_context(self, logger, test_context): self.GLOBAL_S3_SECRET_KEY, None) cloud_storage_region = test_context.globals.get( self.GLOBAL_S3_REGION_KEY, None) + cloud_storage_gcp_project_id = test_context.globals.get( + self.GLOBAL_GCP_PROJECT_ID_KEY, None) # Enable S3 if AWS creds were given at globals if cloud_storage_credentials_source == 'aws_instance_metadata' or cloud_storage_credentials_source == 'gcp_instance_metadata': @@ -502,6 +507,11 @@ def _load_s3_context(self, logger, test_context): if test_context.globals.get(self.GLOBAL_CLOUD_PROVIDER, 'aws') == 'gcp': self.endpoint_url = 'https://storage.googleapis.com' + self.cloud_storage_signature_version = "unsigned" + self.before_call_headers = { + "Authorization": f"Bearer {self.gcp_iam_token()}", + "x-goog-project-id": cloud_storage_gcp_project_id + } self.cloud_storage_disable_tls = False # SI will fail to create archivers if tls is disabled self.cloud_storage_region = cloud_storage_region self.cloud_storage_api_endpoint_port = 443 @@ -526,6 +536,16 @@ def cloud_storage_bucket(self): elif self.cloud_storage_type == CloudStorageType.ABS: return self._cloud_storage_azure_container + def gcp_iam_token(self): + try: + res = requests.get( + "http://169.254.169.254/computeMetadata/v1/instance/service-accounts/default/token", + headers={"Metadata-Flavor": "Google"}) + except Exception as ex: + self.logger.error(f'Error getting IAM token from GCP: {ex}') + raise + return res.json()["access_token"] + # Call this to update the extra_rp_conf def update_rp_conf(self, conf) -> dict[str, Any]: if self.cloud_storage_type == CloudStorageType.S3: @@ -2391,7 +2411,9 @@ def start_si(self): secret_key=self._si_settings.cloud_storage_secret_key, endpoint=self._si_settings.endpoint_url, logger=self.logger, - ) + signature_version=self._si_settings. + cloud_storage_signature_version, + before_call_headers=self._si_settings.before_call_headers) self.logger.debug( f"Creating S3 bucket: {self._si_settings.cloud_storage_bucket}"