diff --git a/HISTORY.rst b/HISTORY.rst
index f49787f..7c180f0 100644
--- a/HISTORY.rst
+++ b/HISTORY.rst
@@ -10,6 +10,14 @@ Unreleased Changes
   hang on an otherwise ordinary termination.
 * Changed logging level to "INFO" on cases where the taskgraph was not
   precalculated since it's an expected path of execution in ``TaskGraph``.
+* Added a ``hardlink_allowed`` parameter to ``add_task`` that allows an
+  attempt to hardlink a file in cases where ``copy_duplicate_artifact=True``
+  would permit a copy. This saves disk space as well as computation time
+  when large files would otherwise be copied.
+* Added a ``store_result`` flag to ``add_task`` that conditionally stores
+  the ``func`` result in the database for a later ``.get``. This was added
+  to guard against return types that are not picklable and would otherwise
+  raise an exception during normal execution.
 * Fixed issue that would cause the logger thread to continue reporting
   status after all tasks were complete and the graph was closed.
diff --git a/taskgraph/Task.py b/taskgraph/Task.py
index 428119a..8a2fc69 100644
--- a/taskgraph/Task.py
+++ b/taskgraph/Task.py
@@ -52,29 +52,35 @@ class NoDaemonProcess(multiprocessing.Process):
     """Make 'daemon' attribute always return False."""

     @property
     def daemon(self):
+        """Return False, indicating this is not a daemon process."""
         return False

     @daemon.setter
     def daemon(self, value):
+        """Do not allow the daemon value to be overridden."""
         pass


-# From https://stackoverflow.com/a/8963618/42897
-# "As the current implementation of multiprocessing [3.7+] has been extensively
-# refactored to be based on contexts, we need to provide a NoDaemonContext
-# class that has our NoDaemonProcess as attribute. [NonDaemonicPool] will then
-# use that context instead of the default one."
-# "spawn" is chosen as default since that is the default and only context
-# option for Windows and is the default option for Mac OS as well since 3.8.
 class NoDaemonContext(type(multiprocessing.get_context('spawn'))):
+    """From https://stackoverflow.com/a/8963618/42897.
+
+    "As the current implementation of multiprocessing [3.7+] has been
+    extensively refactored to be based on contexts, we need to provide a
+    NoDaemonContext class that has our NoDaemonProcess as attribute.
+    [NonDaemonicPool] will then use that context instead of the default
+    one." "spawn" is chosen as default since that is the default and only
+    context option for Windows and is the default option for Mac OS as
+    well since 3.8.
+
+    """
     Process = NoDaemonProcess


 class NonDaemonicPool(multiprocessing.pool.Pool):
     """NonDaemonic Process Pool."""

-    # Invoking super to set the context of Pool class explicitly
     def __init__(self, *args, **kwargs):
+        """Invoke super to set the context of the Pool class explicitly."""
         kwargs['context'] = NoDaemonContext()
         super(NonDaemonicPool, self).__init__(*args, **kwargs)

@@ -96,16 +102,16 @@ def _initialize_logging_to_queue(logging_queue):
         log records back to the main process.

     Returns:
-        ``None``
+        None

     """
     root_logger = logging.getLogger()
-    # By the time this function is called, ``root_logger`` has a copy of all of
+    # By the time this function is called, `root_logger` has a copy of all of
     # the logging handlers registered to it within the parent process, which
     # leads to duplicate logging in some cases. By removing all of the
     # handlers here, we ensure that log messages can only be passed back to the
-    # parent process by the ``logging_queue``, where they will be handled.
+    # parent process by the `logging_queue`, where they will be handled.
     for handler in root_logger.handlers[:]:
         root_logger.removeHandler(handler)
@@ -233,9 +239,9 @@ def __init__(
                 during task graph execution. If set to 0, don't use
                 subprocesses. If set to <0, use only the main thread for
                 any execution and scheduling. In the case of the latter,
-                `add_task` will be a blocking call.
+                ``add_task`` will be a blocking call.
             reporting_interval (scalar): if not None, report status of task
-                graph every `reporting_interval` seconds.
+                graph every ``reporting_interval`` seconds.

         """
         try:
@@ -561,25 +567,26 @@ def add_task(
             hash_target_files=True, dependent_task_list=None,
             ignore_directories=True, priority=0,
             hash_algorithm='sizetimestamp', copy_duplicate_artifact=False,
-            transient_run=False):
+            hardlink_allowed=False, transient_run=False, store_result=False):
         """Add a task to the task graph.

         Args:
             func (callable): target function
-            args (list): argument list for `func`
-            kwargs (dict): keyword arguments for `func`
+            args (list): argument list for ``func``
+            kwargs (dict): keyword arguments for ``func``
             target_path_list (list): if not None, a list of file paths that
-                are expected to be output by `func`. If any of these paths
+                are expected to be output by ``func``. If any of these paths
                 don't exist, or their timestamp is earlier than an input arg
                 or work token, func will be executed.

-                If `None`, any identical calls to `add_task` will be skipped
-                for the TaskGraph object. A future TaskGraph object will
-                re-run an exact call once for its lifetime. The reasoning is
-                that it is likely the user wishes to run a target-less task
-                once for the lifetime of a task-graph, but would otherwise
-                not have a transient result that could be re-used in a
-                future instantiation of a TaskGraph object.
+                If ``None``, any identical calls to ``add_task`` will be
+                skipped for the TaskGraph object. A future TaskGraph object
+                will re-run an exact call once for its lifetime. The reasoning
+                is that it is likely the user wishes to run a target-less task
+                once for the lifetime of a task-graph, but would otherwise not
+                have a transient result that could be re-used in a future
+                instantiation of a TaskGraph object.
+
             task_name (string): if not None, this value is used to identify
                 the task in logging messages.
             ignore_path_list (list): list of file paths that could be in
@@ -590,8 +597,8 @@ def add_task(
                 function is precalculated. If False, this function only notes
                 the existence of the target files before determining if a
                 function call is precalculated.
-            dependent_task_list (list): list of `Task`s that this task must
-                `join` before executing.
+            dependent_task_list (list): list of ``Task``s that this task must
+                ``join`` before executing.
             ignore_directories (boolean): if the existence/timestamp of any
                 directories discovered in args or kwargs is used as part of
                 the work token hash.
@@ -609,21 +616,26 @@ def add_task(
                 files found in the arguments. This value is used when
                 determining whether a task is precalculated or its target
                 files can be copied to an equivalent task. Note if
-                `hash_algorithm` is 'sizetimestamp' the task will require the
+                ``hash_algorithm`` is 'sizetimestamp' the task will require the
                 same base path files to determine equality. If it is a
-                `hashlib` algorithm only file contents will be considered.
+                ``hashlib`` algorithm only file contents will be considered.
             copy_duplicate_artifact (bool): if True and the Task's argument
                 signature matches a previous Task's without direct comparison
                 of the target path files in the arguments other than their
                 positions in the target path list, the target artifacts from
                 a previously successful Task execution will be copied to the
                 new one.
+            hardlink_allowed (bool): if ``copy_duplicate_artifact`` is True,
+                this will allow a hardlink rather than a copy when needed.
             transient_run (bool): if True a call with an identical execution
                 hash will be reexecuted on a subsequent instantiation of a
                 future TaskGraph object. If a duplicate task is submitted to
                 the same object it will not be re-run in any scenario.
                 Otherwise if False, subsequent tasks with an identical
                 execution hash will be skipped.
+            store_result (bool): If True, the result of ``func`` will be
+                stored in the TaskGraph database and retrievable with a call
+                to ``.get()`` on a ``Task`` object.

         Returns:
             Task which was just added to the graph or an existing Task that
@@ -633,9 +645,9 @@ add_task(
         Raises:
             ValueError if objects are passed to the dependent task list that
                 are not Tasks.
-            ValueError if `add_task` is invoked after the `TaskGraph` is
+            ValueError if ``add_task`` is invoked after the ``TaskGraph`` is
                 closed.
-            RuntimeError if `add_task` is invoked after `TaskGraph` has
+            RuntimeError if ``add_task`` is invoked after ``TaskGraph`` has
                 reached a terminate state.

         """
@@ -677,7 +689,8 @@ def add_task(
                 ignore_path_list, hash_target_files, ignore_directories,
                 transient_run, self._worker_pool,
                 self._taskgraph_cache_dir_path, priority, hash_algorithm,
-                copy_duplicate_artifact, self._task_database_path)
+                copy_duplicate_artifact, hardlink_allowed, store_result,
+                self._task_database_path)
             self._task_name_map[new_task.task_name] = new_task
             # it may be this task was already created in an earlier call,
@@ -769,11 +782,11 @@
         LOGGER.debug('_handle_logs_from_processes shutting down')

     def _execution_monitor(self, monitor_wait_event):
-        """Log state of taskgraph every `self._reporting_interval` seconds.
+        """Log state of taskgraph every ``self._reporting_interval`` seconds.

         Args:
            monitor_wait_event (threading.Event): used to sleep the monitor
-                for `self._reporting_interval` seconds, or to wake up to
+                for ``self._reporting_interval`` seconds, or to wake up to
                terminate for shutdown.

        Returns:
@@ -896,16 +909,17 @@ def __init__(
             self, task_name, func, args, kwargs, target_path_list,
             ignore_path_list, hash_target_files, ignore_directories,
             transient_run, worker_pool, cache_dir, priority, hash_algorithm,
-            copy_duplicate_artifact, task_database_path):
+            copy_duplicate_artifact, hardlink_allowed, store_result,
+            task_database_path):
         """Make a Task.

         Args:
             task_name (int): unique task id from the task graph.
             func (function): a function that takes the argument list
-                `args`
-            args (tuple): a list of arguments to pass to `func`. Can be
+                ``args``
+            args (tuple): a list of arguments to pass to ``func``. Can be
                 None.
-            kwargs (dict): keyword arguments to pass to `func`. Can be
+            kwargs (dict): keyword arguments to pass to ``func``. Can be
                 None.
             target_path_list (list): a list of filepaths that this task
                 should generate.
@@ -927,7 +941,7 @@ def __init__(
                 Otherwise if False, subsequent tasks with an identical
                 execution hash will be skipped.
             worker_pool (multiprocessing.Pool): if not None, is a
-                multiprocessing pool that can be used for `_call` execution.
+                multiprocessing pool that can be used for ``_call`` execution.
             cache_dir (string): path to a directory to both write and
                 expect data recorded from a previous Taskgraph run.
             priority (numeric): the priority of a task is considered when
@@ -947,6 +961,12 @@
                 than their positions in the target path list, the target
                 artifacts from a previously successful Task execution will
                 be copied to the new one.
+            hardlink_allowed (bool): if ``copy_duplicate_artifact`` is True,
+                this allows taskgraph to create a hardlink rather than make a
+                direct copy.
+            store_result (bool): If True, the result of ``func`` will be
+                stored in the TaskGraph database and retrievable with a call
+                to ``.get()`` on the Task object.
             task_database_path (str): path to an SQLITE database that has
                 table named "taskgraph_data" with the three fields:
                     task_hash TEXT NOT NULL,
@@ -955,8 +975,8 @@
                 If a call is successful its hash is inserted/updated in the
                 table, the target_path_stats stores the base/target stats
                 for the target files created by the call and listed in
-                `target_path_list`, and the result of `func` is stored in
-                "result".
+                ``target_path_list``, and the result of ``func`` is stored in
+                ``result``.

         """
         # it is a common error to accidentally pass a non string as to the
@@ -986,6 +1006,8 @@
         self._task_database_path = task_database_path
         self._hash_algorithm = hash_algorithm
         self._copy_duplicate_artifact = copy_duplicate_artifact
+        self._hardlink_allowed = hardlink_allowed
+        self._store_result = store_result
         self.exception_object = None

         # invert the priority since sorting goes smallest to largest and we
@@ -1065,7 +1087,7 @@
         self._task_id_hash = hashlib.sha1(
             argument_hash_string.encode('utf-8')).hexdigest()

-        # this will get calculated when `is_precalculated` is invoked.
+        # this will get calculated when ``is_precalculated`` is invoked.
         self._task_reexecution_hash = None

     def __eq__(self, other):
@@ -1107,7 +1129,7 @@ def _call(self):

         Precondition is that the Task dependencies are satisfied.

-        Sets the `self.task_done_executing_event` flag if execution is
+        Sets the ``self.task_done_executing_event`` flag if execution is
         successful.
         Raises:
@@ -1154,8 +1176,24 @@ def _call(self):
                             result_target_path_stats, self._target_path_list):
                         if artifact_target != new_target:
-                            shutil.copyfile(
-                                artifact_target[0], new_target)
+                            target_linked = False
+                            if self._hardlink_allowed:
+                                # some OSes may not allow hardlinks,
+                                # but we don't know unless we try
+                                try:
+                                    os.link(
+                                        artifact_target[0], new_target)
+                                    target_linked = True
+                                except Exception:
+                                    LOGGER.exception(
+                                        f'failed to os.link '
+                                        f'{artifact_target[0]} to '
+                                        f'{new_target}')
+                            # this is the default if either no hardlink is
+                            # allowed or the hardlink failed
+                            if not target_linked:
+                                shutil.copyfile(
+                                    artifact_target[0], new_target)
                         else:
                             # This is a bug if this ever happens, and
                             # so bad if it does I want to stop and
@@ -1179,10 +1217,12 @@ def _call(self):
                 # the following blocks and raises an exception if result
                 # raised an exception
                 LOGGER.debug("apply_async for task %s", self.task_name)
-                self._result = result.get()
+                payload = result.get()
             else:
                 LOGGER.debug("direct _func for task %s", self.task_name)
-                self._result = self._func(*self._args, **self._kwargs)
+                payload = self._func(*self._args, **self._kwargs)
+            if self._store_result:
+                self._result = payload

             # check that the target paths exist and record stats for later
             if not self._hash_target_files:
@@ -1261,10 +1301,11 @@ def is_precalculated(self):
         self._reexecution_info['file_stat_list'] = file_stat_list
         self._reexecution_info['other_arguments'] = other_arguments

-        reexecution_string = '%s:%s:%s:%s' % (
+        reexecution_string = '%s:%s:%s:%s:%s' % (
             self._reexecution_info['func_name'],
             self._reexecution_info['source_code_hash'],
             self._reexecution_info['other_arguments'],
+            self._store_result,
             # the x[2] is to only take the *hash* part of the 'file_stat'
             str([x[2] for x in file_stat_list]))
@@ -1329,7 +1370,8 @@ def is_precalculated(self):
                     "but there are these mismatches: %s",
                     self.task_name, '\n'.join(mismatched_target_file_list))
                 return False
-            self._result = pickle.loads(database_result[1])
+            if self._store_result:
+                self._result = pickle.loads(database_result[1])
             LOGGER.debug("precalculated (%s)" % self)
             return True
         except EOFError:
@@ -1347,11 +1389,11 @@ def join(self, timeout=None):
         return successful_wait

     def get(self, timeout=None):
-        """Return the result of the `func` once it is ready.
+        """Return the result of the ``func`` once it is ready.

-        If `timeout` is None, this call blocks until the task is complete
-        determined by a call to `.join()`. Otherwise will wait up to `timeout`
-        seconds before raising a `RuntimeError` if exceeded.
+        If ``timeout`` is None, this call blocks until the task is complete
+        as determined by a call to ``.join()``. Otherwise will wait up to
+        ``timeout`` seconds before raising a ``RuntimeError`` if exceeded.

         Args:
             timeout (float): if not None this parameter is a floating point
@@ -1359,10 +1401,17 @@ def get(self, timeout=None):

         Returns:
             value of the result
+
         Raises:
-            RuntimeError when `timeout` exceeded.
+            RuntimeError when ``timeout`` exceeded.
+            ValueError if ``store_result`` was set to ``False`` when the task
+                was created.

         """
+        if not self._store_result:
+            raise ValueError(
+                'must set `store_result` to True in `add_task` to invoke this '
+                'function')
         timeout = not self.join(timeout)
         if timeout:
             raise RuntimeError('call to get timed out')
@@ -1372,10 +1421,10 @@

 def _get_file_stats(
         base_value, hash_algorithm, ignore_list, ignore_directories):
-    """Return fingerprints of any filepaths in `base_value`.
+ """Return fingerprints of any filepaths in ``base_value``. Args: - base_value: any python value. Any file paths in `base_value` + base_value: any python value. Any file paths in ``base_value`` should be "os.path.norm"ed before this function is called. contains filepaths in any nested structure. hash_algorithm (string): either a hash function id that @@ -1386,10 +1435,10 @@ def _get_file_stats( files found in the arguments. This value is used when determining whether a task is precalculated or its target files can be copied to an equivalent task. Note if - `hash_algorithm` is 'sizetimestamp' the task will require the + ``hash_algorithm`` is 'sizetimestamp' the task will require the same base path files to determine equality. If it is a - `hashlib` algorithm only file contents will be considered. If this - value is 'exists' the value of the hash will be 'exists'. + ``hashlib`` algorithm only file contents will be considered. If + this value is 'exists' the value of the hash will be 'exists'. ignore_list (list): any paths found in this list are not included as part of the file stats. All paths in this list should be "os.path.norm"ed. @@ -1441,7 +1490,7 @@ def _filter_non_files( """Remove any values that are files not in ignore list or directories. Args: - base_value: any python value. Any file paths in `base_value` + base_value: any python value. Any file paths in ``base_value`` should be "os.path.norm"ed before this function is called. contains filepaths in any nested structure. keep_list (list): any paths found in this list are not filtered. @@ -1451,7 +1500,7 @@ def _filter_non_files( out. Return: - original `base_value` with any nested file paths for files that + original``base_value`` with any nested file paths for files that exist in the os.exists removed. """ @@ -1486,11 +1535,11 @@ def _filter_non_files( def _scrub_task_args(base_value, target_path_list): - """Attempt to convert `base_value` to canonical values. + """Attempt to convert ``base_value`` to canonical values. - Any paths in `base_value` are normalized, any paths that are also in - the `target_path_list` are replaced with a placeholder so that if - all other arguments are the same in `base_value` except target path + Any paths in ``base_value`` are normalized, any paths that are also in + the``target_path_list`` are replaced with a placeholder so that if + all other arguments are the same in ``base_value`` except target path name the function will hash to the same. This function can be called before the Task dependencies are satisfied @@ -1499,11 +1548,11 @@ def _scrub_task_args(base_value, target_path_list): Args: base_value: any python value target_path_list (list): a list of strings that if found in - `base_value` should be replaced with 'in_target_path' so + ``base_value`` should be replaced with 'in_target_path' so Returns: base_value with any functions replaced as strings and paths in - `target_path_list` with a 'target_path_list[n]' placeholder. + ``target_path_list`` with a 'target_path_list[n]' placeholder. """ if callable(base_value): @@ -1544,7 +1593,7 @@ def _scrub_task_args(base_value, target_path_list): def _hash_file(file_path, hash_algorithm, buf_size=2**20): - """Return a hex digest of `file_path`. + """Return a hex digest of ``file_path``. Args: file_path (string): path to file to hash. @@ -1555,12 +1604,12 @@ def _hash_file(file_path, hash_algorithm, buf_size=2**20): 'sizetimestamp' the size and timestamp of the file are returned in a string of the form '[sizeinbytes]:[lastmodifiedtime]'. 
-        buf_size (int): number of bytes to read from `file_path` at a time
+        buf_size (int): number of bytes to read from ``file_path`` at a time
             for digesting.

     Returns:
-        a hash hex digest computed with hash algorithm `hash_algorithm`
-        of the binary contents of the file located at `file_path`.
+        a hash hex digest computed with hash algorithm ``hash_algorithm``
+        of the binary contents of the file located at ``file_path``.

     """
     if hash_algorithm == 'sizetimestamp':
@@ -1578,7 +1627,7 @@

 def _normalize_path(path):
-    """Convert `path` into normalized, normcase, absolute filepath."""
+    """Convert ``path`` into normalized, normcase, absolute filepath."""
     norm_path = os.path.normpath(path)
     try:
         abs_path = os.path.abspath(norm_path)
@@ -1602,16 +1651,16 @@ def _execute_sqlite(
     Args:
         sqlite_command (str): a well formatted SQLite command.
         database_path (str): path to the SQLite database to operate on.
-        argument_list (list): `execute == 'execute` then this list is passed to
-            the internal sqlite3 `execute` call.
+        argument_list (list): if ``execute == 'execute'``, this list is
+            passed to the internal sqlite3 ``execute`` call.
         mode (str): must be either 'read_only' or 'modify'.
        execute (str): must be either 'execute' or 'script'.
-        fetch (str): if not `None` can be either 'all' or 'one'.
+        fetch (str): if not ``None`` can be either 'all' or 'one'.
             If not None the result of a fetch will be returned by this
            function.

    Returns:
-        result of fetch if `fetch` is not None.
+        result of fetch if ``fetch`` is not None.

     """
     cursor = None
diff --git a/tests/test_task.py b/tests/test_task.py
index 088ed98..47e28e1 100644
--- a/tests/test_task.py
+++ b/tests/test_task.py
@@ -35,12 +35,12 @@ def _noop_function(**kwargs):

 def _long_running_function(delay):
-    """Wait for `delay` seconds."""
+    """Wait for ``delay`` seconds."""
     time.sleep(delay)


 def _create_two_files_on_disk(value, target_a_path, target_b_path):
-    """Create two files and write `value` and append if possible."""
+    """Create two files and write ``value`` and append if possible."""
     with open(target_a_path, 'a') as a_file:
         a_file.write(value)

@@ -57,18 +57,18 @@ def _merge_and_append_files(base_a_path, base_b_path, target_path):

 def _create_list_on_disk(value, length, target_path=None):
-    """Create a numpy array on disk filled with value of `size`."""
+    """Create a pickled list of length ``length`` filled with ``value``."""
     target_list = [value] * length
     pickle.dump(target_list, open(target_path, 'wb'))


 def _call_it(target, *args):
-    """Invoke `target` with `args`."""
+    """Invoke ``target`` with ``args``."""
     target(*args)


 def _append_val(path, *val):
-    """Append a `val` to file at `path`."""
+    """Append a ``val`` to file at ``path``."""
     with open(path, 'a') as target_file:
         for v in val:
             target_file.write(str(v))

@@ -124,7 +124,7 @@ def _copy_two_files_once(base_path, target_a_path, target_b_path):

 def _log_from_another_process(logger_name, log_message):
     """Write a log message to a given logger.

-    Parameters:
+    Args:
         logger_name (string): The string logger name to which ``log_message``
             will be logged.
         log_message (string): The string log message to be logged (at INFO
@@ -231,8 +231,8 @@ def test_task_rel_vs_absolute(self):
         """TaskGraph: test that relative path equates to absolute."""
         task_graph = taskgraph.TaskGraph(self.workspace_dir, 0)

-        target_a_path = os.path.relpath(
-            os.path.join(self.workspace_dir, 'a.txt'), start=self.workspace_dir)
+        target_a_path = os.path.relpath(os.path.join(
+            self.workspace_dir, 'a.txt'), start=self.workspace_dir)
         target_b_path = os.path.abspath(target_a_path)

         _ = task_graph.add_task(
@@ -845,7 +845,7 @@ def test_repeated_function(self):
         # this causes the address to change
         def _append_val(path, *val):
-            """Append a `val` to file at `path`."""
+            """Append a ``val`` to file at ``path``."""
             with open(path, 'a') as target_file:
                 for v in val:
                     target_file.write(str(v))

@@ -1030,7 +1030,6 @@ def test_duplicate_call_changed_target(self):
             result_contents = result_file.read()
         self.assertEqual('testupdated', result_contents)

-
     def test_duplicate_call_modify_timestamp(self):
         """TaskGraph: test that duplicate call modified stamp recompute."""
         task_graph = taskgraph.TaskGraph(self.workspace_dir, 0)
@@ -1158,6 +1157,8 @@ def test_duplicate_but_different_target(self):
         target_b_path = os.path.join(self.workspace_dir, 'testb.txt')
         target_c_path = os.path.join(self.workspace_dir, 'testc.txt')
         target_d_path = os.path.join(self.workspace_dir, 'testd.txt')
+        target_e_path = os.path.join(self.workspace_dir, 'teste.txt')
+        target_f_path = os.path.join(self.workspace_dir, 'testf.txt')
         test_string = 'test string'
         with open(base_path, 'w') as base_file:
@@ -1168,18 +1169,32 @@ def test_duplicate_but_different_target(self):
             args=(base_path, target_a_path, target_b_path),
             copy_duplicate_artifact=True,
             hash_algorithm='md5',
-            target_path_list=[target_a_path, target_b_path])
-        # this task should copy b to c but not a to a.
+            target_path_list=[target_a_path, target_b_path],
+            task_name='copy file ab')
+
+        # this task should copy a to c and b to d
         _ = task_graph.add_task(
             func=_copy_two_files_once,
             args=(base_path, target_c_path, target_d_path),
             copy_duplicate_artifact=True,
             hash_algorithm='md5',
-            target_path_list=[target_c_path, target_d_path])
+            target_path_list=[target_c_path, target_d_path],
+            task_name='copy file cd')
+
+        # this task should hardlink a to e and b to f if allowed on this OS
+        _ = task_graph.add_task(
+            func=_copy_two_files_once,
+            args=(base_path, target_e_path, target_f_path),
+            copy_duplicate_artifact=True,
+            hardlink_allowed=True,
+            hash_algorithm='md5',
+            target_path_list=[target_e_path, target_f_path],
+            task_name='copy file ef')
         task_graph.close()
         task_graph.join()

-        for path in (target_a_path, target_b_path, target_c_path):
+        for path in (target_a_path, target_b_path, target_c_path,
+                     target_e_path, target_f_path):
             with open(path, 'r') as target_file:
                 contents = target_file.read()
             self.assertEqual(contents, test_string)
@@ -1334,8 +1349,22 @@ def test_same_timestamp_and_value(self):
             task_a._task_id_hash, task_b._task_id_hash,
             "task ids should be different since the filenames are different")

+    def test_return_value_no_record(self):
+        """TaskGraph: test ``get`` raises exception if not set to record."""
+        task_graph = taskgraph.TaskGraph(self.workspace_dir, -1)
+        value_task = task_graph.add_task(
+            func=_noop_function,
+            store_result=False)
+
+        # get will raise a ValueError because store_result is not True
+        with self.assertRaises(ValueError) as cm:
+            _ = value_task.get()
+        expected_message = 'must set `store_result` to True in `add_task`'
+        actual_message = str(cm.exception)
+        self.assertTrue(expected_message in actual_message, actual_message)
+
     def test_return_value(self):
-        """TaskGraph: test that `.get` behavior works as expected."""
+        """TaskGraph: test that ``.get`` behavior works as expected."""
         n_iterations = 3
         for iteration_id in range(n_iterations):
             transient_run = iteration_id == n_iterations-1
@@ -1345,6 +1374,7 @@ def test_return_value(self):
             value_task = task_graph.add_task(
                 func=_return_value_once,
                 transient_run=transient_run,
+                store_result=True,
                 args=(expected_value,))
             value = value_task.get()
             self.assertEqual(value, expected_value)
@@ -1362,6 +1392,7 @@ def test_return_value(self):
             value_task = task_graph.add_task(
                 func=_return_value_once,
                 transient_run=True,
+                store_result=True,
                 args=(expected_value,))
             value = value_task.get()
             self.assertEqual(value, expected_value)
@@ -1370,6 +1401,7 @@ def test_return_value(self):
             value_task = task_graph.add_task(
                 func=_return_value_once,
                 transient_run=True,
+                store_result=True,
                 args=(expected_value,))
             value = value_task.get()
@@ -1459,7 +1491,7 @@ def test_terminate_log(self):

 def Fail(n_tries, result_path):
-    """Create a function that fails after `n_tries`."""
+    """Create a function that fails after ``n_tries``."""
     def fail_func():
         fail_func._n_tries -= 1
         if fail_func._n_tries > 0:
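
As a point of reference, here is a minimal usage sketch of the two new
``add_task`` flags described in the HISTORY notes above. The workspace path
and the ``_double_file``/``_string_length`` helpers are hypothetical, and
``hardlink_allowed`` only has an effect when ``copy_duplicate_artifact=True``
finds an equivalent earlier task::

    import os
    import tempfile

    import taskgraph


    def _double_file(base_path, target_path):
        """Write the contents of base_path twice into target_path."""
        with open(base_path) as base_file, \
                open(target_path, 'w') as target_file:
            target_file.write(base_file.read() * 2)


    def _string_length(value):
        """Return len(value); a stand-in for any picklable-result func."""
        return len(value)


    if __name__ == '__main__':
        workspace_dir = tempfile.mkdtemp()
        base_path = os.path.join(workspace_dir, 'base.txt')
        with open(base_path, 'w') as base_file:
            base_file.write('data')

        # n_workers=-1 runs everything in the calling thread, so add_task
        # blocks until the task completes
        task_graph = taskgraph.TaskGraph(workspace_dir, -1)

        doubled_path = os.path.join(workspace_dir, 'doubled.txt')
        _ = task_graph.add_task(
            func=_double_file,
            args=(base_path, doubled_path),
            target_path_list=[doubled_path],
            copy_duplicate_artifact=True,
            # a duplicate artifact may now be hardlinked instead of copied
            hardlink_allowed=True,
            task_name='double file')

        # store_result=True persists the return value so .get() works
        value_task = task_graph.add_task(
            func=_string_length,
            args=('four',),
            store_result=True,
            task_name='string length')

        task_graph.close()
        task_graph.join()
        print(value_task.get())  # prints 4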
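The ``NoDaemonProcess``/``NoDaemonContext``/``NonDaemonicPool`` machinery
whose comments the patch moves into docstrings exists so that pool workers
can spawn their own child processes, which the daemonic workers of a stock
``multiprocessing.Pool`` cannot. A minimal sketch of the difference, assuming
the class can be imported from ``taskgraph.Task``::

    import multiprocessing

    from taskgraph.Task import NonDaemonicPool


    def _spawn_child(_):
        """Start a child process; a daemonic pool worker would raise here."""
        child = multiprocessing.Process(
            target=print, args=('hello from child',))
        child.start()
        child.join()


    if __name__ == '__main__':
        # with multiprocessing.Pool(1) this map would fail with
        # "daemonic processes are not allowed to have children"
        with NonDaemonicPool(1) as pool:
            pool.map(_spawn_child, range(1))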
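The ``Task`` docstring above describes the ``taskgraph_data`` table as
holding ``task_hash``, ``target_path_stats``, and a pickled ``result``. A
hedged sketch of inspecting such a database outside taskgraph, with the table
and column names taken on faith from that docstring::

    import pickle
    import sqlite3


    def read_stored_result(database_path, task_hash):
        """Return the unpickled result for task_hash, or None if absent."""
        connection = sqlite3.connect(database_path)
        try:
            cursor = connection.execute(
                'SELECT result FROM taskgraph_data WHERE task_hash = ?',
                (task_hash,))
            row = cursor.fetchone()
        finally:
            connection.close()
        if row is None or row[0] is None:
            return None
        # per the docstring, results are stored pickled in "result"
        return pickle.loads(row[0])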
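The hardlink branch added to ``Task._call`` is an instance of a common
link-then-copy pattern: try ``os.link`` and fall back to ``shutil.copyfile``
when the link fails (cross-device links and some filesystems refuse
hardlinks). A standalone sketch of the same pattern, with illustrative
names::

    import logging
    import os
    import shutil

    LOGGER = logging.getLogger(__name__)


    def link_or_copy(src_path, dst_path):
        """Hardlink src_path to dst_path if possible, otherwise copy it."""
        try:
            os.link(src_path, dst_path)
        except OSError:
            # e.g. EXDEV for cross-device links, or filesystems like
            # FAT/exFAT that do not support hardlinks at all
            LOGGER.exception(
                'failed to os.link %s to %s', src_path, dst_path)
            shutil.copyfile(src_path, dst_path)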
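The 'sizetimestamp' option in ``_hash_file`` fingerprints a file by metadata
instead of content, returning a string of the form
'[sizeinbytes]:[lastmodifiedtime]'. A rough standalone equivalent (the exact
formatting inside taskgraph may differ)::

    import os


    def sizetimestamp_fingerprint(file_path):
        """Return a '[sizeinbytes]:[lastmodifiedtime]' style fingerprint."""
        file_stat = os.stat(file_path)
        # stat-ing metadata is far cheaper than hashing contents, at the
        # cost of missing edits that preserve both size and mtime
        return '%d:%f' % (file_stat.st_size, file_stat.st_mtime)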