diff --git a/chainerui/summary.py b/chainerui/summary.py index f24b89b5..d86403a1 100644 --- a/chainerui/summary.py +++ b/chainerui/summary.py @@ -2,10 +2,11 @@ import datetime import json import os -import shutil import warnings -from chainerui.utils import tempdir +import filelock + +from chainerui.logging import logger CHAINERUI_ASSETS_METAFILE_NAME = '.chainerui_assets' @@ -18,6 +19,7 @@ def __init__(self): self.filename = CHAINERUI_ASSETS_METAFILE_NAME self.default_output_path = '' self.out = None + self.saved_idx = 0 def add(self, value): self.cache.append(value) @@ -33,13 +35,22 @@ def get_outpath(self, out): return self.default_output_path return self.out - def save(self, out): - with tempdir(prefix='chainerui_', dir=out) as tempd: - path = os.path.join(tempd, self.filename) - with open(path, 'w') as f: - json.dump(self.cache, f, indent=4) + def save(self, out, timeout): + filepath = os.path.join(out, self.filename) + lockpath = filepath + '.lock' - shutil.move(path, os.path.join(out, self.filename)) + try: + with filelock.FileLock(lockpath, timeout=timeout): + saved_assets_list = [] + if os.path.isfile(filepath): + with open(filepath) as f: + saved_assets_list = json.load(f) + saved_assets_list.extend(self.cache[self.saved_idx:]) + with open(filepath, 'w') as f: + json.dump(saved_assets_list, f, indent=4) + self.saved_idx = len(self.cache) + except filelock.Timeout: + logger.error('Process to write a list of assets is timeout') _chainerui_asset_observer = _Summary() @@ -131,7 +142,7 @@ def get_subdir(self, subdir): os.makedirs(out_dir) return out_dir, rel_out_dir - def save(self): + def save(self, timeout): cached = False if self.images: self.value['images'] = self.images @@ -143,7 +154,7 @@ def save(self): return self.value['timestamp'] = datetime.datetime.now().isoformat() _chainerui_asset_observer.add(self.value) - _chainerui_asset_observer.save(self.out) + _chainerui_asset_observer.save(self.out, timeout) def set_out(path): @@ -160,7 +171,7 @@ def 
set_out(path): @contextlib.contextmanager -def reporter(prefix=None, out=None, subdir='', **kwargs): +def reporter(prefix=None, out=None, subdir='', timeout=5, **kwargs): """Summary media assets to visualize. ``reporter`` function collects media assets by the ``with`` statement and @@ -194,11 +205,11 @@ def reporter(prefix=None, out=None, subdir='', **kwargs): report = _Reporter(prefix, out, subdir, **kwargs) yield report - report.save() + report.save(timeout) def image(images, name=None, ch_axis=1, row=0, mode=None, batched=True, - out=None, subdir='', **kwargs): + out=None, subdir='', timeout=5, **kwargs): """Summary images to visualize. Array of images are converted as image format (PNG format on default), @@ -287,10 +298,11 @@ def image(images, name=None, ch_axis=1, row=0, mode=None, batched=True, value['timestamp'] = created_at.isoformat() value['images'] = {col_name: os.path.join(subdir, filename)} _chainerui_asset_observer.add(value) - _chainerui_asset_observer.save(out_root) + _chainerui_asset_observer.save(out_root, timeout) -def audio(audio, sample_rate, name=None, out=None, subdir='', **kwargs): +def audio(audio, sample_rate, name=None, out=None, subdir='', timeout=5, + **kwargs): """summary audio files to listen on a browser. 
An sampled array is converted as WAV audio file, saved to output directory, @@ -344,4 +356,4 @@ def audio(audio, sample_rate, name=None, out=None, subdir='', **kwargs): value['timestamp'] = created_at.isoformat() value['audios'] = {col_name: os.path.join(subdir, filename)} _chainerui_asset_observer.add(value) - _chainerui_asset_observer.save(out_root) + _chainerui_asset_observer.save(out_root, timeout) diff --git a/requirements.txt b/requirements.txt index b8dc7527..a5793cf2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,3 +5,4 @@ sqlalchemy>=1.1.18 alembic>=1.0.0 gevent>=1.2.2 structlog>=18.2.0 +filelock>=3.0.9 diff --git a/tests/test_summary.py b/tests/test_summary.py index b20403c4..4a8387e2 100644 --- a/tests/test_summary.py +++ b/tests/test_summary.py @@ -1,10 +1,12 @@ import json from mock import MagicMock from mock import patch +from multiprocessing import Process import os import unittest import warnings +import filelock import numpy as np import pytest @@ -33,10 +35,11 @@ def clear_cache(): yield summary._chainerui_asset_observer.out = None summary._chainerui_asset_observer.cache = [] + summary._chainerui_asset_observer.saved_idx = 0 @unittest.skipUnless(_image_report_available, 'Image report is not available') -def test_summary_set_out_with_warning_image(func_dir): +def test_summary_image_without_output_path(func_dir): summary._chainerui_asset_observer.default_output_path = func_dir meta_filepath = os.path.join( func_dir, summary.CHAINERUI_ASSETS_METAFILE_NAME) @@ -54,7 +57,7 @@ def test_summary_set_out_with_warning_image(func_dir): @unittest.skipUnless(_image_report_available, 'Image report is not available') -def test_summary_set_out_reporter_image(func_dir): +def test_summary_reporter_image_without_output_path(func_dir): summary._chainerui_asset_observer.default_output_path = func_dir meta_filepath = os.path.join( func_dir, summary.CHAINERUI_ASSETS_METAFILE_NAME) @@ -276,3 +279,62 @@ def test_reporter_audio_unavailable(func_dir): assert not 
os.path.exists( os.path.join(func_dir, summary.CHAINERUI_ASSETS_METAFILE_NAME)) + + +@unittest.skipUnless(_image_report_available, 'Image report is not available') +def test_summary_called_multiple_script(func_dir): + # This test is not enough to check that _Summary object accepts whether + # called by multiple scripts or not, but it's difficult to test it. + # By increasing lock counter with taking a file lock during this test, + # the meta file is shared by multiple scripts virtually, and check the + # asset list is appended correctly. + meta_filepath = os.path.join( + func_dir, summary.CHAINERUI_ASSETS_METAFILE_NAME) + metalock_filepath = meta_filepath + '.lock' + + img = np.zeros(10*3*5*5, dtype=np.float32).reshape((10, 3, 5, 5)) + summary.image(img, out=func_dir, epoch=10) + assert os.path.exists(meta_filepath) + + try: + p = Process( + target=summary.image, args=(img,), + kwargs={'out': func_dir, 'epoch': 20}) + + with filelock.FileLock(metalock_filepath): + # virtually other script handles the meta file + # save a next image, expected to write after lock file is released + p.start() + # set dummy text, this process means added asset by other process + with open(meta_filepath, 'r') as f: + saved = json.load(f) + assert len(saved) == 1 + with open(meta_filepath, 'w') as f: + saved.append({'dummy': 'text'}) + json.dump(saved, f, indent=4) + finally: + p.join() + + with open(meta_filepath) as f: + saved = json.load(f) + assert len(saved) == 3 + assert saved[1].get('dummy', None) == 'text' + assert saved[2].get('epoch', None) == 20 + + +@unittest.skipUnless(_image_report_available, 'Image report is not available') +def test_summary_timeout(func_dir, caplog): + meta_filepath = os.path.join( + func_dir, summary.CHAINERUI_ASSETS_METAFILE_NAME) + metalock_filepath = meta_filepath + '.lock' + + with filelock.FileLock(metalock_filepath): + img = np.zeros(10*3*5*5, dtype=np.float32).reshape((10, 3, 5, 5)) + with summary.reporter(out=func_dir, timeout=0.1) as r: + # test 
process has already handled meta file, + # this saving process should time out + r.image(img) + + assert len(caplog.records) == 1 + assert 'is timeout' in caplog.records[0].message + assert not os.path.exists(meta_filepath)