Buffered S3 reader #247

Merged (11 commits) on Dec 28, 2021
pfio/v2/hdfs.py: 1 addition, 1 deletion
@@ -180,7 +180,7 @@ class Hdfs(FS):

'''

def __init__(self, cwd=None):
def __init__(self, cwd=None, **_):
super().__init__()
self._fs = _create_fs()
assert self._fs is not None
pfio/v2/local.py: 2 additions, 1 deletion
@@ -43,8 +43,9 @@ def __init__(self, _stat, filename):


class Local(FS):
def __init__(self, cwd=None):
def __init__(self, cwd=None, **_):
super().__init__()

if cwd is None:
self._cwd = ''
else:
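The `**_` parameter added to the `Hdfs` and `Local` constructors (and to `Zip` further below) lets every backend accept and silently ignore keyword options that only some filesystems understand, which appears intended to let the S3-only `buffering` option be forwarded to any backend without raising `TypeError`. A minimal sketch of the pattern, not pfio code (`DummyFS` is made up):

```python
# Illustration only: unknown keyword arguments are collected into ``_``
# and dropped, so callers can pass backend-specific options everywhere.
class DummyFS:
    def __init__(self, cwd=None, **_):
        self.cwd = cwd or ''


# ``buffering`` is only meaningful for S3, but a backend like this one
# now ignores it instead of failing at construction time.
fs = DummyFS(cwd='/tmp', buffering=-1)
```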
pfio/v2/s3.py: 29 additions, 5 deletions
@@ -10,6 +10,9 @@
from .fs import FS, FileStat


DEFAULT_MAX_BUFFER_SIZE = 16 * 1024 * 1024


def _normalize_key(key: str) -> str:
key = os.path.normpath(key)
if key.startswith("/"):
@@ -40,8 +43,10 @@ def isdir(self):
return True


class _ObjectReader:
class _ObjectReader(io.RawIOBase):
def __init__(self, client, bucket, key, mode, kwargs):
super(_ObjectReader, self).__init__()

self.client = client
self.bucket = bucket
self.key = key
@@ -141,6 +146,14 @@ def writable(self):
def write(self, data):
raise io.UnsupportedOperation('not writable')

def readall(self):
return self.read(-1)

def readinto(self, b):
buf = self.read(len(b))
b[:len(buf)] = buf
return len(buf)


class _ObjectWriter:
def __init__(self, client, bucket, key, mode, mpu_chunksize, kwargs):
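Subclassing `io.RawIOBase` and adding `readinto()` (plus `readall()`) is what makes `_ObjectReader` wrappable by `io.BufferedReader`, which pulls data from the raw object through `readinto()`; this is the missing piece the old TODO in `open()` pointed at. Below is a standalone sketch of the `readinto()` contract over an in-memory payload. It is an illustration only, not pfio code; the PR implements `readinto()` on top of the reader's existing `read()`.

```python
import io


class BytesRawReader(io.RawIOBase):
    """Toy raw reader over an in-memory payload (illustration only)."""

    def __init__(self, payload: bytes):
        super().__init__()
        self._data = payload
        self._pos = 0

    def readable(self):
        return True

    def readinto(self, b):
        # Copy at most len(b) bytes into the caller's buffer and report
        # how many bytes were actually written (0 signals EOF).
        chunk = self._data[self._pos:self._pos + len(b)]
        b[:len(chunk)] = chunk
        self._pos += len(chunk)
        return len(chunk)


# BufferedReader fills its internal buffer via readinto(); read() and
# readall() on the raw object also come from RawIOBase once readinto()
# exists.
buffered = io.BufferedReader(BytesRawReader(b'0123456789'), buffer_size=4)
assert buffered.read(3) == b'012'
assert buffered.read() == b'3456789'
```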
@@ -280,13 +293,21 @@ class S3(FS):
- ``aws_secret_access_key``, ``AWS_SECRET_ACCESS_KEY``
- ``endpoint``, ``S3_ENDPOINT``

It supports buffering when opening a file in binary read mode ("rb").
When ``buffering`` is set to -1 (default), the buffer size will be
the size of the file or ``pfio.v2.s3.DEFAULT_MAX_BUFFER_SIZE``,
whichever is smaller.
``buffering=0`` disables buffering, and ``buffering > 0`` uses the
specified value as the buffer size in bytes.
'''

def __init__(self, bucket, prefix=None,
endpoint=None, create_bucket=False,
aws_access_key_id=None,
aws_secret_access_key=None,
mpu_chunksize=32*1024*1024):
mpu_chunksize=32*1024*1024,
buffering=-1,
**_):
super().__init__()
self.bucket = bucket
if prefix is not None:
@@ -295,6 +316,7 @@ def __init__(self, bucket, prefix=None,
self.cwd = ''

self.mpu_chunksize = mpu_chunksize
self.buffering = buffering

# boto3.set_stream_logger()

@@ -356,9 +378,11 @@ def open(self, path, mode='r', **kwargs):
if 'r' in mode:
obj = _ObjectReader(self.client, self.bucket, path, mode, kwargs)
if 'b' in mode:
# TODO: BufferedIOBase requires readinto() implemeted
# obj = io.BufferedReader(obj)
pass
if self.buffering:
bs = self.buffering
if bs < 0:
bs = min(obj.content_length, DEFAULT_MAX_BUFFER_SIZE)
obj = io.BufferedReader(obj, buffer_size=bs)
else:
obj = io.TextIOWrapper(obj)

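With the docstring and `open()` changes above, binary reads can now be tuned per filesystem instance: a negative `buffering` picks `min(object size, DEFAULT_MAX_BUFFER_SIZE)` as the buffer size, `0` returns the raw `_ObjectReader`, and a positive value is used as-is. A usage sketch follows; the bucket, prefix, and key names are made up, and credentials are assumed to come from the environment variables or keyword arguments listed in the docstring.

```python
from pfio.v2 import from_url

# buffering=0 disables the io.BufferedReader wrapper and reads directly
# from S3; the default buffering=-1 buffers up to 16 MiB per object.
with from_url('s3://some-bucket/some-prefix', buffering=0) as fs:
    with fs.open('data.bin', 'rb') as f:
        header = f.read(16)  # each read() goes straight to the raw reader
```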
pfio/v2/zip.py: 1 addition, 1 deletion
@@ -52,7 +52,7 @@ def __init__(self, zip_info):
class Zip(FS):
_readonly = True

def __init__(self, backend, file_path, mode='r', cwd=None):
def __init__(self, backend, file_path, mode='r', **_):
super().__init__()
self.backend = backend
self.file_path = file_path
tests/v2_tests/test_s3.py: 102 additions, 34 deletions
@@ -1,3 +1,4 @@
import io
import multiprocessing as mp
import os
import pickle
@@ -8,6 +9,7 @@

from pfio.v2 import S3, from_url, open_url
from pfio.v2.fs import ForkedError
from pfio.v2.s3 import _ObjectReader


@pytest.fixture
@@ -28,8 +30,16 @@ class _S3Fixture():
yield _S3Fixture()


def test_s3(s3_fixture):
with from_url('s3://test-bucket/base', **s3_fixture.aws_kwargs) as s3:
def touch(s3, path, content):
with s3.open(path, 'w') as fp:
fp.write(content)

assert s3.exists(path)


def test_s3_init(s3_fixture):
with from_url('s3://test-bucket/base',
**s3_fixture.aws_kwargs) as s3:
assert s3_fixture.bucket == s3.bucket
assert '/base' == s3.cwd
assert s3_fixture.aws_kwargs['aws_access_key_id'] \
@@ -38,24 +48,13 @@ def test_s3(s3_fixture):
== s3.aws_secret_access_key
assert s3.endpoint is None

with s3.open('foo.txt', 'w') as fp:
fp.write('bar')
assert not fp.closed

with s3.open('foo.txt', 'r') as fp:
assert 'bar' == fp.read()
assert not fp.closed
def test_s3_files(s3_fixture):
with from_url('s3://test-bucket/base',
**s3_fixture.aws_kwargs) as s3:

with s3.open('foo.txt', 'rb') as fp:
assert b'b' == fp.read(1)
assert b'a' == fp.read(1)
assert b'r' == fp.read(1)
assert b'' == fp.read(1)
assert b'' == fp.read(1)
fp.seek(1)
assert b'a' == fp.read(1)
assert b'r' == fp.read(1)
assert b'' == fp.read(1)
with s3.open('foo.txt', 'w') as fp:
fp.write('bar')
assert not fp.closed

assert ['foo.txt'] == list(s3.list())
@@ -79,6 +78,48 @@ def test_s3(s3_fixture):
assert s3.isdir("/base")
assert not s3.isdir("/bas")


# TODO: Find out a way to know buffer size used in a BufferedReader
@pytest.mark.parametrize("buffering, reader_type",
[(-1, io.BufferedReader),
(0, _ObjectReader),
(2, io.BufferedReader)])
def test_s3_read(s3_fixture, buffering, reader_type):
with from_url('s3://test-bucket/base',
buffering=buffering,
**s3_fixture.aws_kwargs) as s3:

with s3.open('foo.txt', 'w') as fp:
fp.write('bar')
assert not fp.closed

with s3.open('foo.txt', 'r') as fp:
assert isinstance(fp, io.TextIOWrapper)
assert 'bar' == fp.read()
assert not fp.closed

with s3.open('foo.txt', 'rb') as fp:
assert isinstance(fp, reader_type)
assert b'b' == fp.read(1)
assert b'a' == fp.read(1)
assert b'r' == fp.read(1)
assert b'' == fp.read(1)
assert b'' == fp.read(1)
fp.seek(1)
assert b'a' == fp.read(1)
assert b'r' == fp.read(1)
assert b'' == fp.read(1)
assert not fp.closed


def test_s3_fork(s3_fixture):
with from_url('s3://test-bucket/base',
**s3_fixture.aws_kwargs) as s3:

with s3.open('foo.txt', 'w') as fp:
fp.write('bar')
assert not fp.closed

def f(s3):
try:
s3.open('foo.txt', 'r')
@@ -96,7 +137,7 @@ def g(s3):
try:
with S3(bucket='test-bucket', **s3_fixture.aws_kwargs) as s4:
with s4.open('base/foo.txt', 'r') as fp:
fp.read()
assert fp.read()
except ForkedError:
pytest.fail('ForkedError')

@@ -141,13 +182,6 @@ def test_s3_mpu(s3_fixture):
assert "0123456" == data[7:14]


def touch(s3, path, content):
with s3.open(path, 'w') as fp:
fp.write(content)

assert s3.exists(path)


def test_s3_recursive(s3_fixture):
with from_url('s3://test-bucket/base', **s3_fixture.aws_kwargs) as s3:

@@ -200,8 +234,11 @@ def _seek_check(f):
assert f.tell() == 12, "the position should be kept after an error"


def test_s3_seek(s3_fixture):
with from_url('s3://test-bucket/base', **s3_fixture.aws_kwargs) as s3:
@pytest.mark.parametrize("buffering", [-1, 0])
def test_s3_seek(s3_fixture, buffering):
with from_url('s3://test-bucket/base',
buffering=buffering,
**s3_fixture.aws_kwargs) as s3:

# Make a 10-bytes test data
touch(s3, 'foo.data', '0123456789')
@@ -221,8 +258,11 @@ def test_s3_seek(s3_fixture):
_seek_check(f)


def test_s3_pickle(s3_fixture):
with from_url('s3://test-bucket/base', **s3_fixture.aws_kwargs) as s3:
@pytest.mark.parametrize("buffering", [-1, 0])
def test_s3_pickle(s3_fixture, buffering):
with from_url('s3://test-bucket/base',
buffering=buffering,
**s3_fixture.aws_kwargs) as s3:

with s3.open('foo.pkl', 'wb') as fp:
pickle.dump({'test': 'data'}, fp)
@@ -232,15 +272,18 @@ def test_s3_pickle(s3_fixture):
assert pickle.load(f) == {'test': 'data'}


def test_rename(s3_fixture):
with from_url('s3://test-bucket/base', **s3_fixture.aws_kwargs) as s3:
@pytest.mark.parametrize("buffering", [-1, 0])
def test_rename(s3_fixture, buffering):
with from_url('s3://test-bucket/base',
buffering=buffering,
**s3_fixture.aws_kwargs) as s3:

with s3.open('foo.pkl', 'wb') as fp:
pickle.dump({'test': 'data'}, fp)

s3.rename('foo.pkl', 'bar.pkl')

with from_url('s3://test-bucket') as s3:
with from_url('s3://test-bucket', **s3_fixture.aws_kwargs) as s3:
assert not s3.exists('base/foo.pkl')
assert s3.exists('base/bar.pkl')

@@ -249,9 +292,34 @@ def test_rename(s3_fixture):
assert pickle.load(f) == {'test': 'data'}


@pytest.mark.parametrize("buffering", [-1, 0])
def test_s3_read_and_readall(s3_fixture, buffering):
with from_url('s3://test-bucket/',
buffering=buffering,
**s3_fixture.aws_kwargs) as s3:

# Make a 10-bytes test data
touch(s3, 'foo.data', '0123456789')

with open_url('s3://test-bucket/foo.data', 'rb',
**s3_fixture.aws_kwargs) as f:
assert f.read() == b'0123456789'

f.seek(5, os.SEEK_SET)
assert f.read() == b'56789'

f.seek(5, os.SEEK_SET)
assert f.read(2) == b'56'

f.seek(5, os.SEEK_SET)
assert f.read(1000) == b'56789'

f.seek(5, os.SEEK_SET)
assert f.raw.readall() == b'56789'


def test_remove(s3_fixture):
with from_url('s3://test-bucket/base', **s3_fixture.aws_kwargs) as s3:

with pytest.raises(FileNotFoundError) as err:
s3.remove('non-existent-object')
assert str(err.value) == "No such S3 object: 'non-existent-object'"