Skip to content

Commit

Permalink
fixing merge conflict in history
Browse files Browse the repository at this point in the history
  • Loading branch information
richpsharp committed Aug 25, 2020
2 parents 2921553 + 146be6c commit baabc97
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 1 deletion.
2 changes: 2 additions & 0 deletions HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ Unrleased Changes
hang on an otherwise ordinary termination.
* Changed logging level to "INFO" on cases where the taskgraph was not
precalculated since it's an expected path of execution in ``TaskGraph``.
* Fixed issue that would cause the logger thread to continue reporting status
after all tasks were complete and the graph was closed.

0.9.1 (2020-06-04)
------------------
Expand Down
13 changes: 12 additions & 1 deletion taskgraph/Task.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ def __init__(self, *args, **kwargs):
super(NonDaemonicPool, self).__init__(*args, **kwargs)


def _null_func():
"""Used when func=None on add_task."""
return None


def _initialize_logging_to_queue(logging_queue):
"""Add a synchronized queue to a new process.
Expand Down Expand Up @@ -656,7 +661,7 @@ def add_task(
if ignore_path_list is None:
ignore_path_list = []
if func is None:
def func(): return None
func = _null_func

# this is a pretty common error to accidentally not pass a
# Task to the dependent task list.
Expand Down Expand Up @@ -835,6 +840,9 @@ def join(self, timeout=None):
LOGGER.info(
"task %s timed out in graph join", task.task_name)
return False
if self._closed and self._logging_queue:
# Close down the taskgraph
self._terminate()
return True
except Exception:
# If there's an exception on a join it means that a task failed
Expand Down Expand Up @@ -867,6 +875,9 @@ def _terminate(self):
return
self._terminated = True

if self._logging_queue:
self._logging_queue.put(None)

for task in self._task_hash_map.values():
LOGGER.debug("setting task done for %s", task.task_name)
task.task_done_executing_event.set()
Expand Down
22 changes: 22 additions & 0 deletions tests/test_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -1435,6 +1435,28 @@ def test_malformed_taskgraph_database(self):
'taskgraph_data ')
self.assertEqual(len(result), len(expected_column_name_list))

def test_terminate_log(self):
"""TaskGraph: test that the logger thread terminates on .join."""
task_graph = taskgraph.TaskGraph(self.workspace_dir, 1, 5.0)
_ = task_graph.add_task()
task_graph.join()

# logger should not terminate until after join, give it enough time
# to have a chance to close, but not so long the test hangs
task_graph._logging_monitor_thread.join(0.1)
self.assertTrue(task_graph._logging_monitor_thread.is_alive())
task_graph._execution_monitor_thread.join(0.1)
self.assertTrue(task_graph._execution_monitor_thread.is_alive())

task_graph.close()
task_graph.join()

# 5 seconds should be way too much time to expect the thread to join
task_graph._logging_monitor_thread.join(5)
self.assertFalse(task_graph._logging_monitor_thread.is_alive())
task_graph._execution_monitor_thread.join(5)
self.assertFalse(task_graph._execution_monitor_thread.is_alive())


def Fail(n_tries, result_path):
"""Create a function that fails after `n_tries`."""
Expand Down

0 comments on commit baabc97

Please sign in to comment.