From 5bdadaf681225c6b302777497b287d09d8c503b9 Mon Sep 17 00:00:00 2001 From: Jamie Bliss Date: Sat, 18 Apr 2020 14:28:52 -0400 Subject: [PATCH 01/10] assetlib.py: Add MockFuture --- ppb/assetlib.py | 56 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 56 insertions(+) diff --git a/ppb/assetlib.py b/ppb/assetlib.py index 1aac4668..c82652e8 100644 --- a/ppb/assetlib.py +++ b/ppb/assetlib.py @@ -95,6 +95,62 @@ def queued_events(self): _executor = DelayedThreadExecutor() +class MockFuture: + """ + Acts as a Future's understudy until the real future is availalble. + """ + def __init__(self): + self._cancelled = False + self._callbacks = [] + self._real_future = None + self._have_future = threading.Event() + + def cancel(self): + self._cancelled = True + return True + + def cancelled(self): + return self._cancelled + + def running(self): + return False + + def done(self): + return self._cancelled + + def result(self, timeout=None): + # Note that timeout will probably get stretched + self._have_future.wait(timeout) + return self._real_future.result(timeout) + + def exception(self, timeout=None): + # Note that timeout will probably get stretched + self._have_future.wait(timeout) + return self._real_future.exception(timeout) + + def add_done_callback(self, fn): + self._callbacks.append(fn) + + def handoff(self, fut): + """ + Gives our state to the real future + """ + if self._real_future is not None: + raise RuntimeError("MockFuture cannot hand off more than once") + + self._real_future = fut + + # Add the callbacks + for fn in self._callbacks: + fut.add_done_callback(fn) + + # Apply cancellation + if self._cancelled: + fut.cancel() + + self._have_future.set() + + class AbstractAsset(abc.ABC): """ The asset interface. From 574d913ebbac4b6e855765148990ee835d5a625c Mon Sep 17 00:00:00 2001 From: Jamie Bliss Date: Sat, 18 Apr 2020 14:50:56 -0400 Subject: [PATCH 02/10] assetlib.py: Rewrite MockFuture to be based on Future With all the possible threading problems, it was just simpler --- ppb/assetlib.py | 65 +++++++++++++++---------------------------------- 1 file changed, 19 insertions(+), 46 deletions(-) diff --git a/ppb/assetlib.py b/ppb/assetlib.py index c82652e8..76b6e7e6 100644 --- a/ppb/assetlib.py +++ b/ppb/assetlib.py @@ -95,60 +95,33 @@ def queued_events(self): _executor = DelayedThreadExecutor() -class MockFuture: +class MockFuture(concurrent.futures.Future): """ Acts as a Future's understudy until the real future is availalble. """ - def __init__(self): - self._cancelled = False - self._callbacks = [] - self._real_future = None - self._have_future = threading.Event() - - def cancel(self): - self._cancelled = True - return True - - def cancelled(self): - return self._cancelled - - def running(self): - return False - - def done(self): - return self._cancelled - - def result(self, timeout=None): - # Note that timeout will probably get stretched - self._have_future.wait(timeout) - return self._real_future.result(timeout) - - def exception(self, timeout=None): - # Note that timeout will probably get stretched - self._have_future.wait(timeout) - return self._real_future.exception(timeout) - - def add_done_callback(self, fn): - self._callbacks.append(fn) - def handoff(self, fut): """ Gives our state to the real future """ - if self._real_future is not None: - raise RuntimeError("MockFuture cannot hand off more than once") - - self._real_future = fut - - # Add the callbacks - for fn in self._callbacks: - fut.add_done_callback(fn) - - # Apply cancellation - if self._cancelled: - fut.cancel() + with self._condition: + # Add the callbacks + callbacks, self._done_callbacks = self._done_callbacks, [] + for fn in callbacks: + fut.add_done_callback(fn) + + # Apply cancellation + if self.cancelled(): + fut.cancel() + else: + fut.add_done_callback(self._pass_on_result) - self._have_future.set() + def _pass_on_result(self, fut): + try: + result = fut.result() + except BaseException as exc: + self.set_exception(exc) + else: + self.set_result(result) class AbstractAsset(abc.ABC): From d69e22e869d7f9cab17e71fffad094807b4ca186 Mon Sep 17 00:00:00 2001 From: Jamie Bliss Date: Sat, 18 Apr 2020 14:51:19 -0400 Subject: [PATCH 03/10] assetlib.py: Add ChainingMixin --- ppb/assetlib.py | 37 +++++++++++++++++++++++++++++++++++-- 1 file changed, 35 insertions(+), 2 deletions(-) diff --git a/ppb/assetlib.py b/ppb/assetlib.py index 76b6e7e6..d95f822c 100644 --- a/ppb/assetlib.py +++ b/ppb/assetlib.py @@ -148,8 +148,6 @@ class BackgroundMixin: """ Asset that does stuff in the background. """ - _fut = None - def _start(self): """ Queue the background stuff to run. @@ -182,6 +180,41 @@ def load(self, timeout: float = None): # logger.warning(f"Waited on {self!r} before the engine began") return self._future.result(timeout) + def add_callback(self, func, *pargs, **kwargs): + """ + Add a function to be called when this asset has loaded. + """ + self._future.add_done_callback( + lambda _: _executor.submit(func, self, *pargs, **kwargs) + ) + + +class ChainingMixin(BackgroundMixin): + """ + Asset that does stuff in the background, after other assets have loaded. + """ + def _start(self, *assets): + """ + Queue the background stuff to run. + + Call at the end of __init__(). + """ + self._future = MockFuture() + + for asset in assets: + if hasattr(asset, 'add_callback'): + asset.add_callback(self._check_completed, assets) + + self._check_completed(None, assets) + + def _check_completed(self, _, assets): + if all(a.is_loaded() for a in assets): + # Ok, everything we've ween waiting on is done, start the task and + # do the future handoff + mock, self._future = \ + self._future, _executor.submit(self._background, _asset=self) + mock.handoff(self._future) + class FreeingMixin: """ From 9ee8f63a070193d162807abf5bf812703a46b690 Mon Sep 17 00:00:00 2001 From: Jamie Bliss Date: Sun, 19 Apr 2020 16:21:20 -0400 Subject: [PATCH 04/10] assetlib.py: update __all__ --- ppb/assetlib.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/ppb/assetlib.py b/ppb/assetlib.py index d95f822c..9982a9f6 100644 --- a/ppb/assetlib.py +++ b/ppb/assetlib.py @@ -14,7 +14,11 @@ import ppb.events as events from ppb.systemslib import System -__all__ = 'AbstractAsset', 'Asset', 'AssetLoadingSystem', +__all__ = ( + 'AssetLoadingSystem', + 'AbstractAsset', 'BackgroundMixin', 'ChainingMixin', 'FreeingMixin', + 'Asset', +) logger = logging.getLogger(__name__) From de570d8748414c3b560e90235ff4de7cd4133994 Mon Sep 17 00:00:00 2001 From: Jamie Bliss Date: Sun, 19 Apr 2020 16:22:13 -0400 Subject: [PATCH 05/10] assetlib.py: remove unused force_background_thread Using the executor instead --- ppb/assetlib.py | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/ppb/assetlib.py b/ppb/assetlib.py index 9982a9f6..5db75beb 100644 --- a/ppb/assetlib.py +++ b/ppb/assetlib.py @@ -289,21 +289,6 @@ def background_parse(self, data: bytes): return data -def force_background_thread(func, *pargs, **kwargs): - """ - Calls the given function from not the main thread. - - If already not the main thread, calls it syncronously. - - If this is the main thread, creates a new thread to call it. - """ - if threading.current_thread() is threading.main_thread(): - t = threading.Thread(target=func, args=pargs, kwargs=kwargs, daemon=True) - t.start() - else: - func(*pargs, **kwargs) - - class AssetLoadingSystem(System): def __init__(self, *, engine, **_): super().__init__(**_) From 74011451a9feaf5c6a605e99030582913b4cf84e Mon Sep 17 00:00:00 2001 From: Jamie Bliss Date: Sun, 19 Apr 2020 16:30:14 -0400 Subject: [PATCH 06/10] assets: Add chaining test --- tests/test_assets.py | 38 +++++++++++++++++++++++++++++++++++++- 1 file changed, 37 insertions(+), 1 deletion(-) diff --git a/tests/test_assets.py b/tests/test_assets.py index d471ce2a..b084f5f1 100644 --- a/tests/test_assets.py +++ b/tests/test_assets.py @@ -6,7 +6,10 @@ from ppb import GameEngine, BaseScene import ppb.events import ppb.assetlib -from ppb.assetlib import DelayedThreadExecutor, Asset, AssetLoadingSystem +from ppb.assetlib import ( + DelayedThreadExecutor, Asset, AssetLoadingSystem, BackgroundMixin, + ChainingMixin, AbstractAsset, +) from ppb.testutils import Failer @@ -173,3 +176,36 @@ def free(self, obj): del engine, a # Clean up everything that might be holding a reference. gc.collect() assert free_called + + +def test_chained(clean_assets): + class Const(BackgroundMixin, AbstractAsset): + def __init__(self, value): + self.value = value + self._start() + + def _background(self): + return self.value + + class Concat(ChainingMixin, AbstractAsset): + def __init__(self, delimiter, *values): + self.delimiter = delimiter + self.values = values + self._start(*values) + + def _background(self): + return self.delimiter.join(a.load() for a in self.values) + + a = Concat( + ' ', + Const("spam"), Const("eggs"), Const("foo"), Const("bar"), + ) + engine = GameEngine( + AssetTestScene, basic_systems=[AssetLoadingSystem, Failer], + fail=lambda e: False, message=None, run_time=1, + ) + with engine: + engine.start() + + assert a.load() == "spam eggs foo bar" + # At this poiint, background processing should have finished From b7394b4c059546058665b2c038033e04eee66605 Mon Sep 17 00:00:00 2001 From: Jamie Bliss Date: Tue, 21 Apr 2020 13:27:52 -0400 Subject: [PATCH 07/10] assetlib.py: rewrite how chaining works --- ppb/assetlib.py | 75 ++++++++++++++++++++++++++++--------------------- 1 file changed, 43 insertions(+), 32 deletions(-) diff --git a/ppb/assetlib.py b/ppb/assetlib.py index 5db75beb..15e66553 100644 --- a/ppb/assetlib.py +++ b/ppb/assetlib.py @@ -78,6 +78,25 @@ def submit(self, fn, *args, _asset=None, **kwargs): return fut + def gather(self, futures, callback, *pargs, **kwargs): + mock = MockFuture() + + def waiter(): + concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_EXCEPTION) + for f in futures: + if f.done(): + exc = f.exception() + if exc is not None: + mock.set_exception(exc) + break + else: + newfut = self.submit(callback, *pargs, **kwargs) + mock.handoff(newfut) + + threading.Thread(target=waiter).start() + + return mock + def _finish(self, fut): asset = fut.__asset() if asset is not None: @@ -103,21 +122,29 @@ class MockFuture(concurrent.futures.Future): """ Acts as a Future's understudy until the real future is availalble. """ + _handed_off = False + def handoff(self, fut): """ Gives our state to the real future """ with self._condition: + if self._handed_off: + raise concurrent.futures.InvalidStateError(f"{self!r} already handed off") + self._handed_off = True + # Add the callbacks + with self._condition: callbacks, self._done_callbacks = self._done_callbacks, [] - for fn in callbacks: - fut.add_done_callback(fn) - # Apply cancellation - if self.cancelled(): - fut.cancel() - else: - fut.add_done_callback(self._pass_on_result) + for fn in callbacks: + fut.add_done_callback(fn) + + # Apply cancellation + if self.cancelled(): + fut.cancel() + else: + fut.add_done_callback(self._pass_on_result) def _pass_on_result(self, fut): try: @@ -152,6 +179,8 @@ class BackgroundMixin: """ Asset that does stuff in the background. """ + _future = None + def _start(self): """ Queue the background stuff to run. @@ -171,7 +200,7 @@ def is_loaded(self): """ Returns if the data has been loaded and parsed. """ - return self._future.done() + return self._future is not None and self._future.done() def load(self, timeout: float = None): """ @@ -184,14 +213,6 @@ def load(self, timeout: float = None): # logger.warning(f"Waited on {self!r} before the engine began") return self._future.result(timeout) - def add_callback(self, func, *pargs, **kwargs): - """ - Add a function to be called when this asset has loaded. - """ - self._future.add_done_callback( - lambda _: _executor.submit(func, self, *pargs, **kwargs) - ) - class ChainingMixin(BackgroundMixin): """ @@ -203,21 +224,11 @@ def _start(self, *assets): Call at the end of __init__(). """ - self._future = MockFuture() - - for asset in assets: - if hasattr(asset, 'add_callback'): - asset.add_callback(self._check_completed, assets) - - self._check_completed(None, assets) - - def _check_completed(self, _, assets): - if all(a.is_loaded() for a in assets): - # Ok, everything we've ween waiting on is done, start the task and - # do the future handoff - mock, self._future = \ - self._future, _executor.submit(self._background, _asset=self) - mock.handoff(self._future) + self._future = _executor.gather([ + asset._future + for asset in assets + if hasattr(asset, '_future') + ], self._background, _asset=self) class FreeingMixin: @@ -261,7 +272,7 @@ def __init__(self, name): self._start() def __repr__(self): - return f"<{type(self).__name__} name={self.name!r}{' loaded' if self.is_loaded() else ''}>" + return f"<{type(self).__name__} name={self.name!r}{' loaded' if self.is_loaded() else ''} at 0x{id(self):x}>" def _background(self): # Called in background thread From 355057024d8298c37da84cfedc9c50a3686bd414 Mon Sep 17 00:00:00 2001 From: Jamie Bliss Date: Tue, 21 Apr 2020 13:40:27 -0400 Subject: [PATCH 08/10] assetlib.py: Optimize gather a bit --- ppb/assetlib.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/ppb/assetlib.py b/ppb/assetlib.py index 15e66553..9d55c258 100644 --- a/ppb/assetlib.py +++ b/ppb/assetlib.py @@ -82,14 +82,14 @@ def gather(self, futures, callback, *pargs, **kwargs): mock = MockFuture() def waiter(): - concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_EXCEPTION) - for f in futures: - if f.done(): - exc = f.exception() - if exc is not None: - mock.set_exception(exc) - break + done, not_done = concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_EXCEPTION) + for f in done: + exc = f.exception() + if exc is not None: + mock.set_exception(exc) + break else: + assert not not_done newfut = self.submit(callback, *pargs, **kwargs) mock.handoff(newfut) From e5facf20e477e23bbb7f809f29c05ddb21aacd23 Mon Sep 17 00:00:00 2001 From: Jamie Bliss Date: Tue, 21 Apr 2020 13:46:09 -0400 Subject: [PATCH 09/10] tests: Try chaining actual files --- ppb/vfs.py | 2 +- tests/test_assets.py | 29 ++++++++++++++++++++++++++++- 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/ppb/vfs.py b/ppb/vfs.py index f76eac0d..f9a1cf64 100644 --- a/ppb/vfs.py +++ b/ppb/vfs.py @@ -91,7 +91,7 @@ def exists(filepath): dirpath = _main_path() return (dirpath / filename).is_file() else: - return impres.is_resource(modulename, filepath) + return impres.is_resource(modulename, filename) def iterdir(modulepath): diff --git a/tests/test_assets.py b/tests/test_assets.py index b084f5f1..7e175461 100644 --- a/tests/test_assets.py +++ b/tests/test_assets.py @@ -208,4 +208,31 @@ def _background(self): engine.start() assert a.load() == "spam eggs foo bar" - # At this poiint, background processing should have finished + + +def test_chained_big(clean_assets): + class Concat(ChainingMixin, AbstractAsset): + def __init__(self, delimiter, *values): + self.delimiter = delimiter + self.values = values + self._start(*values) + + def _background(self): + return self.delimiter.join(a.load() for a in self.values) + + a = Concat( + b'\n', + *( + Asset(f"ppb/{fname}") + for fname in ppb.vfs.iterdir('ppb') + if ppb.vfs.exists(f"ppb/{fname}") + ) + ) + engine = GameEngine( + AssetTestScene, basic_systems=[AssetLoadingSystem, Failer], + fail=lambda e: False, message=None, run_time=1, + ) + with engine: + engine.start() + + assert a.load(timeout=5) From 7546077209c15bb1af1c5c2aeaf7aab03c46fe67 Mon Sep 17 00:00:00 2001 From: Jamie Bliss Date: Tue, 21 Apr 2020 13:47:25 -0400 Subject: [PATCH 10/10] assetlib: FreeingMixin: Handle if the asset has an error --- ppb/assetlib.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/ppb/assetlib.py b/ppb/assetlib.py index 9d55c258..2fc89f9d 100644 --- a/ppb/assetlib.py +++ b/ppb/assetlib.py @@ -246,7 +246,12 @@ def __del__(self): # This should only be called after the background threads and other # processing has finished. if self.is_loaded(): - self.free(self.load()) + try: + data = self.load() + except BaseException: + pass + else: + self.free(data) _asset_cache = weakref.WeakValueDictionary()