Skip to content

Commit

Permalink
[fix] bump and add seek for thread-based implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
mosquito committed Jun 25, 2017
1 parent a0bd714 commit 7906346
Show file tree
Hide file tree
Showing 11 changed files with 125 additions and 67 deletions.
2 changes: 2 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
env*
.env*
3 changes: 2 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ python:
- '3.6'
- 'nightly'
install:
- pip install -U cython pytest pytest-cov 'pytest-asyncio<0.6'
- pip install -Ur requirements.txt
- pip install -U pytest pytest-cov 'pytest-asyncio<0.6'
- pip install -Ue .
script:
- py.test --cov=aiofile tests
Expand Down
13 changes: 13 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
linux_wheel:
docker run -it --rm \
-v `pwd`:/app/src:ro \
-v `pwd`/dist:/app/dst \
--entrypoint /bin/bash \
quay.io/pypa/manylinux1_x86_64 \
/app/src/scripts/make-wheels.sh
docker run -it --rm \
-v `pwd`:/app/src:ro \
-v `pwd`/dist:/app/dst \
--entrypoint /bin/bash \
quay.io/pypa/manylinux1_i686 \
/app/src/scripts/make-wheels.sh
2 changes: 1 addition & 1 deletion aiofile/aio.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def mode_to_flags(mode: str):


class AIOFile:
__slots__ = '__fileno', '__fname', '__mode', '__access_mode', '__loop'
__slots__ = '__fileno', '__fname', '__mode', '__access_mode', '__loop',

OPERATION_CLASS = AIOOperation
IO_READ = IO_READ
Expand Down
34 changes: 22 additions & 12 deletions aiofile/thread_aio.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
import asyncio
import os
from functools import partial
from collections import defaultdict
from threading import Lock


IO_READ = 0
IO_WRITE = 1
IO_NOP = 2


_LOCKS = defaultdict(Lock)


class ThreadedAIOOperation:
__slots__ = '__fd', '__offset', '__nbytes', '__reqprio', '__opcode', '__buffer', '__loop', '__state'
__slots__ = ('__fd', '__offset', '__nbytes', '__reqprio', '__opcode',
'__buffer', '__loop', '__state', '__lock')

def __init__(self, opcode: int, fd: int, offset: int, nbytes: int, reqprio: int,
loop: asyncio.AbstractEventLoop):
Expand All @@ -25,6 +30,7 @@ def __init__(self, opcode: int, fd: int, offset: int, nbytes: int, reqprio: int,
self.__opcode = opcode
self.__buffer = b''
self.__state = None
self.__lock = _LOCKS[self.__fd]

@property
def buffer(self):
Expand All @@ -34,21 +40,25 @@ def buffer(self):
def buffer(self, data: bytes):
self.__buffer = data

def _execute(self):
with self.__lock:
os.lseek(self.__fd, self.__offset, os.SEEK_SET)

if self.opcode == IO_READ:
return os.read(self.__fd, self.__nbytes)
elif self.opcode == IO_WRITE:
return os.write(self.__fd, self.__buffer)
elif self.opcode == IO_NOP:
return os.fsync(self.__fd)

_LOCKS.pop(self.__fd)

def __iter__(self):
if self.__state is not None:
raise RuntimeError('Invalid state')

self.__state = False

if self.opcode == IO_READ:
operation = partial(os.read, self.__fd, self.__nbytes)
elif self.opcode == IO_WRITE:
operation = partial(os.write, self.__fd, self.__buffer)
elif self.opcode == IO_NOP:
operation = partial(os.fsync, self.__fd)

yield
result = yield from self.__loop.run_in_executor(None, operation)
result = yield from self.__loop.run_in_executor(None, self._execute)
self.__state = True
return result

Expand Down
2 changes: 1 addition & 1 deletion aiofile/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

team_email = 'me@mosquito.su'

version_info = (0, 2, 0)
version_info = (0, 2, 1)

__author__ = ", ".join("{} <{}>".format(*info) for info in author_info)
__version__ = ".".join(map(str, version_info))
20 changes: 20 additions & 0 deletions scripts/make-wheels.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
set -ex

SRC=/app/src
DST=/app/dst

function build_wheel() {
/opt/python/$1/bin/pip install -Ur ${SRC}/requirements.txt
/opt/python/$1/bin/pip wheel ${SRC} -f ${SRC} -w ${DST}
}


build_wheel cp34-cp34m
build_wheel cp35-cp35m
build_wheel cp36-cp36m


cd ${DST}
for f in ./*linux_*; do if [ -f $f ]; then auditwheel repair $f -w . ; rm $f; fi; done
cd -

27 changes: 15 additions & 12 deletions tests/test_aio.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,21 @@
PY35 = sys.version_info >= (3, 5)


AIOFile.OPERATION_CLASS = AIOOperation
AIOFile.IO_READ = IO_READ
AIOFile.IO_NOP = IO_NOP
AIOFile.IO_WRITE = IO_WRITE
def posix_aio_file(name, mode):
AIOFile.OPERATION_CLASS = AIOOperation
AIOFile.IO_READ = IO_READ
AIOFile.IO_NOP = IO_NOP
AIOFile.IO_WRITE = IO_WRITE

return AIOFile(name, mode)


@pytest.mark.asyncio
def test_read(temp_file, uuid):
with open(temp_file, "w") as f:
f.write(uuid)

aio_file = AIOFile(temp_file, 'r')
aio_file = posix_aio_file(temp_file, 'r')

data = yield from aio_file.read()
data = data.decode()
Expand All @@ -29,8 +32,8 @@ def test_read(temp_file, uuid):

@pytest.mark.asyncio
def test_read_write(temp_file, uuid):
r_file = AIOFile(temp_file, 'r')
w_file = AIOFile(temp_file, 'w')
r_file = posix_aio_file(temp_file, 'r')
w_file = posix_aio_file(temp_file, 'w')

yield from w_file.write(uuid)
yield from w_file.fsync()
Expand All @@ -47,7 +50,7 @@ def test_read_offset(temp_file, uuid):
for _ in range(10):
f.write(uuid)

aio_file = AIOFile(temp_file, 'r')
aio_file = posix_aio_file(temp_file, 'r')

data = yield from aio_file.read(
offset=len(uuid),
Expand All @@ -61,8 +64,8 @@ def test_read_offset(temp_file, uuid):

@pytest.mark.asyncio
def test_read_write_offset(temp_file, uuid):
r_file = AIOFile(temp_file, 'r')
w_file = AIOFile(temp_file, 'w')
r_file = posix_aio_file(temp_file, 'r')
w_file = posix_aio_file(temp_file, 'w')

for i in range(10):
yield from w_file.write(uuid, offset=i * len(uuid))
Expand All @@ -81,8 +84,8 @@ def test_read_write_offset(temp_file, uuid):

@pytest.mark.asyncio
def test_reader_writer(temp_file, uuid):
r_file = AIOFile(temp_file, 'r')
w_file = AIOFile(temp_file, 'w')
r_file = posix_aio_file(temp_file, 'r')
w_file = posix_aio_file(temp_file, 'w')

writer = Writer(w_file)

Expand Down
30 changes: 16 additions & 14 deletions tests/test_py35/test_aio.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,21 @@
from .. import *


def posix_aio_file(name, mode):
AIOFile.OPERATION_CLASS = AIOOperation
AIOFile.IO_READ = IO_READ
AIOFile.IO_NOP = IO_NOP
AIOFile.IO_WRITE = IO_WRITE

AIOFile.OPERATION_CLASS = AIOOperation
AIOFile.IO_READ = IO_READ
AIOFile.IO_NOP = IO_NOP
AIOFile.IO_WRITE = IO_WRITE
return AIOFile(name, mode)


@pytest.mark.asyncio
async def test_read(temp_file, uuid):
with open(temp_file, "w") as f:
f.write(uuid)

aio_file = AIOFile(temp_file, 'r')
aio_file = posix_aio_file(temp_file, 'r')

data = await aio_file.read()
data = data.decode()
Expand All @@ -26,8 +28,8 @@ async def test_read(temp_file, uuid):

@pytest.mark.asyncio
async def test_read_write(temp_file, uuid):
r_file = AIOFile(temp_file, 'r')
w_file = AIOFile(temp_file, 'w')
r_file = posix_aio_file(temp_file, 'r')
w_file = posix_aio_file(temp_file, 'w')

await w_file.write(uuid)
await w_file.fsync()
Expand All @@ -44,7 +46,7 @@ async def test_read_offset(temp_file, uuid):
for _ in range(10):
f.write(uuid)

aio_file = AIOFile(temp_file, 'r')
aio_file = posix_aio_file(temp_file, 'r')

data = await aio_file.read(
offset=len(uuid),
Expand All @@ -58,8 +60,8 @@ async def test_read_offset(temp_file, uuid):

@pytest.mark.asyncio
async def test_read_write_offset(temp_file, uuid):
r_file = AIOFile(temp_file, 'r')
w_file = AIOFile(temp_file, 'w')
r_file = posix_aio_file(temp_file, 'r')
w_file = posix_aio_file(temp_file, 'w')

for i in range(10):
await w_file.write(uuid, offset=i * len(uuid))
Expand All @@ -78,8 +80,8 @@ async def test_read_write_offset(temp_file, uuid):

@pytest.mark.asyncio
async def test_reader_writer(temp_file, uuid):
r_file = AIOFile(temp_file, 'r')
w_file = AIOFile(temp_file, 'w')
r_file = posix_aio_file(temp_file, 'r')
w_file = posix_aio_file(temp_file, 'w')

writer = Writer(w_file)

Expand All @@ -99,8 +101,8 @@ async def test_reader_writer(temp_file, uuid):

@pytest.mark.asyncio
async def test_reader_writer(loop, temp_file, uuid):
r_file = AIOFile(temp_file, 'r')
w_file = AIOFile(temp_file, 'w')
r_file = posix_aio_file(temp_file, 'r')
w_file = posix_aio_file(temp_file, 'w')

writer = Writer(w_file)

Expand Down
31 changes: 17 additions & 14 deletions tests/test_py35/test_thread_aio.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,21 @@
from .. import *


AIOFile.OPERATION_CLASS = ThreadedAIOOperation
AIOFile.IO_READ = IO_READ
AIOFile.IO_NOP = IO_NOP
AIOFile.IO_WRITE = IO_WRITE
def thread_aio_file(name, mode):
AIOFile.OPERATION_CLASS = ThreadedAIOOperation
AIOFile.IO_READ = IO_READ
AIOFile.IO_NOP = IO_NOP
AIOFile.IO_WRITE = IO_WRITE

return AIOFile(name, mode)


@pytest.mark.asyncio
async def test_read(temp_file, uuid):
with open(temp_file, "w") as f:
f.write(uuid)

aio_file = AIOFile(temp_file, 'r')
aio_file = thread_aio_file(temp_file, 'r')

data = await aio_file.read()
data = data.decode()
Expand All @@ -25,8 +28,8 @@ async def test_read(temp_file, uuid):

@pytest.mark.asyncio
async def test_read_write(temp_file, uuid):
r_file = AIOFile(temp_file, 'r')
w_file = AIOFile(temp_file, 'w')
r_file = thread_aio_file(temp_file, 'r')
w_file = thread_aio_file(temp_file, 'w')

await w_file.write(uuid)
await w_file.fsync()
Expand All @@ -43,7 +46,7 @@ async def test_read_offset(temp_file, uuid):
for _ in range(10):
f.write(uuid)

aio_file = AIOFile(temp_file, 'r')
aio_file = thread_aio_file(temp_file, 'r')

data = await aio_file.read(
offset=len(uuid),
Expand All @@ -57,8 +60,8 @@ async def test_read_offset(temp_file, uuid):

@pytest.mark.asyncio
async def test_read_write_offset(temp_file, uuid):
r_file = AIOFile(temp_file, 'r')
w_file = AIOFile(temp_file, 'w')
r_file = thread_aio_file(temp_file, 'r')
w_file = thread_aio_file(temp_file, 'w')

for i in range(10):
await w_file.write(uuid, offset=i * len(uuid))
Expand All @@ -77,8 +80,8 @@ async def test_read_write_offset(temp_file, uuid):

@pytest.mark.asyncio
async def test_reader_writer(temp_file, uuid):
r_file = AIOFile(temp_file, 'r')
w_file = AIOFile(temp_file, 'w')
r_file = thread_aio_file(temp_file, 'r')
w_file = thread_aio_file(temp_file, 'w')

writer = Writer(w_file)

Expand All @@ -98,8 +101,8 @@ async def test_reader_writer(temp_file, uuid):

@pytest.mark.asyncio
async def test_reader_writer(loop, temp_file, uuid):
r_file = AIOFile(temp_file, 'r')
w_file = AIOFile(temp_file, 'w')
r_file = thread_aio_file(temp_file, 'r')
w_file = thread_aio_file(temp_file, 'w')

writer = Writer(w_file)

Expand Down
Loading

0 comments on commit 7906346

Please sign in to comment.