Skip to content
This repository has been archived by the owner on Feb 6, 2023. It is now read-only.

Commit

Permalink
Merge pull request #251 from disktnk/feature/248-file-lock
Browse files Browse the repository at this point in the history
Introduce file lock to accept multiple processes on assets API
  • Loading branch information
ofk authored Feb 1, 2019
2 parents 8ac3b3d + 0323ca2 commit 538ad0a
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 18 deletions.
44 changes: 28 additions & 16 deletions chainerui/summary.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@
import datetime
import json
import os
import shutil
import warnings

from chainerui.utils import tempdir
import filelock

from chainerui.logging import logger


CHAINERUI_ASSETS_METAFILE_NAME = '.chainerui_assets'
Expand All @@ -18,6 +19,7 @@ def __init__(self):
self.filename = CHAINERUI_ASSETS_METAFILE_NAME
self.default_output_path = ''
self.out = None
self.saved_idx = 0

def add(self, value):
self.cache.append(value)
Expand All @@ -33,13 +35,22 @@ def get_outpath(self, out):
return self.default_output_path
return self.out

def save(self, out):
with tempdir(prefix='chainerui_', dir=out) as tempd:
path = os.path.join(tempd, self.filename)
with open(path, 'w') as f:
json.dump(self.cache, f, indent=4)
def save(self, out, timeout):
filepath = os.path.join(out, self.filename)
lockpath = filepath + '.lock'

shutil.move(path, os.path.join(out, self.filename))
try:
with filelock.FileLock(lockpath, timeout=timeout):
saved_assets_list = []
if os.path.isfile(filepath):
with open(filepath) as f:
saved_assets_list = json.load(f)
saved_assets_list.extend(self.cache[self.saved_idx:])
with open(filepath, 'w') as f:
json.dump(saved_assets_list, f, indent=4)
self.saved_idx = len(self.cache)
except filelock.Timeout:
logger.error('Process to write a list of assets is timeout')


_chainerui_asset_observer = _Summary()
Expand Down Expand Up @@ -131,7 +142,7 @@ def get_subdir(self, subdir):
os.makedirs(out_dir)
return out_dir, rel_out_dir

def save(self):
def save(self, timeout):
cached = False
if self.images:
self.value['images'] = self.images
Expand All @@ -143,7 +154,7 @@ def save(self):
return
self.value['timestamp'] = datetime.datetime.now().isoformat()
_chainerui_asset_observer.add(self.value)
_chainerui_asset_observer.save(self.out)
_chainerui_asset_observer.save(self.out, timeout)


def set_out(path):
Expand All @@ -160,7 +171,7 @@ def set_out(path):


@contextlib.contextmanager
def reporter(prefix=None, out=None, subdir='', **kwargs):
def reporter(prefix=None, out=None, subdir='', timeout=5, **kwargs):
"""Summary media assets to visualize.
``reporter`` function collects media assets by the ``with`` statement and
Expand Down Expand Up @@ -194,11 +205,11 @@ def reporter(prefix=None, out=None, subdir='', **kwargs):

report = _Reporter(prefix, out, subdir, **kwargs)
yield report
report.save()
report.save(timeout)


def image(images, name=None, ch_axis=1, row=0, mode=None, batched=True,
out=None, subdir='', **kwargs):
out=None, subdir='', timeout=5, **kwargs):
"""Summary images to visualize.
Array of images are converted as image format (PNG format on default),
Expand Down Expand Up @@ -287,10 +298,11 @@ def image(images, name=None, ch_axis=1, row=0, mode=None, batched=True,
value['timestamp'] = created_at.isoformat()
value['images'] = {col_name: os.path.join(subdir, filename)}
_chainerui_asset_observer.add(value)
_chainerui_asset_observer.save(out_root)
_chainerui_asset_observer.save(out_root, timeout)


def audio(audio, sample_rate, name=None, out=None, subdir='', **kwargs):
def audio(audio, sample_rate, name=None, out=None, subdir='', timeout=5,
**kwargs):
"""summary audio files to listen on a browser.
An sampled array is converted as WAV audio file, saved to output directory,
Expand Down Expand Up @@ -344,4 +356,4 @@ def audio(audio, sample_rate, name=None, out=None, subdir='', **kwargs):
value['timestamp'] = created_at.isoformat()
value['audios'] = {col_name: os.path.join(subdir, filename)}
_chainerui_asset_observer.add(value)
_chainerui_asset_observer.save(out_root)
_chainerui_asset_observer.save(out_root, timeout)
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ sqlalchemy>=1.1.18
alembic>=1.0.0
gevent>=1.2.2
structlog>=18.2.0
filelock>=3.0.9
66 changes: 64 additions & 2 deletions tests/test_summary.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import json
from mock import MagicMock
from mock import patch
from multiprocessing import Process
import os
import unittest
import warnings

import filelock
import numpy as np
import pytest

Expand Down Expand Up @@ -33,10 +35,11 @@ def clear_cache():
yield
summary._chainerui_asset_observer.out = None
summary._chainerui_asset_observer.cache = []
summary._chainerui_asset_observer.saved_idx = 0


@unittest.skipUnless(_image_report_available, 'Image report is not available')
def test_summary_set_out_with_warning_image(func_dir):
def test_summary_image_without_output_path(func_dir):
summary._chainerui_asset_observer.default_output_path = func_dir
meta_filepath = os.path.join(
func_dir, summary.CHAINERUI_ASSETS_METAFILE_NAME)
Expand All @@ -54,7 +57,7 @@ def test_summary_set_out_with_warning_image(func_dir):


@unittest.skipUnless(_image_report_available, 'Image report is not available')
def test_summary_set_out_reporter_image(func_dir):
def test_summary_reporter_image_without_output_path(func_dir):
summary._chainerui_asset_observer.default_output_path = func_dir
meta_filepath = os.path.join(
func_dir, summary.CHAINERUI_ASSETS_METAFILE_NAME)
Expand Down Expand Up @@ -276,3 +279,62 @@ def test_reporter_audio_unavailable(func_dir):

assert not os.path.exists(
os.path.join(func_dir, summary.CHAINERUI_ASSETS_METAFILE_NAME))


@unittest.skipUnless(_image_report_available, 'Image report is not available')
def test_summary_called_multiple_script(func_dir):
    """Check that assets saved by another process are appended, not lost.

    This does not fully prove that ``_Summary`` is safe when driven by
    several scripts at once, which is hard to test directly. Instead the
    test takes the meta file lock itself, starts a second process that
    saves an image, and edits the meta file by hand while holding the lock
    to stand in for "another script". After the lock is released, the
    meta file is expected to hold all three entries in order.
    """
    meta_filepath = os.path.join(
        func_dir, summary.CHAINERUI_ASSETS_METAFILE_NAME)
    metalock_filepath = meta_filepath + '.lock'

    img = np.zeros(10*3*5*5, dtype=np.float32).reshape((10, 3, 5, 5))
    summary.image(img, out=func_dir, epoch=10)
    assert os.path.exists(meta_filepath)

    # Build the process before entering ``try`` so that the ``finally``
    # clause can always refer to ``p``.
    p = Process(
        target=summary.image, args=(img,),
        kwargs={'out': func_dir, 'epoch': 20})
    try:
        with filelock.FileLock(metalock_filepath):
            # Pretend another script currently owns the meta file.
            # The child saves the next image; its write is expected to
            # happen only after this lock is released.
            p.start()
            # Append a dummy entry, standing in for an asset added by
            # another process while the child is waiting.
            with open(meta_filepath, 'r') as f:
                saved = json.load(f)
            assert len(saved) == 1
            with open(meta_filepath, 'w') as f:
                saved.append({'dummy': 'text'})
                json.dump(saved, f, indent=4)
    finally:
        # ``join`` is only valid on a started process; ``pid`` stays None
        # until ``start`` succeeds. Joining unconditionally would mask the
        # original error if the lock could not be taken.
        if p.pid is not None:
            p.join()

    # A crashed child should fail here, not as a confusing length mismatch.
    assert p.exitcode == 0

    with open(meta_filepath) as f:
        saved = json.load(f)
    assert len(saved) == 3
    assert saved[1].get('dummy', None) == 'text'
    assert saved[2].get('epoch', None) == 20


@unittest.skipUnless(_image_report_available, 'Image report is not available')
def test_summary_timeout(func_dir, caplog):
    """Check that saving gives up when the meta file lock is already held.

    The test holds the lock file itself, so the reporter's save (with a
    0.1 second timeout) is expected to emit exactly one log record
    containing 'is timeout' and to leave the meta file uncreated.
    """
    meta_path = os.path.join(func_dir, summary.CHAINERUI_ASSETS_METAFILE_NAME)
    lock_path = meta_path + '.lock'
    img = np.zeros((10, 3, 5, 5), dtype=np.float32)

    held_lock = filelock.FileLock(lock_path)
    with held_lock:
        # The test already owns the lock, so the save performed when the
        # reporter context exits should time out.
        with summary.reporter(out=func_dir, timeout=0.1) as r:
            r.image(img)

    records = caplog.records
    assert len(records) == 1
    assert 'is timeout' in records[0].message
    assert not os.path.exists(meta_path)

0 comments on commit 538ad0a

Please sign in to comment.