Skip to content

Commit

Permalink
chore: type the module
Browse files Browse the repository at this point in the history
  • Loading branch information
anand2312 committed Oct 13, 2021
1 parent 4d68841 commit b5f7316
Showing 1 changed file with 85 additions and 21 deletions.
106 changes: 85 additions & 21 deletions supabase/lib/storage/storage_bucket_api.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,43 @@
from __future__ import annotations

from collections.abc import Awaitable
from typing import Any, Literal, Optional, Union
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Literal, Optional, Type, Union

from httpx import AsyncClient, Client

RequestMethod = Literal["GET", "POST", "PUT", "DELETE", "PATCH"]
# dict is returned when the request was synchronous, awaitable when async, None if there was an error
_SyncOrAsyncResponse = Union[dict[str, Any], Awaitable, None]
__all__ = ["Bucket", "StorageBucketAPI"]

_RequestMethod = Literal["GET", "POST", "PUT", "DELETE", "PATCH"]


@dataclass
class Bucket:
id: str
name: str
owner: str
public: bool
created_at: datetime
updated_at: datetime

def __post_init__(self) -> None:
# created_at and updated_at are returned by the API as ISO timestamps
# so we convert them to datetime objects
self.created_at = datetime.fromisoformat(self.created_at) # type: ignore
self.updated_at = datetime.fromisoformat(self.updated_at) # type: ignore


ResponseType = Union[
dict[
str, str
], # response from an endpoint without a custom response_class, example: create_bucket
list[
Bucket
], # response from an endpoint which returns a list of objects, example: list_buckets
Bucket, # response from an endpoint which returns a single object, example: get_bucket
None,
]


class StorageBucketAPI:
Expand All @@ -27,50 +57,82 @@ def __init__(
self._client = Client(headers=self.headers)

def _request(
self, method: RequestMethod, url: str, json: Optional[dict[Any, Any]] = None
) -> Union[dict[Any, Any], Awaitable, None]:
self,
method: _RequestMethod,
url: str,
json: Optional[dict[Any, Any]] = None,
response_class: Optional[Type] = None,
) -> Any:
if self._is_async:
return self._async_request(method, url, json)
return self._async_request(method, url, json, response_class)
else:
return self._sync_request(method, url, json)
return self._sync_request(method, url, json, response_class)

def _sync_request(
self, method: RequestMethod, url: str, json: Optional[dict[Any, Any]] = None
) -> Optional[dict[Any, Any]]:
self,
method: _RequestMethod,
url: str,
json: Optional[dict[Any, Any]] = None,
response_class: Optional[Type] = None,
) -> ResponseType:
if isinstance(self._client, AsyncClient): # only to appease the type checker
return

response = self._client.request(method, url, json=json)
response.raise_for_status()
return response.json()

response_data = response.json()

if not response_class:
# if no response_class is specified, return the raw response
return response_data

if isinstance(response_data, list):
# if a list of objects are returned, convert each member to response_class
return [response_class(**item) for item in response_data]
else:
return response_class(**response_data)

async def _async_request(
self, method: RequestMethod, url: str, json: Optional[dict[Any, Any]] = None
) -> Optional[dict[Any, Any]]:
self,
method: _RequestMethod,
url: str,
json: Optional[dict[Any, Any]] = None,
response_class: Optional[Type] = None,
) -> ResponseType:
if isinstance(self._client, Client): # only to appease the type checker
return

response = await self._client.request(method, url, json=json)
response.raise_for_status()
return response.json()

def list_buckets(self) -> _SyncOrAsyncResponse:
response_data = response.json()

if not response_class:
return response_data

if isinstance(response_data, list):
return [response_class(**item) for item in response_data]
else:
return response_class(**response_data)

def list_buckets(self) -> Union[list[Bucket], Awaitable[list[Bucket]], None]:
"""Retrieves the details of all storage buckets within an existing product."""
return self._request("GET", f"{self.url}/bucket")
return self._request("GET", f"{self.url}/bucket", response_class=Bucket)

def get_bucket(self, id: str) -> _SyncOrAsyncResponse:
def get_bucket(self, id: str) -> Union[Bucket, Awaitable[Bucket], None]:
"""Retrieves the details of an existing storage bucket.
Parameters
----------
id
The unique identifier of the bucket you would like to retrieve.
"""
return self._request("GET", f"{self.url}/bucket/{id}")
return self._request("GET", f"{self.url}/bucket/{id}", response_class=Bucket)

def create_bucket(
self, id: str, name: str = None, public: bool = False
) -> _SyncOrAsyncResponse:
) -> Union[dict[str, str], Awaitable[dict[str, str]]]:
"""Creates a new storage bucket.
Parameters
Expand All @@ -88,7 +150,7 @@ def create_bucket(
json={"id": id, "name": name, "public": public},
)

def empty_bucket(self, id: str) -> _SyncOrAsyncResponse:
def empty_bucket(self, id: str) -> Union[dict[str, str], Awaitable[dict[str, str]]]:
"""Removes all objects inside a single bucket.
Parameters
Expand All @@ -98,7 +160,9 @@ def empty_bucket(self, id: str) -> _SyncOrAsyncResponse:
"""
return self._request("POST", f"{self.url}/bucket/{id}/empty", json={})

def delete_bucket(self, id: str) -> _SyncOrAsyncResponse:
def delete_bucket(
self, id: str
) -> Union[dict[str, str], Awaitable[dict[str, str]]]:
"""Deletes an existing bucket. Note that you cannot delete buckets with existing objects inside. You must first
`empty()` the bucket.
Expand Down

0 comments on commit b5f7316

Please sign in to comment.