Skip to content

Commit

Permalink
http_backoff retry with SliceFileObj (#2542)
Browse files Browse the repository at this point in the history
  • Loading branch information
hlky authored Sep 16, 2024
1 parent a49ca75 commit c9458ad
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 95 deletions.
3 changes: 2 additions & 1 deletion src/huggingface_hub/commands/lfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@
from typing import Dict, List, Optional

from huggingface_hub.commands import BaseHuggingfaceCLICommand
from huggingface_hub.lfs import LFS_MULTIPART_UPLOAD_COMMAND, SliceFileObj
from huggingface_hub.lfs import LFS_MULTIPART_UPLOAD_COMMAND

from ..utils import get_session, hf_raise_for_status, logging
from ..utils._lfs import SliceFileObj


logger = logging.get_logger(__name__)
Expand Down
93 changes: 1 addition & 92 deletions src/huggingface_hub/lfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,8 @@

import inspect
import io
import os
import re
import warnings
from contextlib import AbstractContextManager
from dataclasses import dataclass
from math import ceil
from os.path import getsize
Expand All @@ -39,6 +37,7 @@
tqdm,
validate_hf_hub_args,
)
from .utils._lfs import SliceFileObj
from .utils.sha import sha256, sha_fileobj


Expand Down Expand Up @@ -462,93 +461,3 @@ def _upload_parts_hf_transfer(
if not supports_callback:
progress.update(total)
return output


class SliceFileObj(AbstractContextManager):
    """
    Utility context manager to read a *slice* of a seekable file-like object as a seekable, file-like object.

    This is NOT thread safe: the slice shares (and moves) the cursor of the underlying `fileobj`.

    Inspired by stackoverflow.com/a/29838711/593036

    Credits to @julien-c

    Args:
        fileobj (`BinaryIO`):
            A file-like object to slice. MUST implement `tell()` and `seek()` (and `read()` of course).
            `fileobj` will be reset to its original position when exiting the context manager.
        seek_from (`int`):
            The start of the slice (offset from position 0 in bytes).
        read_limit (`int`):
            The maximum number of bytes to read from the slice.

    Attributes:
        _previous_position (`int`):
            Position of `fileobj` when the context manager was entered. Restored on exit.
        _len (`int`):
            Number of bytes that can actually be read from the slice (never negative).
            Both attributes are only set once the context manager has been entered.

    Examples:

    Reading 200 bytes with an offset of 128 bytes from a file (ie bytes 128 to 327):
    ```python
    >>> with open("path/to/file", "rb") as file:
    ...     with SliceFileObj(file, seek_from=128, read_limit=200) as fslice:
    ...         fslice.read(...)
    ```

    Reading a file in chunks of 512 bytes
    ```python
    >>> import os
    >>> from math import ceil
    >>> chunk_size = 512
    >>> file_size = os.path.getsize("path/to/file")
    >>> with open("path/to/file", "rb") as file:
    ...     for chunk_idx in range(ceil(file_size / chunk_size)):
    ...         with SliceFileObj(file, seek_from=chunk_idx * chunk_size, read_limit=chunk_size) as fslice:
    ...             chunk = fslice.read(...)
    ```
    """

    def __init__(self, fileobj: BinaryIO, seek_from: int, read_limit: int):
        self.fileobj = fileobj
        self.seek_from = seek_from
        self.read_limit = read_limit

    def __enter__(self):
        """Remember the current position of `fileobj` and move its cursor to the start of the slice."""
        self._previous_position = self.fileobj.tell()
        end_of_stream = self.fileobj.seek(0, os.SEEK_END)
        # The actual number of bytes that can be read from the slice. Clamped to 0 so that
        # a slice starting past the end of the stream is empty rather than negatively sized.
        self._len = max(0, min(self.read_limit, end_of_stream - self.seek_from))
        self.fileobj.seek(self.seek_from, io.SEEK_SET)
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Restore `fileobj` to the position it had before entering the context manager."""
        self.fileobj.seek(self._previous_position, io.SEEK_SET)

    def read(self, n: int = -1) -> bytes:
        """Read at most `n` bytes from the slice; `None` or a negative `n` reads up to the end of the slice."""
        pos = self.tell()
        if pos >= self._len:
            return b""
        remaining_amount = self._len - pos
        if n is None or n < 0:
            # Same convention as `io` objects: no size means "everything that is left".
            return self.fileobj.read(remaining_amount)
        return self.fileobj.read(min(n, remaining_amount))

    def tell(self) -> int:
        """Return the current position, relative to the start of the slice."""
        return self.fileobj.tell() - self.seek_from

    def seek(self, offset: int, whence: int = os.SEEK_SET) -> int:
        """
        Move the cursor inside the slice and return the new position relative to the slice start.

        The target position is clamped to the slice boundaries, so it is impossible to seek
        before the first byte or after the last byte of the slice.

        Raises:
            `ValueError`: if `whence` is not one of `os.SEEK_SET`, `os.SEEK_CUR`, `os.SEEK_END`.
        """
        start = self.seek_from
        end = start + self._len
        if whence in (os.SEEK_SET, os.SEEK_END):
            # Translate to an absolute offset in the underlying file, then clamp to the slice.
            offset = start + offset if whence == os.SEEK_SET else end + offset
            offset = max(start, min(offset, end))
            whence = os.SEEK_SET
        elif whence == os.SEEK_CUR:
            # Clamp the relative move so that the cursor stays within [start, end].
            cur_pos = self.fileobj.tell()
            offset = max(start - cur_pos, min(offset, end - cur_pos))
        else:
            raise ValueError(f"whence value {whence} is not supported")
        return self.fileobj.seek(offset, whence) - self.seek_from

    def __iter__(self):
        """Yield the remaining content of the slice in chunks of at most 4 MiB."""
        chunk_size = 4 * 1024 * 1024
        while True:
            chunk = self.read(n=chunk_size)
            if not chunk:
                # Slice exhausted: stop instead of truncating after the first chunk.
                return
            yield chunk
3 changes: 2 additions & 1 deletion src/huggingface_hub/utils/_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
)
from . import logging
from ._fixes import JSONDecodeError
from ._lfs import SliceFileObj
from ._typing import HTTP_METHOD_T


Expand Down Expand Up @@ -290,7 +291,7 @@ def http_backoff(
# first HTTP request. We need to save the initial position so that the full content
# of the file is re-sent on http backoff. See warning tip in docstring.
io_obj_initial_pos = None
if "data" in kwargs and isinstance(kwargs["data"], io.IOBase):
if "data" in kwargs and isinstance(kwargs["data"], (io.IOBase, SliceFileObj)):
io_obj_initial_pos = kwargs["data"].tell()

session = get_session()
Expand Down
110 changes: 110 additions & 0 deletions src/huggingface_hub/utils/_lfs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
# coding=utf-8
# Copyright 2019-present, the HuggingFace Inc. team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Git LFS related utilities"""

import io
import os
from contextlib import AbstractContextManager
from typing import BinaryIO


class SliceFileObj(AbstractContextManager):
    """
    Utility context manager to read a *slice* of a seekable file-like object as a seekable, file-like object.

    This is NOT thread safe: the slice shares (and moves) the cursor of the underlying `fileobj`.

    Inspired by stackoverflow.com/a/29838711/593036

    Credits to @julien-c

    Args:
        fileobj (`BinaryIO`):
            A file-like object to slice. MUST implement `tell()` and `seek()` (and `read()` of course).
            `fileobj` will be reset to its original position when exiting the context manager.
        seek_from (`int`):
            The start of the slice (offset from position 0 in bytes).
        read_limit (`int`):
            The maximum number of bytes to read from the slice.

    Attributes:
        _previous_position (`int`):
            Position of `fileobj` when the context manager was entered. Restored on exit.
        _len (`int`):
            Number of bytes that can actually be read from the slice (never negative).
            Both attributes are only set once the context manager has been entered.

    Examples:

    Reading 200 bytes with an offset of 128 bytes from a file (ie bytes 128 to 327):
    ```python
    >>> with open("path/to/file", "rb") as file:
    ...     with SliceFileObj(file, seek_from=128, read_limit=200) as fslice:
    ...         fslice.read(...)
    ```

    Reading a file in chunks of 512 bytes
    ```python
    >>> import os
    >>> from math import ceil
    >>> chunk_size = 512
    >>> file_size = os.path.getsize("path/to/file")
    >>> with open("path/to/file", "rb") as file:
    ...     for chunk_idx in range(ceil(file_size / chunk_size)):
    ...         with SliceFileObj(file, seek_from=chunk_idx * chunk_size, read_limit=chunk_size) as fslice:
    ...             chunk = fslice.read(...)
    ```
    """

    def __init__(self, fileobj: BinaryIO, seek_from: int, read_limit: int):
        self.fileobj = fileobj
        self.seek_from = seek_from
        self.read_limit = read_limit

    def __enter__(self):
        """Remember the current position of `fileobj` and move its cursor to the start of the slice."""
        self._previous_position = self.fileobj.tell()
        end_of_stream = self.fileobj.seek(0, os.SEEK_END)
        # The actual number of bytes that can be read from the slice. Clamped to 0 so that
        # a slice starting past the end of the stream is empty rather than negatively sized.
        self._len = max(0, min(self.read_limit, end_of_stream - self.seek_from))
        self.fileobj.seek(self.seek_from, io.SEEK_SET)
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Restore `fileobj` to the position it had before entering the context manager."""
        self.fileobj.seek(self._previous_position, io.SEEK_SET)

    def read(self, n: int = -1) -> bytes:
        """Read at most `n` bytes from the slice; `None` or a negative `n` reads up to the end of the slice."""
        pos = self.tell()
        if pos >= self._len:
            return b""
        remaining_amount = self._len - pos
        if n is None or n < 0:
            # Same convention as `io` objects: no size means "everything that is left".
            return self.fileobj.read(remaining_amount)
        return self.fileobj.read(min(n, remaining_amount))

    def tell(self) -> int:
        """Return the current position, relative to the start of the slice."""
        return self.fileobj.tell() - self.seek_from

    def seek(self, offset: int, whence: int = os.SEEK_SET) -> int:
        """
        Move the cursor inside the slice and return the new position relative to the slice start.

        The target position is clamped to the slice boundaries, so it is impossible to seek
        before the first byte or after the last byte of the slice.

        Raises:
            `ValueError`: if `whence` is not one of `os.SEEK_SET`, `os.SEEK_CUR`, `os.SEEK_END`.
        """
        start = self.seek_from
        end = start + self._len
        if whence in (os.SEEK_SET, os.SEEK_END):
            # Translate to an absolute offset in the underlying file, then clamp to the slice.
            offset = start + offset if whence == os.SEEK_SET else end + offset
            offset = max(start, min(offset, end))
            whence = os.SEEK_SET
        elif whence == os.SEEK_CUR:
            # Clamp the relative move so that the cursor stays within [start, end].
            cur_pos = self.fileobj.tell()
            offset = max(start - cur_pos, min(offset, end - cur_pos))
        else:
            raise ValueError(f"whence value {whence} is not supported")
        return self.fileobj.seek(offset, whence) - self.seek_from

    def __iter__(self):
        """Yield the remaining content of the slice in chunks of at most 4 MiB."""
        chunk_size = 4 * 1024 * 1024
        while True:
            chunk = self.read(n=chunk_size)
            if not chunk:
                # Slice exhausted: stop instead of truncating after the first chunk.
                return
            yield chunk
3 changes: 2 additions & 1 deletion tests/test_lfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
from hashlib import sha256
from io import BytesIO

from huggingface_hub.lfs import SliceFileObj, UploadInfo
from huggingface_hub.lfs import UploadInfo
from huggingface_hub.utils import SoftTemporaryDirectory
from huggingface_hub.utils._lfs import SliceFileObj


class TestUploadInfo(unittest.TestCase):
Expand Down

0 comments on commit c9458ad

Please sign in to comment.