Skip to content

Commit

Permalink
test: Remote storage refactorings (#5243)
Browse files Browse the repository at this point in the history
Remote storage cleanup split from #5198:
- pageserver, extensions, and safekeepers now have their separate remote
storage
- RemoteStorageKind has the configuration code
- S3Storage has the cleanup code
- with MOCK_S3, pageserver, extensions, safekeepers use different
buckets
- with LOCAL_FS, `repo_dir / "local_fs_remote_storage" / $user` is used
as path, where $user is `pageserver`, `safekeeper`
- no more `NeonEnvBuilder.enable_xxx_remote_storage` but one
`enable_{pageserver,extensions,safekeeper}_remote_storage`

Should not have any real changes. These will allow us to default to
`LOCAL_FS` for pageserver on the next PR, remove
`RemoteStorageKind.NOOP`, work towards #5172.

Co-authored-by: Alexander Bayandin <alexander@neon.tech>
  • Loading branch information
koivunej and bayandin authored Sep 8, 2023
1 parent cdc65c1 commit ff87fc5
Show file tree
Hide file tree
Showing 24 changed files with 434 additions and 494 deletions.
303 changes: 86 additions & 217 deletions test_runner/fixtures/neon_fixtures.py

Large diffs are not rendered by default.

16 changes: 6 additions & 10 deletions test_runner/fixtures/pageserver/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,15 +260,11 @@ def list_prefix(
Note that this function takes into account prefix_in_bucket.
"""
# For local_fs we need to properly handle empty directories, which we currently dont, so for simplicity stick to s3 api.
assert neon_env_builder.remote_storage_kind in (
RemoteStorageKind.MOCK_S3,
RemoteStorageKind.REAL_S3,
)
# For mypy
assert isinstance(neon_env_builder.remote_storage, S3Storage)
assert neon_env_builder.remote_storage_client is not None
remote = neon_env_builder.pageserver_remote_storage
assert isinstance(remote, S3Storage), "localfs is currently not supported"
assert remote.client is not None

prefix_in_bucket = neon_env_builder.remote_storage.prefix_in_bucket or ""
prefix_in_bucket = remote.prefix_in_bucket or ""
if not prefix:
prefix = prefix_in_bucket
else:
Expand All @@ -277,9 +273,9 @@ def list_prefix(
prefix = "/".join((prefix_in_bucket, prefix))

# Note that this doesnt use pagination, so list is not guaranteed to be exhaustive.
response = neon_env_builder.remote_storage_client.list_objects_v2(
response = remote.client.list_objects_v2(
Delimiter="/",
Bucket=neon_env_builder.remote_storage.bucket_name,
Bucket=remote.bucket_name,
Prefix=prefix,
)
return response
Expand Down
294 changes: 233 additions & 61 deletions test_runner/fixtures/remote_storage.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,36 @@
import enum
import hashlib
import json
import os
import re
import subprocess
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Optional, Union
from typing import Any, Dict, List, Optional, Union

import boto3
from mypy_boto3_s3 import S3Client

from fixtures.log_helper import log
from fixtures.types import TenantId, TimelineId

TIMELINE_INDEX_PART_FILE_NAME = "index_part.json"


@enum.unique
class RemoteStorageUser(str, enum.Enum):
"""
Instead of using strings for the users, use a more strict enum.
"""

PAGESERVER = "pageserver"
EXTENSIONS = "ext"
SAFEKEEPER = "safekeeper"

def __str__(self) -> str:
return self.value


class MockS3Server:
"""
Starts a mock S3 server for testing on a port given, errors if the server fails to start or exits prematurely.
Expand Down Expand Up @@ -58,49 +77,6 @@ def kill(self):
self.subprocess.kill()


@enum.unique
class RemoteStorageKind(str, enum.Enum):
LOCAL_FS = "local_fs"
MOCK_S3 = "mock_s3"
REAL_S3 = "real_s3"
# Pass to tests that are generic to remote storage
# to ensure the test pass with or without the remote storage
NOOP = "noop"


def available_remote_storages() -> List[RemoteStorageKind]:
remote_storages = [RemoteStorageKind.LOCAL_FS, RemoteStorageKind.MOCK_S3]
if os.getenv("ENABLE_REAL_S3_REMOTE_STORAGE") is not None:
remote_storages.append(RemoteStorageKind.REAL_S3)
log.info("Enabling real s3 storage for tests")
else:
log.info("Using mock implementations to test remote storage")
return remote_storages


def available_s3_storages() -> List[RemoteStorageKind]:
remote_storages = [RemoteStorageKind.MOCK_S3]
if os.getenv("ENABLE_REAL_S3_REMOTE_STORAGE") is not None:
remote_storages.append(RemoteStorageKind.REAL_S3)
log.info("Enabling real s3 storage for tests")
else:
log.info("Using mock implementations to test remote storage")
return remote_storages


def s3_storage() -> RemoteStorageKind:
"""
For tests that require a remote storage impl that exposes an S3
endpoint, but don't want to parametrize over multiple storage types.
Use real S3 if available, else use MockS3
"""
if os.getenv("ENABLE_REAL_S3_REMOTE_STORAGE") is not None:
return RemoteStorageKind.REAL_S3
else:
return RemoteStorageKind.MOCK_S3


@dataclass
class LocalFsStorage:
root: Path
Expand All @@ -118,15 +94,28 @@ def index_content(self, tenant_id: TenantId, timeline_id: TimelineId):
with self.index_path(tenant_id, timeline_id).open("r") as f:
return json.load(f)

def to_toml_inline_table(self) -> str:
return f"local_path='{self.root}'"

def cleanup(self):
# no cleanup is done here, because there's NeonEnvBuilder.cleanup_local_storage which will remove everything, including localfs files
pass

@staticmethod
def component_path(repo_dir: Path, user: RemoteStorageUser) -> Path:
return repo_dir / "local_fs_remote_storage" / str(user)


@dataclass
class S3Storage:
bucket_name: str
bucket_region: str
access_key: str
secret_key: str
prefix_in_bucket: str
client: S3Client
cleanup: bool
endpoint: Optional[str] = None
prefix_in_bucket: Optional[str] = ""

def access_env_vars(self) -> Dict[str, str]:
return {
Expand All @@ -144,29 +133,212 @@ def to_string(self) -> str:
}
)

def to_toml_inline_table(self) -> str:
s = [
f"bucket_name='{self.bucket_name}'",
f"bucket_region='{self.bucket_region}'",
]

if self.prefix_in_bucket is not None:
s.append(f"prefix_in_bucket='{self.prefix_in_bucket}'")

if self.endpoint is not None:
s.append(f"endpoint='{self.endpoint}'")

return ",".join(s)

def do_cleanup(self):
if not self.cleanup:
# handles previous keep_remote_storage_contents
return

log.info(
"removing data from test s3 bucket %s by prefix %s",
self.bucket_name,
self.prefix_in_bucket,
)
paginator = self.client.get_paginator("list_objects_v2")
pages = paginator.paginate(
Bucket=self.bucket_name,
Prefix=self.prefix_in_bucket,
)

# Using Any because DeleteTypeDef (from boto3-stubs) doesn't fit our case
objects_to_delete: Any = {"Objects": []}
cnt = 0
for item in pages.search("Contents"):
# weirdly when nothing is found it returns [None]
if item is None:
break

objects_to_delete["Objects"].append({"Key": item["Key"]})

# flush once aws limit reached
if len(objects_to_delete["Objects"]) >= 1000:
self.client.delete_objects(
Bucket=self.bucket_name,
Delete=objects_to_delete,
)
objects_to_delete = {"Objects": []}
cnt += 1

# flush rest
if len(objects_to_delete["Objects"]):
self.client.delete_objects(
Bucket=self.bucket_name,
Delete=objects_to_delete,
)

log.info(f"deleted {cnt} objects from remote storage")


RemoteStorage = Union[LocalFsStorage, S3Storage]


# serialize as toml inline table
def remote_storage_to_toml_inline_table(remote_storage: RemoteStorage) -> str:
if isinstance(remote_storage, LocalFsStorage):
remote_storage_config = f"local_path='{remote_storage.root}'"
elif isinstance(remote_storage, S3Storage):
remote_storage_config = f"bucket_name='{remote_storage.bucket_name}',\
bucket_region='{remote_storage.bucket_region}'"
@enum.unique
class RemoteStorageKind(str, enum.Enum):
LOCAL_FS = "local_fs"
MOCK_S3 = "mock_s3"
REAL_S3 = "real_s3"
# Pass to tests that are generic to remote storage
# to ensure the test pass with or without the remote storage
NOOP = "noop"

def configure(
self,
repo_dir: Path,
mock_s3_server,
run_id: str,
test_name: str,
user: RemoteStorageUser,
bucket_name: Optional[str] = None,
bucket_region: Optional[str] = None,
) -> Optional[RemoteStorage]:
if self == RemoteStorageKind.NOOP:
return None

if self == RemoteStorageKind.LOCAL_FS:
return LocalFsStorage(LocalFsStorage.component_path(repo_dir, user))

# real_s3 uses this as part of prefix, mock_s3 uses this as part of
# bucket name, giving all users unique buckets because we have to
# create them
test_name = re.sub(r"[_\[\]]", "-", test_name)

def to_bucket_name(user: str, test_name: str) -> str:
s = f"{user}-{test_name}"

if len(s) > 63:
prefix = s[:30]
suffix = hashlib.sha256(test_name.encode()).hexdigest()[:32]
s = f"{prefix}-{suffix}"
assert len(s) == 63

return s

if self == RemoteStorageKind.MOCK_S3:
# there's a single mock_s3 server for each process running the tests
mock_endpoint = mock_s3_server.endpoint()
mock_region = mock_s3_server.region()

access_key, secret_key = mock_s3_server.access_key(), mock_s3_server.secret_key()

client = boto3.client(
"s3",
endpoint_url=mock_endpoint,
region_name=mock_region,
aws_access_key_id=access_key,
aws_secret_access_key=secret_key,
)

bucket_name = to_bucket_name(user, test_name)
log.info(
f"using mock_s3 bucket name {bucket_name} for user={user}, test_name={test_name}"
)

return S3Storage(
bucket_name=bucket_name,
endpoint=mock_endpoint,
bucket_region=mock_region,
access_key=access_key,
secret_key=secret_key,
prefix_in_bucket="",
client=client,
cleanup=False,
)

assert self == RemoteStorageKind.REAL_S3

env_access_key = os.getenv("AWS_ACCESS_KEY_ID")
assert env_access_key, "no aws access key provided"
env_secret_key = os.getenv("AWS_SECRET_ACCESS_KEY")
assert env_secret_key, "no aws access key provided"

# session token is needed for local runs with sso auth
session_token = os.getenv("AWS_SESSION_TOKEN")

bucket_name = bucket_name or os.getenv("REMOTE_STORAGE_S3_BUCKET")
assert bucket_name is not None, "no remote storage bucket name provided"
bucket_region = bucket_region or os.getenv("REMOTE_STORAGE_S3_REGION")
assert bucket_region is not None, "no remote storage region provided"

prefix_in_bucket = f"{run_id}/{test_name}/{user}"

client = boto3.client(
"s3",
region_name=bucket_region,
aws_access_key_id=env_access_key,
aws_secret_access_key=env_secret_key,
aws_session_token=session_token,
)

return S3Storage(
bucket_name=bucket_name,
bucket_region=bucket_region,
access_key=env_access_key,
secret_key=env_secret_key,
prefix_in_bucket=prefix_in_bucket,
client=client,
cleanup=True,
)


def available_remote_storages() -> List[RemoteStorageKind]:
remote_storages = [RemoteStorageKind.LOCAL_FS, RemoteStorageKind.MOCK_S3]
if os.getenv("ENABLE_REAL_S3_REMOTE_STORAGE") is not None:
remote_storages.append(RemoteStorageKind.REAL_S3)
log.info("Enabling real s3 storage for tests")
else:
log.info("Using mock implementations to test remote storage")
return remote_storages

if remote_storage.prefix_in_bucket is not None:
remote_storage_config += f",prefix_in_bucket='{remote_storage.prefix_in_bucket}'"

if remote_storage.endpoint is not None:
remote_storage_config += f",endpoint='{remote_storage.endpoint}'"
def available_s3_storages() -> List[RemoteStorageKind]:
remote_storages = [RemoteStorageKind.MOCK_S3]
if os.getenv("ENABLE_REAL_S3_REMOTE_STORAGE") is not None:
remote_storages.append(RemoteStorageKind.REAL_S3)
log.info("Enabling real s3 storage for tests")
else:
raise Exception("invalid remote storage type")
log.info("Using mock implementations to test remote storage")
return remote_storages

return f"{{{remote_storage_config}}}"

def s3_storage() -> RemoteStorageKind:
"""
For tests that require a remote storage impl that exposes an S3
endpoint, but don't want to parametrize over multiple storage types.
Use real S3 if available, else use MockS3
"""
if os.getenv("ENABLE_REAL_S3_REMOTE_STORAGE") is not None:
return RemoteStorageKind.REAL_S3
else:
return RemoteStorageKind.MOCK_S3


# serialize as toml inline table
def remote_storage_to_toml_inline_table(remote_storage: RemoteStorage) -> str:
if not isinstance(remote_storage, (LocalFsStorage, S3Storage)):
raise Exception("invalid remote storage type")

class RemoteStorageUsers(enum.Flag):
PAGESERVER = enum.auto()
SAFEKEEPER = enum.auto()
return f"{{{remote_storage.to_toml_inline_table()}}}"
Loading

1 comment on commit ff87fc5

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1708 tests run: 1629 passed, 0 failed, 79 skipped (full report)


Code coverage (full report)

  • functions: 52.8% (7592 of 14380 functions)
  • lines: 81.4% (44638 of 54841 lines)

The comment gets automatically updated with the latest test results
ff87fc5 at 2023-09-08T11:41:00.512Z :recycle:

Please sign in to comment.