Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Assets 2: Chained assets #436

Merged
merged 11 commits into from
Apr 26, 2020
107 changes: 87 additions & 20 deletions ppb/assetlib.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@
import ppb.events as events
from ppb.systemslib import System

__all__ = 'AbstractAsset', 'Asset', 'AssetLoadingSystem',
__all__ = (
'AssetLoadingSystem',
'AbstractAsset', 'BackgroundMixin', 'ChainingMixin', 'FreeingMixin',
'Asset',
)

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -77,6 +81,25 @@ def submit(self, fn, *args, _asset=None, **kwargs):

return fut

def gather(self, futures, callback, *pargs, **kwargs):
mock = MockFuture()

def waiter():
done, not_done = concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_EXCEPTION)
for f in done:
exc = f.exception()
if exc is not None:
mock.set_exception(exc)
break
else:
assert not not_done
newfut = self.submit(callback, *pargs, **kwargs)
mock.handoff(newfut)

threading.Thread(target=waiter).start()

return mock

def _finish(self, fut):
asset = fut.__asset()
if asset is not None:
Expand All @@ -98,6 +121,43 @@ def queued_events(self):
_executor = DelayedThreadExecutor()


class MockFuture(concurrent.futures.Future):
"""
Acts as a Future's understudy until the real future is availalble.
"""
_handed_off = False

def handoff(self, fut):
"""
Gives our state to the real future
"""
with self._condition:
if self._handed_off:
raise concurrent.futures.InvalidStateError(f"{self!r} already handed off")
self._handed_off = True

# Add the callbacks
with self._condition:
callbacks, self._done_callbacks = self._done_callbacks, []

for fn in callbacks:
fut.add_done_callback(fn)

# Apply cancellation
if self.cancelled():
fut.cancel()
else:
fut.add_done_callback(self._pass_on_result)

def _pass_on_result(self, fut):
try:
result = fut.result()
except BaseException as exc:
self.set_exception(exc)
else:
self.set_result(result)


class AbstractAsset(abc.ABC):
"""
The asset interface.
Expand All @@ -122,7 +182,7 @@ class BackgroundMixin:
"""
Asset that does stuff in the background.
"""
_fut = None
_future = None

def _start(self):
"""
Expand All @@ -143,7 +203,7 @@ def is_loaded(self):
"""
Returns if the data has been loaded and parsed.
"""
return self._future.done()
return self._future is not None and self._future.done()

def load(self, timeout: float = None):
"""
Expand All @@ -157,6 +217,23 @@ def load(self, timeout: float = None):
return self._future.result(timeout)


class ChainingMixin(BackgroundMixin):
"""
Asset that does stuff in the background, after other assets have loaded.
"""
def _start(self, *assets):
"""
Queue the background stuff to run.

Call at the end of __init__().
"""
self._future = _executor.gather([
asset._future
for asset in assets
if hasattr(asset, '_future')
], self._background, _asset=self)


class FreeingMixin:
"""
Asset that supports freeing
Expand All @@ -174,7 +251,12 @@ def __del__(self):
# NOTE: This isn't super great, but there isn't a better way without
# knowing what we've been mixed with.
if self.is_loaded():
self.free(self.load())
try:
data = self.load()
except BaseException:
pass
else:
self.free(data)


_asset_cache = weakref.WeakValueDictionary()
Expand All @@ -200,7 +282,7 @@ def __init__(self, name):
self._start()

def __repr__(self):
return f"<{type(self).__name__} name={self.name!r}{' loaded' if self.is_loaded() else ''}>"
return f"<{type(self).__name__} name={self.name!r}{' loaded' if self.is_loaded() else ''} at 0x{id(self):x}>"

def _background(self):
# Called in background thread
Expand Down Expand Up @@ -228,21 +310,6 @@ def background_parse(self, data: bytes):
return data


def force_background_thread(func, *pargs, **kwargs):
"""
Calls the given function from not the main thread.

If already not the main thread, calls it syncronously.

If this is the main thread, creates a new thread to call it.
"""
if threading.current_thread() is threading.main_thread():
t = threading.Thread(target=func, args=pargs, kwargs=kwargs, daemon=True)
t.start()
else:
func(*pargs, **kwargs)


class AssetLoadingSystem(System):
"""
Connects the asset system to PPB, managing lifecycles and such.
Expand Down
2 changes: 1 addition & 1 deletion ppb/vfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def exists(filepath):
dirpath = _main_path()
return (dirpath / filename).is_file()
else:
return impres.is_resource(modulename, filepath)
return impres.is_resource(modulename, filename)


def iterdir(modulepath):
Expand Down
65 changes: 64 additions & 1 deletion tests/test_assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@
from ppb import GameEngine, BaseScene
import ppb.events
import ppb.assetlib
from ppb.assetlib import DelayedThreadExecutor, Asset, AssetLoadingSystem
from ppb.assetlib import (
DelayedThreadExecutor, Asset, AssetLoadingSystem, BackgroundMixin,
ChainingMixin, AbstractAsset,
)
from ppb.testutils import Failer


Expand Down Expand Up @@ -181,3 +184,63 @@ def test_timeout(clean_assets):

with pytest.raises(concurrent.futures.TimeoutError):
a.load(timeout=0.1)


def test_chained(clean_assets):
class Const(BackgroundMixin, AbstractAsset):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is kind of magical, can we get a comment that this is essentially a threaded string.join so folks at least get the intent?

def __init__(self, value):
self.value = value
self._start()

def _background(self):
return self.value

class Concat(ChainingMixin, AbstractAsset):
def __init__(self, delimiter, *values):
self.delimiter = delimiter
self.values = values
self._start(*values)

def _background(self):
return self.delimiter.join(a.load() for a in self.values)

a = Concat(
' ',
Const("spam"), Const("eggs"), Const("foo"), Const("bar"),
)
engine = GameEngine(
AssetTestScene, basic_systems=[AssetLoadingSystem, Failer],
fail=lambda e: False, message=None, run_time=1,
)
with engine:
engine.start()

assert a.load() == "spam eggs foo bar"


def test_chained_big(clean_assets):
class Concat(ChainingMixin, AbstractAsset):
def __init__(self, delimiter, *values):
self.delimiter = delimiter
self.values = values
self._start(*values)

def _background(self):
return self.delimiter.join(a.load() for a in self.values)

a = Concat(
b'\n',
*(
Asset(f"ppb/{fname}")
for fname in ppb.vfs.iterdir('ppb')
if ppb.vfs.exists(f"ppb/{fname}")
)
)
engine = GameEngine(
AssetTestScene, basic_systems=[AssetLoadingSystem, Failer],
fail=lambda e: False, message=None, run_time=1,
)
with engine:
engine.start()

assert a.load(timeout=5)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what this is intended to test? Just that all of these workers are loaded in less than 5 seconds?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's so that if the chaining deadlocks, the test fails instead of pytest just sitting.

(Yes, this happened quite a bit in development.)