From 3697856dc82e491e50e53b23b22cd84209983405 Mon Sep 17 00:00:00 2001 From: Alexander Bayandin Date: Wed, 6 Nov 2024 20:39:31 +0000 Subject: [PATCH] Fix new mypy errors and improve typing with Self --- scripts/force_layer_download.py | 6 +- test_runner/fixtures/common_types.py | 6 +- test_runner/fixtures/compute_reconfigure.py | 2 +- test_runner/fixtures/neon_cli.py | 3 - test_runner/fixtures/neon_fixtures.py | 67 +++++++++---------- test_runner/fixtures/parametrize.py | 1 + test_runner/fixtures/workload.py | 2 +- test_runner/regress/test_compute_metrics.py | 7 +- test_runner/regress/test_ddl_forwarding.py | 4 +- .../regress/test_pageserver_layer_rolling.py | 12 ++-- .../regress/test_pageserver_secondary.py | 2 +- test_runner/regress/test_sharding.py | 2 +- test_runner/regress/test_sni_router.py | 10 ++- .../regress/test_storage_controller.py | 4 +- test_runner/regress/test_wal_acceptor.py | 6 +- 15 files changed, 64 insertions(+), 70 deletions(-) diff --git a/scripts/force_layer_download.py b/scripts/force_layer_download.py index 6dbac08f3cf2..835e28c5d6ec 100644 --- a/scripts/force_layer_download.py +++ b/scripts/force_layer_download.py @@ -194,9 +194,11 @@ async def main_impl(args, report_out, client: Client): tenant_ids = await client.get_tenant_ids() get_timeline_id_coros = [client.get_timeline_ids(tenant_id) for tenant_id in tenant_ids] gathered = await asyncio.gather(*get_timeline_id_coros, return_exceptions=True) - assert len(tenant_ids) == len(gathered) tenant_and_timline_ids = [] - for tid, tlids in zip(tenant_ids, gathered, strict=False): + for tid, tlids in zip(tenant_ids, gathered, strict=True): + # TODO: add error handling for when tlids is an Exception + assert isinstance(tlids, list) + for tlid in tlids: tenant_and_timline_ids.append((tid, tlid)) elif len(comps) == 1: diff --git a/test_runner/fixtures/common_types.py b/test_runner/fixtures/common_types.py index 212ed9207f3f..c73d5411fa09 100644 --- a/test_runner/fixtures/common_types.py +++ b/test_runner/fixtures/common_types.py @@ -190,10 +190,6 @@ def from_json(cls, d: dict[str, Any]) -> TenantTimelineId: ) -# Workaround for compat with python 3.9, which does not have `typing.Self` -TTenantShardId = TypeVar("TTenantShardId", bound="TenantShardId") - - class TenantShardId: def __init__(self, tenant_id: TenantId, shard_number: int, shard_count: int): self.tenant_id = tenant_id @@ -202,7 +198,7 @@ def __init__(self, tenant_id: TenantId, shard_number: int, shard_count: int): assert self.shard_number < self.shard_count or self.shard_count == 0 @classmethod - def parse(cls: type[TTenantShardId], input: str) -> TTenantShardId: + def parse(cls: type[TenantShardId], input: str) -> TenantShardId: if len(input) == 32: return cls( tenant_id=TenantId(input), diff --git a/test_runner/fixtures/compute_reconfigure.py b/test_runner/fixtures/compute_reconfigure.py index 4175f67ecbbb..33f01f80fbb9 100644 --- a/test_runner/fixtures/compute_reconfigure.py +++ b/test_runner/fixtures/compute_reconfigure.py @@ -69,7 +69,7 @@ def handler(request: Request) -> Response: # This causes the endpoint to query storage controller for its location, which # is redundant since we already have it here, but this avoids extending the # neon_local CLI to take full lists of locations - reconfigure_threads.submit(lambda workload=workload: workload.reconfigure()) # type: ignore[no-any-return] + reconfigure_threads.submit(lambda workload=workload: workload.reconfigure()) # type: ignore[misc] return Response(status=200) diff --git a/test_runner/fixtures/neon_cli.py 
b/test_runner/fixtures/neon_cli.py index 03a02f51fd2c..a85a1914553e 100644 --- a/test_runner/fixtures/neon_cli.py +++ b/test_runner/fixtures/neon_cli.py @@ -20,12 +20,9 @@ if TYPE_CHECKING: from typing import ( Any, - TypeVar, cast, ) - T = TypeVar("T") - # Used to be an ABC. abc.ABC removed due to linter without name change. class AbstractNeonCli: diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 195b788c7e41..e04cadf46f4b 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -102,10 +102,7 @@ if TYPE_CHECKING: from collections.abc import Callable - from typing import ( - Any, - TypeVar, - ) + from typing import Any, Self, TypeVar from fixtures.paths import SnapshotDirLocked @@ -838,7 +835,7 @@ def cleanup_remote_storage(self): if isinstance(x, S3Storage): x.do_cleanup() - def __enter__(self) -> NeonEnvBuilder: + def __enter__(self) -> Self: return self def __exit__( @@ -1148,21 +1145,19 @@ def start(self, timeout_in_seconds: int | None = None): with concurrent.futures.ThreadPoolExecutor( max_workers=2 + len(self.pageservers) + len(self.safekeepers) ) as executor: - futs.append( - executor.submit(lambda: self.broker.start() or None) - ) # The `or None` is for the linter + futs.append(executor.submit(lambda: self.broker.start())) for pageserver in self.pageservers: futs.append( executor.submit( - lambda ps=pageserver: ps.start(timeout_in_seconds=timeout_in_seconds) + lambda ps=pageserver: ps.start(timeout_in_seconds=timeout_in_seconds) # type: ignore[misc] ) ) for safekeeper in self.safekeepers: futs.append( executor.submit( - lambda sk=safekeeper: sk.start(timeout_in_seconds=timeout_in_seconds) + lambda sk=safekeeper: sk.start(timeout_in_seconds=timeout_in_seconds) # type: ignore[misc] ) ) @@ -1602,13 +1597,13 @@ def start( timeout_in_seconds: int | None = None, instance_id: int | None = None, base_port: int | None = None, - ): + ) -> Self: assert not self.running self.env.neon_cli.storage_controller_start(timeout_in_seconds, instance_id, base_port) self.running = True return self - def stop(self, immediate: bool = False) -> NeonStorageController: + def stop(self, immediate: bool = False) -> Self: if self.running: self.env.neon_cli.storage_controller_stop(immediate) self.running = False @@ -2282,7 +2277,7 @@ def set_preferred_azs(self, preferred_azs: dict[TenantShardId, str]) -> list[Ten response.raise_for_status() return [TenantShardId.parse(tid) for tid in response.json()["updated"]] - def __enter__(self) -> NeonStorageController: + def __enter__(self) -> Self: return self def __exit__( @@ -2304,7 +2299,7 @@ def start( timeout_in_seconds: int | None = None, instance_id: int | None = None, base_port: int | None = None, - ): + ) -> Self: assert instance_id is not None and base_port is not None self.env.neon_cli.storage_controller_start(timeout_in_seconds, instance_id, base_port) @@ -2324,7 +2319,7 @@ def stop_instance( self.running = any(meta["running"] for meta in self.instances.values()) return self - def stop(self, immediate: bool = False) -> NeonStorageController: + def stop(self, immediate: bool = False) -> Self: for iid, details in self.instances.items(): if details["running"]: self.env.neon_cli.storage_controller_stop(immediate, iid) @@ -2446,7 +2441,7 @@ def start( self, extra_env_vars: dict[str, str] | None = None, timeout_in_seconds: int | None = None, - ) -> NeonPageserver: + ) -> Self: """ Start the page server. `overrides` allows to add some config to this pageserver start. 
@@ -2481,7 +2476,7 @@ def start( return self - def stop(self, immediate: bool = False) -> NeonPageserver: + def stop(self, immediate: bool = False) -> Self: """ Stop the page server. Returns self. @@ -2529,7 +2524,7 @@ def complete(): wait_until(20, 0.5, complete) - def __enter__(self) -> NeonPageserver: + def __enter__(self) -> Self: return self def __exit__( @@ -2957,7 +2952,7 @@ def get_subdir_size(self, subdir: Path) -> int: """Return size of pgdatadir subdirectory in bytes.""" return get_dir_size(self.pgdatadir / subdir) - def __enter__(self) -> VanillaPostgres: + def __enter__(self) -> Self: return self def __exit__( @@ -3006,7 +3001,7 @@ def get_subdir_size(self, subdir) -> int: # See https://www.postgresql.org/docs/14/functions-admin.html#FUNCTIONS-ADMIN-GENFILE raise Exception("cannot get size of a Postgres instance") - def __enter__(self) -> RemotePostgres: + def __enter__(self) -> Self: return self def __exit__( @@ -3220,7 +3215,7 @@ def __init__( self.http_timeout_seconds = 15 self._popen: subprocess.Popen[bytes] | None = None - def start(self) -> NeonProxy: + def start(self) -> Self: assert self._popen is None # generate key of it doesn't exist @@ -3348,7 +3343,7 @@ async def find_auth_link(link_auth_uri, proc): log.info(f"SUCCESS, found auth url: {line}") return line - def __enter__(self) -> NeonProxy: + def __enter__(self) -> Self: return self def __exit__( @@ -3438,7 +3433,7 @@ def __init__( self.http_timeout_seconds = 15 self._popen: subprocess.Popen[bytes] | None = None - def start(self) -> NeonAuthBroker: + def start(self) -> Self: assert self._popen is None # generate key of it doesn't exist @@ -3507,7 +3502,7 @@ def get_metrics(self) -> str: request_result = requests.get(f"http://{self.host}:{self.http_port}/metrics") return request_result.text - def __enter__(self) -> NeonAuthBroker: + def __enter__(self) -> Self: return self def __exit__( @@ -3704,7 +3699,7 @@ def create( config_lines: list[str] | None = None, pageserver_id: int | None = None, allow_multiple: bool = False, - ) -> Endpoint: + ) -> Self: """ Create a new Postgres endpoint. Returns self. @@ -3750,7 +3745,7 @@ def start( safekeepers: list[int] | None = None, allow_multiple: bool = False, basebackup_request_tries: int | None = None, - ) -> Endpoint: + ) -> Self: """ Start the Postgres instance. Returns self. @@ -3797,7 +3792,7 @@ def config_file_path(self) -> Path: """Path to the postgresql.conf in the endpoint directory (not the one in pgdata)""" return self.endpoint_path() / "postgresql.conf" - def config(self, lines: list[str]) -> Endpoint: + def config(self, lines: list[str]) -> Self: """ Add lines to postgresql.conf. Lines should be an array of valid postgresql.conf rows. @@ -3873,7 +3868,7 @@ def stop( self, mode: str = "fast", sks_wait_walreceiver_gone: tuple[list[Safekeeper], TimelineId] | None = None, - ) -> Endpoint: + ) -> Self: """ Stop the Postgres instance if it's running. @@ -3907,7 +3902,7 @@ def stop( return self - def stop_and_destroy(self, mode: str = "immediate") -> Endpoint: + def stop_and_destroy(self, mode: str = "immediate") -> Self: """ Stop the Postgres instance, then destroy the endpoint. Returns self. @@ -3934,7 +3929,7 @@ def create_start( pageserver_id: int | None = None, allow_multiple: bool = False, basebackup_request_tries: int | None = None, - ) -> Endpoint: + ) -> Self: """ Create an endpoint, apply config, and start Postgres. Returns self. 
@@ -3957,7 +3952,7 @@ def create_start( return self - def __enter__(self) -> Endpoint: + def __enter__(self) -> Self: return self def __exit__( @@ -4058,7 +4053,7 @@ def create( pageserver_id=pageserver_id, ) - def stop_all(self, fail_on_error=True) -> EndpointFactory: + def stop_all(self, fail_on_error=True) -> Self: exception = None for ep in self.endpoints: try: @@ -4154,7 +4149,7 @@ def __init__( def start( self, extra_opts: list[str] | None = None, timeout_in_seconds: int | None = None - ) -> Safekeeper: + ) -> Self: if extra_opts is None: # Apply either the extra_opts passed in, or the ones from our constructor: we do not merge the two. extra_opts = self.extra_opts @@ -4189,7 +4184,7 @@ def start( break # success return self - def stop(self, immediate: bool = False) -> Safekeeper: + def stop(self, immediate: bool = False) -> Self: self.env.neon_cli.safekeeper_stop(self.id, immediate) self.running = False return self @@ -4367,13 +4362,13 @@ def __init__(self, env: NeonEnv): def start( self, timeout_in_seconds: int | None = None, - ): + ) -> Self: assert not self.running self.env.neon_cli.storage_broker_start(timeout_in_seconds) self.running = True return self - def stop(self): + def stop(self) -> Self: if self.running: self.env.neon_cli.storage_broker_stop() self.running = False diff --git a/test_runner/fixtures/parametrize.py b/test_runner/fixtures/parametrize.py index 0286b4f0363b..2c6adb8a334d 100644 --- a/test_runner/fixtures/parametrize.py +++ b/test_runner/fixtures/parametrize.py @@ -66,6 +66,7 @@ def pytest_generate_tests(metafunc: Metafunc): metafunc.parametrize("build_type", build_types) + pg_versions: list[PgVersion] if (v := os.getenv("DEFAULT_PG_VERSION")) is None: pg_versions = [version for version in PgVersion if version != PgVersion.NOT_SET] else: diff --git a/test_runner/fixtures/workload.py b/test_runner/fixtures/workload.py index 639e60914a33..72dc102538a3 100644 --- a/test_runner/fixtures/workload.py +++ b/test_runner/fixtures/workload.py @@ -53,7 +53,7 @@ def __init__( self._endpoint: Endpoint | None = None self._endpoint_opts = endpoint_opts or {} - def reconfigure(self): + def reconfigure(self) -> None: """ Request the endpoint to reconfigure based on location reported by storage controller """ diff --git a/test_runner/regress/test_compute_metrics.py b/test_runner/regress/test_compute_metrics.py index c7850362920d..1b15c5f15efa 100644 --- a/test_runner/regress/test_compute_metrics.py +++ b/test_runner/regress/test_compute_metrics.py @@ -17,7 +17,7 @@ if TYPE_CHECKING: from types import TracebackType - from typing import TypedDict + from typing import Self, TypedDict from fixtures.neon_fixtures import NeonEnv from fixtures.pg_version import PgVersion @@ -185,7 +185,7 @@ def start(self) -> None: def stop(self) -> None: raise NotImplementedError() - def __enter__(self) -> SqlExporterRunner: + def __enter__(self) -> Self: self.start() return self @@ -242,8 +242,7 @@ def __init__( self.with_volume_mapping(str(config_file), container_config_file, "z") self.with_volume_mapping(str(collector_file), container_collector_file, "z") - @override - def start(self) -> SqlExporterContainer: + def start(self) -> Self: super().start() log.info("Waiting for sql_exporter to be ready") diff --git a/test_runner/regress/test_ddl_forwarding.py b/test_runner/regress/test_ddl_forwarding.py index 8fb74f46e436..1c5554c37973 100644 --- a/test_runner/regress/test_ddl_forwarding.py +++ b/test_runner/regress/test_ddl_forwarding.py @@ -13,7 +13,7 @@ from werkzeug.wrappers.response import 
Response if TYPE_CHECKING: - from typing import Any + from typing import Any, Self def handle_db(dbs, roles, operation): @@ -91,7 +91,7 @@ def __init__(self, httpserver: HTTPServer, vanilla_pg: VanillaPostgres, host: st lambda request: ddl_forward_handler(request, self.dbs, self.roles, self) ) - def __enter__(self): + def __enter__(self) -> Self: self.pg.start() return self diff --git a/test_runner/regress/test_pageserver_layer_rolling.py b/test_runner/regress/test_pageserver_layer_rolling.py index 590354e9dac1..f6a7bfa1ade5 100644 --- a/test_runner/regress/test_pageserver_layer_rolling.py +++ b/test_runner/regress/test_pageserver_layer_rolling.py @@ -131,7 +131,7 @@ def test_pageserver_small_inmemory_layers( wait_until_pageserver_is_caught_up(env, last_flush_lsns) # We didn't write enough data to trigger a size-based checkpoint: we should see dirty data. - wait_until(10, 1, lambda: assert_dirty_bytes_nonzero(env)) # type: ignore + wait_until(10, 1, lambda: assert_dirty_bytes_nonzero(env)) ps_http_client = env.pageserver.http_client() total_wal_ingested_before_restart = wait_for_wal_ingest_metric(ps_http_client) @@ -139,7 +139,7 @@ def test_pageserver_small_inmemory_layers( # Within ~ the checkpoint interval, all the ephemeral layers should be frozen and flushed, # such that there are zero bytes of ephemeral layer left on the pageserver log.info("Waiting for background checkpoints...") - wait_until(CHECKPOINT_TIMEOUT_SECONDS * 2, 1, lambda: assert_dirty_bytes(env, 0)) # type: ignore + wait_until(CHECKPOINT_TIMEOUT_SECONDS * 2, 1, lambda: assert_dirty_bytes(env, 0)) # Zero ephemeral layer bytes does not imply that all the frozen layers were uploaded: they # must be uploaded to remain visible to the pageserver after restart. @@ -180,7 +180,7 @@ def test_idle_checkpoints(neon_env_builder: NeonEnvBuilder): wait_until_pageserver_is_caught_up(env, last_flush_lsns) # We didn't write enough data to trigger a size-based checkpoint: we should see dirty data. - wait_until(10, 1, lambda: assert_dirty_bytes_nonzero(env)) # type: ignore + wait_until(10, 1, lambda: assert_dirty_bytes_nonzero(env)) # Stop the safekeepers, so that we cannot have any more WAL receiver connections for sk in env.safekeepers: @@ -193,7 +193,7 @@ def test_idle_checkpoints(neon_env_builder: NeonEnvBuilder): # Within ~ the checkpoint interval, all the ephemeral layers should be frozen and flushed, # such that there are zero bytes of ephemeral layer left on the pageserver log.info("Waiting for background checkpoints...") - wait_until(CHECKPOINT_TIMEOUT_SECONDS * 2, 1, lambda: assert_dirty_bytes(env, 0)) # type: ignore + wait_until(CHECKPOINT_TIMEOUT_SECONDS * 2, 1, lambda: assert_dirty_bytes(env, 0)) # The code below verifies that we do not flush on the first write # after an idle period longer than the checkpoint timeout. 
@@ -210,7 +210,7 @@ def test_idle_checkpoints(neon_env_builder: NeonEnvBuilder): run_worker_for_tenant(env, 5, tenant_with_extra_writes, offset=ENTRIES_PER_TIMELINE) ) - dirty_after_write = wait_until(10, 1, lambda: assert_dirty_bytes_nonzero(env)) # type: ignore + dirty_after_write = wait_until(10, 1, lambda: assert_dirty_bytes_nonzero(env)) # We shouldn't flush since we've just opened a new layer waited_for = 0 @@ -312,4 +312,4 @@ def assert_dirty_data_limited(): dirty_bytes = get_dirty_bytes(env) assert dirty_bytes < max_dirty_data - wait_until(compaction_period_s * 2, 1, lambda: assert_dirty_data_limited()) # type: ignore + wait_until(compaction_period_s * 2, 1, lambda: assert_dirty_data_limited()) diff --git a/test_runner/regress/test_pageserver_secondary.py b/test_runner/regress/test_pageserver_secondary.py index de0344bc29f5..a264f4d3c9c2 100644 --- a/test_runner/regress/test_pageserver_secondary.py +++ b/test_runner/regress/test_pageserver_secondary.py @@ -702,7 +702,7 @@ def await_log(pageserver, deadline, expression): else: timeout = int(deadline - now) + 1 try: - wait_until(timeout, 1, lambda: pageserver.assert_log_contains(expression)) # type: ignore + wait_until(timeout, 1, lambda: pageserver.assert_log_contains(expression)) except: log.error(f"Timed out waiting for '{expression}'") raise diff --git a/test_runner/regress/test_sharding.py b/test_runner/regress/test_sharding.py index 16bfa83b4336..411574bd8621 100644 --- a/test_runner/regress/test_sharding.py +++ b/test_runner/regress/test_sharding.py @@ -1405,7 +1405,7 @@ def finish_split(): # e.g. while waiting for a storage controller to re-attach a parent shard if we failed # inside the pageserver and the storage controller responds by detaching children and attaching # parents concurrently (https://github.com/neondatabase/neon/issues/7148) - wait_until(10, 1, lambda: workload.churn_rows(10, upload=False, ingest=False)) # type: ignore + wait_until(10, 1, lambda: workload.churn_rows(10, upload=False, ingest=False)) workload.validate() diff --git a/test_runner/regress/test_sni_router.py b/test_runner/regress/test_sni_router.py index ef9974a15d00..2a26fef59aea 100644 --- a/test_runner/regress/test_sni_router.py +++ b/test_runner/regress/test_sni_router.py @@ -3,13 +3,17 @@ import socket import subprocess from pathlib import Path -from types import TracebackType +from typing import TYPE_CHECKING import backoff from fixtures.log_helper import log from fixtures.neon_fixtures import PgProtocol, VanillaPostgres from fixtures.port_distributor import PortDistributor +if TYPE_CHECKING: + from types import TracebackType + from typing import Self + def generate_tls_cert(cn, certout, keyout): subprocess.run( @@ -54,7 +58,7 @@ def __init__( self._popen: subprocess.Popen[bytes] | None = None self.test_output_dir = test_output_dir - def start(self) -> PgSniRouter: + def start(self) -> Self: assert self._popen is None args = [ str(self.neon_binpath / "pg_sni_router"), @@ -87,7 +91,7 @@ def wait_for_exit(self, timeout=2): if self._popen: self._popen.wait(timeout=2) - def __enter__(self) -> PgSniRouter: + def __enter__(self) -> Self: return self def __exit__( diff --git a/test_runner/regress/test_storage_controller.py b/test_runner/regress/test_storage_controller.py index dbddc5582366..13bc54a1146d 100644 --- a/test_runner/regress/test_storage_controller.py +++ b/test_runner/regress/test_storage_controller.py @@ -2494,14 +2494,14 @@ def start_env(env: NeonEnv, storage_controller_port: int): for pageserver in env.pageservers: futs.append( 
executor.submit( - lambda ps=pageserver: ps.start(timeout_in_seconds=timeout_in_seconds) + lambda ps=pageserver: ps.start(timeout_in_seconds=timeout_in_seconds) # type: ignore[misc] ) ) for safekeeper in env.safekeepers: futs.append( executor.submit( - lambda sk=safekeeper: sk.start(timeout_in_seconds=timeout_in_seconds) + lambda sk=safekeeper: sk.start(timeout_in_seconds=timeout_in_seconds) # type: ignore[misc] ) ) diff --git a/test_runner/regress/test_wal_acceptor.py b/test_runner/regress/test_wal_acceptor.py index 4c404cd8812e..405f15e488e8 100644 --- a/test_runner/regress/test_wal_acceptor.py +++ b/test_runner/regress/test_wal_acceptor.py @@ -61,7 +61,7 @@ ) if TYPE_CHECKING: - from typing import Any + from typing import Any, Self def wait_lsn_force_checkpoint( @@ -1460,7 +1460,7 @@ def __init__( self.tenant_id: TenantId | None = None self.timeline_id: TimelineId | None = None - def init(self) -> SafekeeperEnv: + def init(self) -> Self: assert self.postgres is None, "postgres is already initialized" assert self.safekeepers is None, "safekeepers are already initialized" @@ -1541,7 +1541,7 @@ def kill_safekeeper(self, sk_dir): log.info(f"Killing safekeeper with pid {pid}") os.kill(pid, signal.SIGKILL) - def __enter__(self): + def __enter__(self) -> Self: return self def __exit__(self, exc_type, exc_value, traceback):
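Note on the Self pattern (a minimal sketch with illustrative class names, not code from this repository): methods that return the receiver -- start(), stop(), __enter__, builders -- previously needed a bound TypeVar such as TTenantShardId so that subclasses kept their own type. typing.Self (PEP 673, Python 3.11+) expresses the same constraint directly:

    from typing import Self

    class Service:
        def __init__(self) -> None:
            self.running = False

        def start(self) -> Self:
            # Returning Self instead of "Service" means a subclass's
            # start() is typed as the subclass, so chained calls and
            # context managers keep the precise type.
            self.running = True
            return self

        def __enter__(self) -> Self:
            return self.start()

        def __exit__(self, *exc: object) -> None:
            self.running = False

    class PageService(Service):
        pass

    svc = PageService().start()  # mypy infers PageService, not Service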
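Note on the force_layer_download.py hunk (a sketch under assumed names; fetch() is a hypothetical stand-in for client.get_timeline_ids()): asyncio.gather(..., return_exceptions=True) yields one result slot per input coroutine, in input order, so zip(..., strict=True) can replace the removed length assert, and an isinstance() check narrows each slot from `list | BaseException` for mypy; real error handling remains the TODO:

    import asyncio

    async def fetch(key: str) -> list[int]:
        # Hypothetical per-tenant request; fails for one key.
        if key == "bad":
            raise RuntimeError("boom")
        return [1, 2, 3]

    async def main() -> None:
        keys = ["a", "bad", "c"]
        # Exceptions are returned in their slot instead of propagating.
        gathered = await asyncio.gather(
            *(fetch(k) for k in keys), return_exceptions=True
        )
        pairs: list[tuple[str, int]] = []
        # strict=True raises ValueError if the lengths ever diverge,
        # doing the job of the removed `assert len(...) == len(...)`.
        for key, result in zip(keys, gathered, strict=True):
            if isinstance(result, BaseException):
                continue  # TODO, as in the patch: handle the failure
            # isinstance() has narrowed result to list[int] for mypy.
            pairs.extend((key, item) for item in result)
        print(pairs)

    asyncio.run(main())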
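Note on the `# type: ignore[misc]` additions to executor.submit(lambda ps=pageserver: ...): the default argument binds the loop variable when the lambda is defined, avoiding the late-binding pitfall, and the patch suppresses mypy's complaint about lambda default arguments with the narrow [misc] code rather than a bare ignore. A sketch of the pitfall and the fix:

    from concurrent.futures import ThreadPoolExecutor

    names = ["ps1", "ps2", "ps3"]

    with ThreadPoolExecutor() as executor:
        for name in names:
            # Late binding: the lambda closes over the variable `name`,
            # so a task that runs after the loop has advanced may print
            # a later value than intended.
            executor.submit(lambda: print(name))

    with ThreadPoolExecutor() as executor:
        for name in names:
            # The default argument captures the current value at
            # definition time; mypy flags lambda defaults, hence the
            # targeted suppression used throughout the patch.
            executor.submit(lambda name=name: print(name))  # type: ignore[misc]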