-
Notifications
You must be signed in to change notification settings - Fork 31
/
Copy pathpublic.py
36 lines (25 loc) · 975 Bytes
/
public.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import logging
import requests
from .cid import CID
from .types import FetchError, IPFSProvider, PinError, UploadError
logger = logging.getLogger(__name__)
class PublicIPFS(IPFSProvider):
"""Public IPFS gateway (fetch-only provider)"""
# pylint:disable=duplicate-code
GATEWAY = "https://ipfs.io"
def __init__(self, *, timeout: int) -> None:
super().__init__()
self.timeout = timeout
def fetch(self, cid: CID) -> bytes:
url = f"{self.GATEWAY}/ipfs/{cid}"
try:
resp = requests.get(url, timeout=self.timeout)
resp.raise_for_status()
except requests.RequestException as ex:
logger.error({"msg": "Request has been failed", "error": str(ex)})
raise FetchError(cid) from ex
return resp.content
def upload(self, content: bytes, name: str | None = None) -> CID:
raise UploadError
def pin(self, cid: CID) -> None:
raise PinError(cid)