Skip to content

Commit

Permalink
style(consumer): Convert to railway programming (#211)
Browse files Browse the repository at this point in the history
  • Loading branch information
devsjc authored Dec 16, 2024
1 parent 7ee73cd commit 398febc
Show file tree
Hide file tree
Showing 15 changed files with 326 additions and 463 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ dependencies = [
"numpy == 2.1.0",
"ocf-blosc2 == 0.0.11",
"psutil == 6.0.0",
"returns == 0.23.0",
"returns == 0.24.0",
"s3fs == 2024.9.0",
"xarray == 2024.9.0",
"zarr == 2.18.3"
Expand Down
13 changes: 3 additions & 10 deletions src/nwp_consumer/cmd/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import sys
from typing import NamedTuple

from nwp_consumer.internal import handlers, ports, repositories, services
from nwp_consumer.internal import handlers, ports, repositories

log = logging.getLogger("nwp-consumer")

Expand Down Expand Up @@ -61,17 +61,10 @@ def parse_env() -> Adaptors:

def run_cli() -> None:
"""Entrypoint for the CLI handler."""
# TODO: InfoUseCase
adaptors = parse_env()
c = handlers.CLIHandler(
consumer_usecase=services.ConsumerService(
model_repository=adaptors.model_repository,
notification_repository=adaptors.notification_repository,
),
archiver_usecase=services.ArchiverService(
model_repository=adaptors.model_repository,
notification_repository=adaptors.notification_repository,
),
model_adaptor=adaptors.model_repository,
notification_adaptor=adaptors.notification_repository,
)
returncode: int = c.run()
sys.exit(returncode)
Expand Down
4 changes: 3 additions & 1 deletion src/nwp_consumer/internal/entities/performance.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class PerformanceMonitor(Thread):
memory_buffer: list[int]
cpu_buffer: list[float]
start_time: float
end_time: float
end_time: float | None
stop: bool = True

def __enter__(self) -> None:
Expand Down Expand Up @@ -60,6 +60,8 @@ def get_usage(self) -> tuple[int, float]:

def get_runtime(self) -> int:
"""Get the runtime of the thread in seconds."""
if self.end_time is None:
return int(time.time() - self.start_time)
return int(self.end_time - self.start_time)

def run(self) -> None:
Expand Down
61 changes: 34 additions & 27 deletions src/nwp_consumer/internal/handlers/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,27 @@
import datetime as dt
import logging

from returns.result import Failure, Success
from returns.result import Failure, ResultE

from nwp_consumer.internal import ports
from nwp_consumer.internal import ports, services

log = logging.getLogger("nwp-consumer")


class CLIHandler:
"""CLI driving actor."""

model_adaptor: type[ports.ModelRepository]
notification_adaptor: type[ports.NotificationRepository]

def __init__(
self,
consumer_usecase: ports.ConsumeUseCase,
archiver_usecase: ports.ArchiveUseCase,
) -> None:
model_adaptor: type[ports.ModelRepository],
notification_adaptor: type[ports.NotificationRepository],
) -> None:
"""Create a new instance."""
self._consumer_usecase = consumer_usecase
self._archiver_usecase = archiver_usecase

self.model_adaptor = model_adaptor
self.notification_adaptor = notification_adaptor

@property
def parser(self) -> argparse.ArgumentParser:
Expand Down Expand Up @@ -82,30 +84,35 @@ def run(self) -> int:
args = self.parser.parse_args()
match args.command:
case "consume":
result = self._consumer_usecase.consume(it=args.init_time)

match result:
case Failure(e):
log.error(f"Failed to consume NWP data: {e}")
return 1
case Success(path):
log.info(f"Successfully consumed NWP data to '{path}'")
return 0
service_result = services.ConsumerService.from_adaptors(
model_adaptor=self.model_adaptor,
notification_adaptor=self.notification_adaptor,
)
result: ResultE[str] = service_result.do(
consume_result
for service in service_result
for consume_result in service.consume(period=args.init_time)
)
if isinstance(result, Failure):
log.error(f"Failed to consume NWP data: {result!s}")
return 1

case "archive":
result = self._archiver_usecase.archive(year=args.year, month=args.month)

match result:
case Failure(e):
log.error(f"Failed to archive NWP data: {e}")
return 1
case Success(path):
log.info(f"Successfully archived NWP data to '{path}'")
return 0
service_result = services.ConsumerService.from_adaptors(
model_adaptor=self.model_adaptor,
notification_adaptor=self.notification_adaptor,
)
result = service_result.do(
consume_result
for service in service_result
for consume_result in service.consume(period=args.init_time)
)
if isinstance(result, Failure):
log.error(f"Failed to archive NWP data: {result!s}")
return 1

case "info":
log.error("Info command is coming soon! :)")
return 0

case _:
log.error(f"Unknown command: {args.command}")
Expand Down
6 changes: 2 additions & 4 deletions src/nwp_consumer/internal/ports/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,11 @@
in the `repositories` module.
"""

from .services import ConsumeUseCase, ArchiveUseCase
from .repositories import ModelRepository, ZarrRepository, NotificationRepository
from .services import ConsumeUseCase
from .repositories import ModelRepository, NotificationRepository

__all__ = [
"ConsumeUseCase",
"ArchiveUseCase",
"ModelRepository",
"ZarrRepository",
"NotificationRepository",
]
11 changes: 0 additions & 11 deletions src/nwp_consumer/internal/ports/repositories.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import abc
import datetime as dt
import logging
import pathlib
from collections.abc import Callable, Iterator

import xarray as xr
Expand Down Expand Up @@ -124,16 +123,6 @@ def model() -> entities.ModelMetadata:
"""Metadata about the model."""
pass


class ZarrRepository(abc.ABC):
"""Interface for a repository that stores Zarr NWP data."""

@abc.abstractmethod
def save(self, src: pathlib.Path, dst: pathlib.Path) -> ResultE[str]:
"""Save NWP store data in the repository."""
pass


class NotificationRepository(abc.ABC):
"""Interface for a repository that sends notifications.
Expand Down
55 changes: 7 additions & 48 deletions src/nwp_consumer/internal/ports/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,14 @@
These interfaces define the signatures that *driving* actors must conform to
in order to interact with the core.
Also sometimes referred to as *primary ports*.
Sometimes referred to as *primary ports*.
"""

import abc
import datetime as dt

from returns.result import ResultE

from nwp_consumer.internal import entities


class ConsumeUseCase(abc.ABC):
"""Interface for the consumer use case.
Expand All @@ -24,16 +22,15 @@ class ConsumeUseCase(abc.ABC):


@abc.abstractmethod
def consume(self, it: dt.datetime | None = None) -> ResultE[str]:
"""Consume NWP data to Zarr format for desired init time.
def consume(self, period: dt.datetime | dt.date | None = None) -> ResultE[str]:
"""Consume NWP data to Zarr format for desired time period.
Where possible the implementation should be as memory-efficient as possible.
The designs of the repository methods also enable parallel processing within
the implementation.
Args:
it: The initialization time for which to consume data.
If None, the latest available forecast should be consumed.
period: The period for which to gather init time data.
Returns:
The path to the produced Zarr store.
Expand All @@ -46,51 +43,13 @@ def consume(self, it: dt.datetime | None = None) -> ResultE[str]:
pass

@abc.abstractmethod
def postprocess(self, options: entities.PostProcessOptions) -> ResultE[str]:
"""Postprocess the produced Zarr according to given options."""
pass


class ArchiveUseCase(abc.ABC):
"""Interface for the archive use case.
Defines the business-critical methods for the following use cases:
- 'A user should be able to archive NWP data for a given time period.'
"""

@abc.abstractmethod
def archive(self, year: int, month: int) -> ResultE[str]:
"""Archive NWP data to Zarr format for the given month.
def archive(self, period: dt.date) -> ResultE[str]:
"""Archive NWP data to Zarr format for desired time period.
Args:
year: The year for which to archive data.
month: The month for which to archive data.
period: The period for which to gather init time data.
Returns:
The path to the produced Zarr store.
"""
pass

class InfoUseCase(abc.ABC):
"""Interface for the notification use case.
Defines the business-critical methods for the following use cases:
- 'A user should be able to retrieve information about the service.'
"""

@abc.abstractmethod
def available_models(self) -> list[str]:
"""Get a list of available models."""
pass

@abc.abstractmethod
def model_repository_info(self) -> str:
"""Get information about the model repository."""
pass

@abc.abstractmethod
def model_info(self) -> str:
"""Get information about the model."""
pass
Loading

0 comments on commit 398febc

Please sign in to comment.