Skip to content

Commit

Permalink
Merge pull request #3 from wvenialbo/add-file-repository
Browse files Browse the repository at this point in the history
Add file repository
  • Loading branch information
wvenialbo authored Oct 25, 2024
2 parents 49608f2 + f18ead4 commit fb60ecb
Show file tree
Hide file tree
Showing 7 changed files with 741 additions and 176 deletions.
3 changes: 3 additions & 0 deletions src/GOES_DL/datasource/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@
A datasource object that caches the files.
DatasourceHTTP
A datasource object for an HTTP server.
DownloaderRepository
Manage file operations for the downloader object.
"""

from .datasource import Datasource as Datasource
from .datasource_aws import DatasourceAWS as DatasourceAWS
from .datasource_cache import DatasourceCache as DatasourceCache
from .datasource_http import DatasourceHTTP as DatasourceHTTP
from .datasource_repository import DatasourceRepository as DatasourceRepository
145 changes: 47 additions & 98 deletions src/GOES_DL/datasource/datasource_aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,25 @@
DatasourceAWS: Handle AWS-based data sources.
"""

from typing import Any, overload
from pathlib import Path
from typing import Any, Literal
from urllib.parse import ParseResult

import boto3
from botocore import UNSIGNED
from botocore.client import ClientError, Config
from mypy_boto3_s3.client import S3Client

from ..dataset import ProductLocator
from ..utils.url import url
from .datasource import Datasource
from .datasource_base import DatasourceBase
from .datasource_cache import DatasourceCache
from .datasource_repository import DatasourceRepository

AWS_CLIENT: str = "s3"
AWS_CLIENT: Literal["s3"] = "s3"


class DatasourceAWS(Datasource):
class DatasourceAWS(DatasourceBase):
"""
Handle AWS-based data sources.
Expand All @@ -29,67 +32,51 @@ class DatasourceAWS(Datasource):
location. The base URL of the datasource is the URL of the AWS S3
bucket.
Parameters
----------
locator : tuple[str, ...] | ProductLocator
A `ProductLocator` object or a tuple of strings containing
the base URL and an optional region where the S3 bucket is
located. E.g. "us-west-1", "us-east-1", "eu-west-1", etc. If
None, the default region is used.
Attributes
----------
base_url : str
The base URL of the datasource. This is the URL where the
datasource is located. The base URL is used to build the full
URL to the files and directories.
bucket_name : str
The name of the AWS S3 bucket.
base_path : str
The base path of the AWS S3 bucket.
s3_client : boto3.Client
The AWS S3 client.
cached : dict[str, list[str]]
The cached file lists in the datasource, organised by folder.
Methods
-------
bucket_exists(bucket_name: str) -> bool
Check if the bucket exists.
clear_cache(dir_path: str = "") -> None
Clear the cache.
get_client() -> Any
Get the AWS S3 client.
get_file(file_path: str) -> Any
Download a file into memory.
get_folder_path(dir_path: str) -> str
Get the folder path.
listdir(dir_path: str) -> list[str]
List the contents of a directory.
object_exists(bucket_name: str, object_path: str) -> bool
Check if the object exists.
Raises
------
ValueError
If the bucket does not exist or the user has no access.
"""

@overload
def __init__(
self, locator: tuple[str, ...], cache: DatasourceCache | None = None
) -> None: ...

@overload
def __init__(
self, locator: ProductLocator, cache: DatasourceCache | None = None
) -> None: ...
bucket_name: str
s3_client: S3Client

def __init__(
self,
locator: ProductLocator | tuple[str, ...],
cache: DatasourceCache | None = None,
repository: str | Path | DatasourceRepository | None = None,
cache: float | DatasourceCache | None = None,
) -> None:
"""
Initialize the DatasourceAWS object.
Parameters
----------
locator : ProductLocator | tuple[str, ...]
A `ProductLocator` object or a tuple of strings containing
the base URL and an optional region where the S3 bucket is
located. E.g. "us-west-1", "us-east-1", "eu-west-1", etc. If
None, the default region is used.
repository : str | Path | DatasourceRepository, optional
The directory where the files will be stored, by default
None.
cache : float | DatasourceCache, optional
The cache expiration time in seconds, by default None.
Raises
------
ValueError
If the bucket does not exist or the user has no access.
"""
base_url: str
region: str | None
if isinstance(locator, ProductLocator):
Expand All @@ -101,62 +88,17 @@ def __init__(

bucket_name: str = url_parts.netloc

self.s3_client: Any = self._get_client(region)
self.s3_client: S3Client = self._get_client(region)

if not self._bucket_exists(bucket_name):
raise ValueError(
f"Bucket '{bucket_name}' does not exist or you have no access."
)

super().__init__(base_url)
super().__init__(base_url, repository, cache)

self.bucket_name: str = bucket_name

self.cache: DatasourceCache = cache or DatasourceCache()

@overload
@staticmethod
def create(
locator: ProductLocator, life_time: float | None = None
) -> "DatasourceAWS": ...

@overload
@staticmethod
def create(
locator: tuple[str, ...], life_time: float | None = None
) -> "DatasourceAWS": ...

@staticmethod
def create(
locator: tuple[str, ...] | ProductLocator,
life_time: float | None = None,
) -> "DatasourceAWS":
"""
Create a new AWS-based datasource.
Create a new AWS-based datasource with a base URL or a
ProductLocator object.
Parameters
----------
locator : str
The base URL of a HTTP folder or a `ProductLocator` object.
life_time : float, optional
The cache life time in seconds, by default None.
Returns
-------
DatasourceAWS
A new `DatasourceAWS` object.
Raises
------
ValueError
If the resource does not exist or the user has no access.
"""
cache = DatasourceCache(life_time)
return DatasourceAWS(locator, cache)

def _bucket_exists(self, bucket_name: str) -> bool:
"""
Check if the bucket exists.
Expand All @@ -183,7 +125,7 @@ def _bucket_exists(self, bucket_name: str) -> bool:
return True

@staticmethod
def _get_client(region: str | None) -> Any:
def _get_client(region: str | None) -> S3Client:
"""
Get the AWS S3 client.
Expand Down Expand Up @@ -234,13 +176,20 @@ def get_file(self, file_path: str) -> bytes:
RuntimeError
If the file cannot be retrieved.
"""
folder_path: str = self.get_item_path(file_path)
local_file = self.repository.get_item(file_path)

if local_file is not None:
return local_file

folder_path: str = self._get_item_path(file_path)

try:
response: Any = self.s3_client.get_object(
response = self.s3_client.get_object(
Bucket=self.bucket_name, Key=folder_path
)
return bytes(response["Body"].read())
content = response["Body"].read()
self.repository.add_item(file_path, content)
return content

except ClientError as exc:
message: str = f"Unable to retrieve the file '{file_path}': {exc}"
Expand All @@ -254,7 +203,7 @@ def _url_join(head: str, tail: str) -> str:
return f"{head}/{tail}"
return head + tail

def get_item_path(self, dir_path: str) -> str:
def _get_item_path(self, dir_path: str) -> str:
"""
Get the folder path.
Expand Down Expand Up @@ -301,7 +250,7 @@ def listdir(self, dir_path: str) -> list[str]:
if cached_list is not None:
return cached_list

folder_path: str = self.get_item_path(dir_path)
folder_path: str = self._get_item_path(dir_path)

paginator: Any = self.s3_client.get_paginator("list_objects_v2")
pages: Any = paginator.paginate(
Expand Down
67 changes: 67 additions & 0 deletions src/GOES_DL/datasource/datasource_base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
"""
Extend the Datasource interface with cache and repository support.
Classes:
DatasourceBase: Extend the Datasource interface.
"""

from pathlib import Path

from .datasource import Datasource
from .datasource_cache import DatasourceCache
from .datasource_repository import DatasourceRepository


class DatasourceBase(Datasource):
    """
    Extend the Datasource interface with cache and repository support.

    Attributes
    ----------
    cache : DatasourceCache
        The cache for the datasource.
    repository : DatasourceRepository
        The repository for the datasource.
    """

    cache: DatasourceCache
    repository: DatasourceRepository

    def __init__(
        self,
        base_url: str,
        repository: str | Path | DatasourceRepository | None,
        cache: float | DatasourceCache | None,
    ) -> None:
        """
        Initialize the DatasourceBase.

        Parameters
        ----------
        base_url : str
            The base URL for the datasource.
        repository : str | Path | DatasourceRepository | None
            The repository for the datasource. If a path string is
            provided, it will be used as the base path for the
            repository. If `None` is provided, the repository will be
            set to the current directory.
        cache : float | DatasourceCache | None
            The cache for the datasource. If a number is provided, it
            will be used as the life time for each entry in the cache.
            If `None` is provided, the cache will be set to have a life
            time of 0.0 seconds, i.e. no caching.
        """
        super().__init__(base_url)

        # Normalise `repository` to a DatasourceRepository: default to
        # the current directory, then wrap any path-like value.
        if repository is None:
            repository = "."
        if isinstance(repository, (str, Path)):
            repository = DatasourceRepository(repository)
        self.repository = repository

        # Normalise `cache` to a DatasourceCache. Accept any real
        # number as a life time: the former `isinstance(cache, float)`
        # check silently left an `int` (e.g. cache=60) unconverted,
        # binding a raw int to `self.cache` instead of a cache object.
        if cache is None:
            cache = 0.0
        if isinstance(cache, (int, float)):
            cache = DatasourceCache(float(cache))
        self.cache = cache
Loading

0 comments on commit fb60ecb

Please sign in to comment.