python/sdk: Improve error handling and logging for ObjectFile
Signed-off-by: Aaron Wilson <aawilson@nvidia.com>
aaronnw committed Oct 4, 2024
1 parent 9857e78 commit b61b3db
Showing 3 changed files with 90 additions and 11 deletions.
36 changes: 27 additions & 9 deletions python/aistore/sdk/obj/object_file.py
@@ -8,6 +8,10 @@
 import requests
 
 from aistore.sdk.obj.content_iterator import ContentIterator
+from aistore.sdk.obj.object_file_errors import (
+    ObjectFileMaxResumeError,
+    ObjectFileStreamError,
+)
 from aistore.sdk.utils import get_logger
 
 logger = get_logger(__name__)
@@ -110,10 +114,10 @@ def __init__(self, content_iterator: ContentIterator, max_resume: int):
         self._current_pos = 0
         self._closed = False
         self._buffer = SimpleBuffer()
-        self._chunk_iterator = self._content_iterator.iter_from_position(
-            self._current_pos
-        )
         self._resume_total = 0
+        self._chunk_iterator = None
+
+        self._reset_iterator()
 
     def close(self) -> None:
         """
@@ -166,6 +170,20 @@ def seekable(self) -> bool:
         """
         return False
 
+    def _reset_iterator(self):
+        """
+        Initialize a new iterator for establishing an object stream and reading chunks of data.
+        Raises:
+            ObjectFileStreamError if a connection cannot be made.
+        """
+        try:
+            self._chunk_iterator = self._content_iterator.iter_from_position(
+                self._current_pos + len(self._buffer)
+            )
+        except Exception as err:
+            logger.error("Error establishing object stream: (%s)", err)
+            raise ObjectFileStreamError(err) from err
+
     def read(self, size=-1):
         """
         Read bytes from the object, handling retries in case of stream errors.
@@ -175,6 +193,10 @@ def read(self, size=-1):
         Returns:
             bytes: The data read from the object.
+
+        Raises:
+            ObjectFileStreamError if a connection cannot be made.
+            ObjectFileMaxResumeError if the stream is interrupted more than the allowed maximum.
         """
         if self._closed:
             raise ValueError("I/O operation on closed file.")
@@ ... @@
                     self._resume_total += 1
                     if self._resume_total > self._max_resume:
                         logger.error("Max retries reached. Cannot resume read.")
-                        raise err
+                        raise ObjectFileMaxResumeError(err, self._resume_total) from err
                     logger.warning(
                         "Chunked encoding error (%s), retrying %d/%d",
                         err,
                         self._resume_total,
                         self._max_resume,
                     )
-
-                    # Reset the chunk iterator for resuming the stream
-                    self._chunk_iterator = self._content_iterator.iter_from_position(
-                        self._current_pos + len(self._buffer)
-                    )
+                    self._reset_iterator()
 
         # Read data from the buffer
         data = self._buffer.read(size)
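
With these changes, ObjectFile surfaces typed SDK errors instead of raw requests exceptions. A minimal usage sketch, under the assumption that a content_iterator (a ContentIterator for the target object) has already been obtained from the SDK elsewhere:

from aistore.sdk.obj.content_iterator import ContentIterator
from aistore.sdk.obj.object_file import ObjectFile
from aistore.sdk.obj.object_file_errors import (
    ObjectFileMaxResumeError,
    ObjectFileStreamError,
)

def read_object(content_iterator: ContentIterator) -> bytes:
    # Allow up to 3 automatic stream resumptions before giving up.
    try:
        obj_file = ObjectFile(content_iterator, max_resume=3)
        return obj_file.read()
    except ObjectFileStreamError as err:
        # No stream could be established, at construction or on resume.
        raise SystemExit(f"failed to connect to AIS: {err.original_error}")
    except ObjectFileMaxResumeError as err:
        # The stream was interrupted more often than max_resume allows.
        raise SystemExit(f"gave up after repeated interruptions: {err.original_error}")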
25 changes: 25 additions & 0 deletions python/aistore/sdk/obj/object_file_errors.py
@@ -0,0 +1,25 @@
+#
+# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
+#
+
+
+class ObjectFileMaxResumeError(Exception):
+    """
+    Raised when ObjectFile has exceeded the max number of stream resumes for an object
+    """
+
+    def __init__(self, err, max_retries):
+        self.original_error = err
+        super().__init__(
+            f"Object file exceeded max number of stream resumptions: {max_retries}"
+        )
+
+
+class ObjectFileStreamError(Exception):
+    """
+    Raised when ObjectFile fails to establish a stream for an object
+    """
+
+    def __init__(self, err):
+        self.original_error = err
+        super().__init__("Object file failed to establish stream connection")
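
Both error classes keep the root cause on original_error, and because the call sites raise them with `from err`, the same exception is also reachable as __cause__. A small, self-contained illustration (the ConnectionError is just a stand-in):

from aistore.sdk.obj.object_file_errors import ObjectFileStreamError

original = ConnectionError("connection refused")
try:
    raise ObjectFileStreamError(original) from original
except ObjectFileStreamError as err:
    assert err.original_error is original  # stored explicitly by the class
    assert err.__cause__ is original       # set by the `from original` clause
    print(err)  # Object file failed to establish stream connection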
40 changes: 38 additions & 2 deletions python/tests/unit/sdk/obj/test_object_file.py
@@ -5,11 +5,17 @@
 import os
 import unittest
 from unittest import mock
+from unittest.mock import Mock
 
 from requests.exceptions import ChunkedEncodingError
 
 from aistore.sdk.obj.content_iterator import ContentIterator
 from aistore.sdk.obj.object_file import ObjectFile
 from aistore.sdk.const import DEFAULT_CHUNK_SIZE
+from aistore.sdk.obj.object_file_errors import (
+    ObjectFileMaxResumeError,
+    ObjectFileStreamError,
+)
 
 
 # pylint: disable=too-few-public-methods
@@ -106,14 +112,44 @@ def test_buffer_usage(self):
 
     # Retry & Resume Related Tests
 
+    def test_init_raises_stream_error(self):
+        """Test that ObjectFile raises ObjectFileStreamError if no stream can be established."""
+        stream_creation_err = Exception("Can't connect to AIS")
+        mock_content_iterator = Mock()
+        mock_content_iterator.iter_from_position.side_effect = stream_creation_err
+
+        with self.assertRaises(ObjectFileStreamError):
+            ObjectFile(mock_content_iterator, max_resume=2)
+
+    def test_read_raises_stream_error(self):
+        """Test that ObjectFile raises ObjectFileStreamError if no stream can be established on subsequent reads."""
+        stream_creation_err = Exception("Can't connect to AIS")
+        mock_content_iterator = Mock(spec=ContentIterator)
+        bad_iterator = BadContentIterator(
+            data=b"some data", fail_on_read=2
+        ).iter_from_position(0)
+
+        # First attempt creates a bad iterator, which fails and triggers a re-initialization of the content iterator
+        # Second content_iterator can't connect at all
+        mock_content_iterator.iter_from_position.side_effect = [
+            bad_iterator,
+            stream_creation_err,
+        ]
+
+        # The initialization before the ChunkedEncodingError happens without error
+        object_file = ObjectFile(mock_content_iterator, max_resume=2)
+        # ChunkedEncodingError is raised on a subsequent call, and new iterator fails to reconnect
+        with self.assertRaises(ObjectFileStreamError):
+            object_file.read()
+
     def test_read_all_fails_after_max_retries(self):
         """Test that ObjectFile gives up after exceeding max retry attempts for read-all."""
         data = os.urandom(DEFAULT_CHUNK_SIZE * 4)
         mock_reader = BadContentIterator(data=data, fail_on_read=2)
         object_file = ObjectFile(mock_reader, max_resume=2)
 
         # Test that the read fails after 2 retry attempts
-        with self.assertRaises(ChunkedEncodingError):
+        with self.assertRaises(ObjectFileMaxResumeError):
             object_file.read()
 
     def test_read_fixed_fails_after_max_retries(self):
@@ -123,7 +159,7 @@ def test_read_fixed_fails_after_max_retries(self):
         object_file = ObjectFile(mock_reader, max_resume=2)
 
         # Test that the read fails after 2 retry attempts for a fixed-size read
-        with self.assertRaises(ChunkedEncodingError):
+        with self.assertRaises(ObjectFileMaxResumeError):
             object_file.read(DEFAULT_CHUNK_SIZE * 4)
 
     def test_read_all_success_after_retries(self):
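
The retry tests rely on a BadContentIterator helper defined earlier in the test file and not shown in this diff. A hypothetical sketch of that pattern (FlakyContentIterator is an invented name, not part of the SDK or the test suite) shows how a mid-stream failure can be simulated:

from requests.exceptions import ChunkedEncodingError

class FlakyContentIterator:
    """Hypothetical stand-in for the test file's BadContentIterator helper."""

    def __init__(self, data: bytes, fail_on_read: int, chunk_size: int = 1024):
        self._data = data
        self._fail_on_read = fail_on_read
        self._chunk_size = chunk_size

    def iter_from_position(self, position: int):
        # Yield chunks starting at `position`, raising ChunkedEncodingError
        # on the Nth read to mimic a dropped stream.
        def _gen():
            reads = 0
            for i in range(position, len(self._data), self._chunk_size):
                reads += 1
                if reads == self._fail_on_read:
                    raise ChunkedEncodingError("simulated stream interruption")
                yield self._data[i : i + self._chunk_size]
        return _gen()

Because each resume calls iter_from_position again from the buffered offset, a helper that fails on its second read keeps interrupting every resumed stream, which is exactly the condition the max-resume tests exercise.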
