Skip to content

Commit

Permalink
test/service: add headers and unsigned signature in s3-gcp iam
Browse files Browse the repository at this point in the history
adding auth gcp project headers before calling any
s3 operation when `gcp_instance_metadata` is defined.
the auth is handled by the auth header, thus the
signature should be UNSIGNED

ref
redpanda-data/vtools#1510
  • Loading branch information
gousteris committed Nov 17, 2023
1 parent 6f8624b commit 829f474
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 10 deletions.
33 changes: 25 additions & 8 deletions tests/rptest/archival/s3_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
from rptest.archival.shared_client_utils import key_to_topic

import boto3

from botocore import UNSIGNED
from botocore.config import Config
from botocore.exceptions import ClientError

Expand Down Expand Up @@ -55,7 +57,9 @@ def __init__(self,
secret_key,
logger,
endpoint=None,
disable_ssl=True):
disable_ssl=True,
signature_version='s3v4',
before_call_headers=None):

logger.debug(
f"Constructed S3Client in region {region}, endpoint {endpoint}, key is set = {access_key is not None}"
Expand All @@ -66,22 +70,31 @@ def __init__(self,
self._secret_key = secret_key
self._endpoint = endpoint
self._disable_ssl = disable_ssl
if signature_version.lower() == "unsigned":
self._signature_version = UNSIGNED
else:
self._signature_version = signature_version
self._before_call_headers = before_call_headers
self._cli = self.make_client()
self.logger = logger

def make_client(self):
cfg = Config(region_name=self._region,
signature_version='s3v4',
signature_version=self._signature_version,
retries={
'max_attempts': 10,
'mode': 'adaptive'
})
return boto3.client('s3',
config=cfg,
aws_access_key_id=self._access_key,
aws_secret_access_key=self._secret_key,
endpoint_url=self._endpoint,
use_ssl=not self._disable_ssl)
cl = boto3.client('s3',
config=cfg,
aws_access_key_id=self._access_key,
aws_secret_access_key=self._secret_key,
endpoint_url=self._endpoint,
use_ssl=not self._disable_ssl)
if self._before_call_headers is not None:
event_system = cl.meta.events
event_system.register('before-call.s3.*', self._add_header)
return cl

def create_bucket(self, name):
"""Create bucket in S3"""
Expand Down Expand Up @@ -458,3 +471,7 @@ def list_buckets(self, client=None) -> dict[str, Union[list, dict]]:
except Exception as ex:
self.logger.error(f'Error listing buckets: {ex}')
raise

def _add_header(self, model, params, request_signer, **kwargs):
for k, v in self._before_call_headers.items():
params['headers'][k] = v
26 changes: 24 additions & 2 deletions tests/rptest/services/redpanda.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,7 @@ class SISettings:
GLOBAL_S3_ACCESS_KEY = "s3_access_key"
GLOBAL_S3_SECRET_KEY = "s3_secret_key"
GLOBAL_S3_REGION_KEY = "s3_region"
GLOBAL_GCP_PROJECT_ID_KEY = "gcp_project_id"

GLOBAL_ABS_STORAGE_ACCOUNT = "abs_storage_account"
GLOBAL_ABS_SHARED_KEY = "abs_shared_key"
Expand Down Expand Up @@ -396,7 +397,9 @@ def __init__(self,
int] = None,
fast_uploads=False,
retention_local_strict=True,
cloud_storage_max_throughput_per_shard: Optional[int] = None):
cloud_storage_max_throughput_per_shard: Optional[int] = None,
cloud_storage_signature_version="s3v4",
before_call_headers=None):
"""
:param fast_uploads: if true, set low upload intervals to help tests run
quickly when they wait for uploads to complete.
Expand Down Expand Up @@ -491,6 +494,8 @@ def _load_s3_context(self, logger, test_context):
self.GLOBAL_S3_SECRET_KEY, None)
cloud_storage_region = test_context.globals.get(
self.GLOBAL_S3_REGION_KEY, None)
cloud_storage_gcp_project_id = test_context.globals.get(
self.GLOBAL_GCP_PROJECT_ID_KEY, None)

# Enable S3 if AWS creds were given at globals
if cloud_storage_credentials_source == 'aws_instance_metadata' or cloud_storage_credentials_source == 'gcp_instance_metadata':
Expand All @@ -502,6 +507,11 @@ def _load_s3_context(self, logger, test_context):
if test_context.globals.get(self.GLOBAL_CLOUD_PROVIDER,
'aws') == 'gcp':
self.endpoint_url = 'https://storage.googleapis.com'
self.cloud_storage_signature_version = "unsigned"
self.before_call_headers = {
"Authorization": f"Bearer {self.gcp_iam_token()}",
"x-goog-project-id": cloud_storage_gcp_project_id
}
self.cloud_storage_disable_tls = False # SI will fail to create archivers if tls is disabled
self.cloud_storage_region = cloud_storage_region
self.cloud_storage_api_endpoint_port = 443
Expand All @@ -526,6 +536,16 @@ def cloud_storage_bucket(self):
elif self.cloud_storage_type == CloudStorageType.ABS:
return self._cloud_storage_azure_container

def gcp_iam_token(self):
try:
res = requests.get(
"http://169.254.169.254/computeMetadata/v1/instance/service-accounts/default/token",
headers={"Metadata-Flavor": "Google"})
except Exception as ex:
self.logger.error(f'Error getting IAM token from GCP: {ex}')
raise
return res.json()["access_token"]

# Call this to update the extra_rp_conf
def update_rp_conf(self, conf) -> dict[str, Any]:
if self.cloud_storage_type == CloudStorageType.S3:
Expand Down Expand Up @@ -2391,7 +2411,9 @@ def start_si(self):
secret_key=self._si_settings.cloud_storage_secret_key,
endpoint=self._si_settings.endpoint_url,
logger=self.logger,
)
signature_version=self._si_settings.
cloud_storage_signature_version,
before_call_headers=self._si_settings.before_call_headers)

self.logger.debug(
f"Creating S3 bucket: {self._si_settings.cloud_storage_bucket}"
Expand Down

0 comments on commit 829f474

Please sign in to comment.