From 5e63924cfba6029ba63b318e1311124f365359fa Mon Sep 17 00:00:00 2001 From: Daisuke Tanaka Date: Tue, 8 Jan 2019 16:51:21 +0900 Subject: [PATCH 1/5] fix taking file lock to allow to be multiple script usage --- chainerui/summary.py | 29 ++++++++++++++++++++--------- tests/test_summary.py | 5 +++-- 2 files changed, 23 insertions(+), 11 deletions(-) diff --git a/chainerui/summary.py b/chainerui/summary.py index f24b89b5..e7e8bfd6 100644 --- a/chainerui/summary.py +++ b/chainerui/summary.py @@ -2,10 +2,11 @@ import datetime import json import os -import shutil import warnings -from chainerui.utils import tempdir +import filelock + +from chainerui.logging import logger CHAINERUI_ASSETS_METAFILE_NAME = '.chainerui_assets' @@ -18,6 +19,7 @@ def __init__(self): self.filename = CHAINERUI_ASSETS_METAFILE_NAME self.default_output_path = '' self.out = None + self.saved_idx = 0 def add(self, value): self.cache.append(value) @@ -33,13 +35,22 @@ def get_outpath(self, out): return self.default_output_path return self.out - def save(self, out): - with tempdir(prefix='chainerui_', dir=out) as tempd: - path = os.path.join(tempd, self.filename) - with open(path, 'w') as f: - json.dump(self.cache, f, indent=4) - - shutil.move(path, os.path.join(out, self.filename)) + def save(self, out, timeout=5): + filepath = os.path.join(out, self.filename) + lockpath = filepath + '.lock' + + try: + with filelock.FileLock(lockpath, timeout=timeout): + saved_assets_list = [] + if os.path.isfile(filepath): + with open(filepath) as f: + saved_assets_list = json.load(f) + saved_assets_list.extend(self.cache[self.saved_idx:]) + with open(filepath, 'w') as f: + json.dump(saved_assets_list, f, indent=4) + self.saved_idx = len(self.cache) + except filelock.Timeout: + logger.error('Process to write asset file list is timeout') _chainerui_asset_observer = _Summary() diff --git a/tests/test_summary.py b/tests/test_summary.py index b20403c4..61617060 100644 --- a/tests/test_summary.py +++ b/tests/test_summary.py @@ -33,10 +33,11 @@ def clear_cache(): yield summary._chainerui_asset_observer.out = None summary._chainerui_asset_observer.cache = [] + summary._chainerui_asset_observer.saved_idx = 0 @unittest.skipUnless(_image_report_available, 'Image report is not available') -def test_summary_set_out_with_warning_image(func_dir): +def test_summary_image_without_output_path(func_dir): summary._chainerui_asset_observer.default_output_path = func_dir meta_filepath = os.path.join( func_dir, summary.CHAINERUI_ASSETS_METAFILE_NAME) @@ -54,7 +55,7 @@ def test_summary_set_out_with_warning_image(func_dir): @unittest.skipUnless(_image_report_available, 'Image report is not available') -def test_summary_set_out_reporter_image(func_dir): +def test_summary_reporter_image_without_output_path(func_dir): summary._chainerui_asset_observer.default_output_path = func_dir meta_filepath = os.path.join( func_dir, summary.CHAINERUI_ASSETS_METAFILE_NAME) From cc7e40fcbdeacbec0dd767e7a2f5b19a40690dca Mon Sep 17 00:00:00 2001 From: Daisuke Tanaka Date: Tue, 8 Jan 2019 18:16:52 +0900 Subject: [PATCH 2/5] add test to check accepted multiple script --- tests/test_summary.py | 43 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/tests/test_summary.py b/tests/test_summary.py index 61617060..b20fbfb4 100644 --- a/tests/test_summary.py +++ b/tests/test_summary.py @@ -1,10 +1,12 @@ import json from mock import MagicMock from mock import patch +from multiprocessing import Process import os import unittest import warnings +import filelock import numpy as np import pytest @@ -277,3 +279,44 @@ def test_reporter_audio_unavailable(func_dir): assert not os.path.exists( os.path.join(func_dir, summary.CHAINERUI_ASSETS_METAFILE_NAME)) + + +@unittest.skipUnless(_image_report_available, 'Image report is not available') +def test_summary_called_multiple_script(func_dir): + # This test is not enough to check that _Summary object accepts whether + # called by multiple scripts or not, but it's difficult to test it. + # By increasing lock counter with taking a file lock during this test, + # the meta file is shared by multiple scripts virtually, and check the + # asset list is appended correctly. + meta_filepath = os.path.join( + func_dir, summary.CHAINERUI_ASSETS_METAFILE_NAME) + metalock_filepath = meta_filepath + '.lock' + + img = np.zeros(10*3*5*5, dtype=np.float32).reshape((10, 3, 5, 5)) + summary.image(img, out=func_dir, epoch=10) + assert os.path.exists(meta_filepath) + + try: + p = Process( + target=summary.image, args=(img,), + kwargs={'out': func_dir, 'epoch': 20}) + + with filelock.FileLock(metalock_filepath): + # virtually other script handles the meta file + # save a next image, expected to write after lock file is released + p.start() + # set dummy text, this process means added asset by other process + with open(meta_filepath, 'r') as f: + saved = json.load(f) + assert len(saved) == 1 + with open(meta_filepath, 'w') as f: + saved.append({'dummy': 'text'}) + json.dump(saved, f, indent=4) + finally: + p.join() + + with open(meta_filepath) as f: + saved = json.load(f) + assert len(saved) == 3 + assert saved[1].get('dummy', None) == 'text' + assert saved[2].get('epoch', None) == 20 From 1a9ffa357518513abbe198765260de02e88ba1a1 Mon Sep 17 00:00:00 2001 From: Daisuke Tanaka Date: Tue, 8 Jan 2019 18:31:01 +0900 Subject: [PATCH 3/5] add timeout option as public --- chainerui/summary.py | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/chainerui/summary.py b/chainerui/summary.py index e7e8bfd6..d86403a1 100644 --- a/chainerui/summary.py +++ b/chainerui/summary.py @@ -35,7 +35,7 @@ def get_outpath(self, out): return self.default_output_path return self.out - def save(self, out, timeout=5): + def save(self, out, timeout): filepath = os.path.join(out, self.filename) lockpath = filepath + '.lock' @@ -50,7 +50,7 @@ def save(self, out, timeout=5): json.dump(saved_assets_list, f, indent=4) self.saved_idx = len(self.cache) except filelock.Timeout: - logger.error('Process to write asset file list is timeout') + logger.error('Process to write a list of assets is timeout') _chainerui_asset_observer = _Summary() @@ -142,7 +142,7 @@ def get_subdir(self, subdir): os.makedirs(out_dir) return out_dir, rel_out_dir - def save(self): + def save(self, timeout): cached = False if self.images: self.value['images'] = self.images @@ -154,7 +154,7 @@ def save(self): return self.value['timestamp'] = datetime.datetime.now().isoformat() _chainerui_asset_observer.add(self.value) - _chainerui_asset_observer.save(self.out) + _chainerui_asset_observer.save(self.out, timeout) def set_out(path): @@ -171,7 +171,7 @@ def set_out(path): @contextlib.contextmanager -def reporter(prefix=None, out=None, subdir='', **kwargs): +def reporter(prefix=None, out=None, subdir='', timeout=5, **kwargs): """Summary media assets to visualize. ``reporter`` function collects media assets by the ``with`` statement and @@ -205,11 +205,11 @@ def reporter(prefix=None, out=None, subdir='', **kwargs): report = _Reporter(prefix, out, subdir, **kwargs) yield report - report.save() + report.save(timeout) def image(images, name=None, ch_axis=1, row=0, mode=None, batched=True, - out=None, subdir='', **kwargs): + out=None, subdir='', timeout=5, **kwargs): """Summary images to visualize. Array of images are converted as image format (PNG format on default), @@ -298,10 +298,11 @@ def image(images, name=None, ch_axis=1, row=0, mode=None, batched=True, value['timestamp'] = created_at.isoformat() value['images'] = {col_name: os.path.join(subdir, filename)} _chainerui_asset_observer.add(value) - _chainerui_asset_observer.save(out_root) + _chainerui_asset_observer.save(out_root, timeout) -def audio(audio, sample_rate, name=None, out=None, subdir='', **kwargs): +def audio(audio, sample_rate, name=None, out=None, subdir='', timeout=5, + **kwargs): """summary audio files to listen on a browser. An sampled array is converted as WAV audio file, saved to output directory, @@ -355,4 +356,4 @@ def audio(audio, sample_rate, name=None, out=None, subdir='', **kwargs): value['timestamp'] = created_at.isoformat() value['audios'] = {col_name: os.path.join(subdir, filename)} _chainerui_asset_observer.add(value) - _chainerui_asset_observer.save(out_root) + _chainerui_asset_observer.save(out_root, timeout) From ce057b39b55f24d3412fac02ed6b0a88e75d5ff6 Mon Sep 17 00:00:00 2001 From: Daisuke Tanaka Date: Tue, 8 Jan 2019 18:31:16 +0900 Subject: [PATCH 4/5] add timeout test --- tests/test_summary.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/tests/test_summary.py b/tests/test_summary.py index b20fbfb4..4a8387e2 100644 --- a/tests/test_summary.py +++ b/tests/test_summary.py @@ -320,3 +320,21 @@ def test_summary_called_multiple_script(func_dir): assert len(saved) == 3 assert saved[1].get('dummy', None) == 'text' assert saved[2].get('epoch', None) == 20 + + +@unittest.skipUnless(_image_report_available, 'Image report is not available') +def test_summary_timeout(func_dir, caplog): + meta_filepath = os.path.join( + func_dir, summary.CHAINERUI_ASSETS_METAFILE_NAME) + metalock_filepath = meta_filepath + '.lock' + + with filelock.FileLock(metalock_filepath): + img = np.zeros(10*3*5*5, dtype=np.float32).reshape((10, 3, 5, 5)) + with summary.reporter(out=func_dir, timeout=0.1) as r: + # test process has already handled meta file, + # this saving process should be timeout + r.image(img) + + assert len(caplog.records) == 1 + assert 'is timeout' in caplog.records[0].message + assert not os.path.exists(meta_filepath) From 07daf9b45cdadab50d8227a151bfe49b8f7b0e9b Mon Sep 17 00:00:00 2001 From: Daisuke Tanaka Date: Tue, 8 Jan 2019 18:40:29 +0900 Subject: [PATCH 5/5] add filelock module --- requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/requirements.txt b/requirements.txt index b8dc7527..a5793cf2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,3 +5,4 @@ sqlalchemy>=1.1.18 alembic>=1.0.0 gevent>=1.2.2 structlog>=18.2.0 +filelock>=3.0.9