Skip to content

Commit

Permalink
removed the installed state
Browse files Browse the repository at this point in the history
* The installed state was the same as "stopped" but with the additional
  information that the workflow had not yet been run.
* Added this information to the workflow status message in cylc-uiserver
  (as it is offline information).
* Improved the workflow status message provided by cylc-flow.
* Put the status message into the UI toolbar.
  • Loading branch information
oliver-sanders authored and hjoliver committed Oct 15, 2021
1 parent 41b0b44 commit b4f6711
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 14 deletions.
58 changes: 53 additions & 5 deletions cylc/uiserver/data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from concurrent.futures import ThreadPoolExecutor
from copy import deepcopy
from functools import partial
from pathlib import Path
import time

from cylc.flow import ID_DELIM
Expand All @@ -45,7 +46,11 @@
EDGES, DATA_TEMPLATE, ALL_DELTAS, DELTAS_MAP, WORKFLOW,
apply_delta, generate_checksum, create_delta_store
)
from cylc.flow.workflow_files import ContactFileFields as CFF
from cylc.flow.workflow_files import (
ContactFileFields as CFF,
WorkflowFiles,
get_workflow_srv_dir,
)
from cylc.flow.workflow_status import WorkflowStatus

from .workflows_mgr import workflow_request
Expand All @@ -70,7 +75,13 @@ def __init__(self, workflows_mgr, log):
self.delta_queues = {}

def update_contact(
self, w_id, contact_data=None, status=None, pruned=False):
self,
w_id,
contact_data=None,
status=None,
status_msg=None,
pruned=False,
):
delta = DELTAS_MAP[ALL_DELTAS]()
delta.workflow.time = time.time()
flow = delta.workflow.updated
Expand All @@ -94,6 +105,8 @@ def update_contact(

if status is not None:
flow.status = status
if status_msg is not None:
flow.status_msg = status_msg
if pruned:
flow.pruned = True
delta.workflow.pruned = w_id
Expand Down Expand Up @@ -138,7 +151,7 @@ async def sync_workflow(self, w_id, contact_data):
)
await self.entire_workflow_update(ids=[w_id])

async def register_workflow(self, w_id):
async def register_workflow(self, w_id: str, is_active: bool) -> None:
self.log.debug(f'register_workflow({w_id})')
self.delta_queues[w_id] = {}

Expand All @@ -147,7 +160,31 @@ async def register_workflow(self, w_id):
self.data[w_id] = data

# create new entry in the delta store
self.update_contact(w_id, status=WorkflowStatus.INSTALLED.value)
self.update_contact(
w_id,
status=WorkflowStatus.STOPPED.value,
status_msg=self.get_status_msg(w_id, is_active),
)

def get_status_msg(self, w_id: str, is_active: bool) -> str:
"""Derive a status message for the workflow.
Running schedulers provide their own status messages.
We must derive a status message for stopped workflows.
"""
if is_active:
# this will get overridden when we sync with the workflow
# set a sensible default here incase the sync takes a while
return 'Running'
reg = w_id.split(ID_DELIM)[-1]
db_file = Path(get_workflow_srv_dir(reg), WorkflowFiles.Service.DB)
if db_file.exists():
# the workflow has previously run
return 'Stopped'
else:
# the workflow has not yet run
return 'Not yet run'

async def unregister_workflow(self, w_id):
self.log.debug(f'unregister_workflow({w_id})')
Expand All @@ -162,7 +199,11 @@ async def unregister_workflow(self, w_id):
def stop_workflow(self, w_id):
self.log.debug(f'stop_workflow({w_id})')
self.purge_workflow(w_id, data=False)
self.update_contact(w_id, status=WorkflowStatus.STOPPED.value)
self.update_contact(
w_id,
status=WorkflowStatus.STOPPED.value,
status_msg=self.get_status_msg(w_id, False),
)

def purge_workflow(self, w_id, data=True):
"""Purge the manager of a workflow's subscription and data."""
Expand Down Expand Up @@ -221,8 +262,15 @@ def update_workflow_data(self, topic, delta, w_id):
loop_cnt += 1
continue
if topic == 'shutdown':
self.log.debug(f'shutdown({w_id})')
self.delta_store_to_queues(w_id, topic, delta)
self.workflows_mgr.stopping.add(w_id)
# update the status to stopped and set the status message
self.update_contact(
w_id,
status=WorkflowStatus.STOPPED.value,
status_msg=self.get_status_msg(w_id, False),
)
return
self.apply_all_delta(w_id, delta)
self.delta_store_to_queues(w_id, topic, delta)
Expand Down
8 changes: 4 additions & 4 deletions cylc/uiserver/tests/test_data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ async def test_register_workflow(
an entry for the workflow in the data store .data map, and another
entry in the data store .delta_queues map."""
w_id = 'user|workflow_id'
await data_store_mgr.register_workflow(w_id=w_id)
await data_store_mgr.register_workflow(w_id=w_id, is_active=False)
assert w_id in data_store_mgr.data
assert w_id in data_store_mgr.delta_queues

Expand All @@ -144,7 +144,7 @@ async def test_update_contact_no_contact_data(
for the workflow in the data store, like the API version set to zero."""
w_id = 'user|workflow_id'
api_version = 0
await data_store_mgr.register_workflow(w_id=w_id)
await data_store_mgr.register_workflow(w_id=w_id, is_active=False)
data_store_mgr.update_contact(w_id=w_id, contact_data=None)
assert api_version == data_store_mgr.data[w_id]['workflow'].api_version

Expand All @@ -157,7 +157,7 @@ async def test_update_contact_with_contact_data(
for the workflow."""
w_id = 'user|workflow_id'
api_version = 1
await data_store_mgr.register_workflow(w_id=w_id)
await data_store_mgr.register_workflow(w_id=w_id, is_active=False)
contact_data = {
'name': 'workflow_id',
'owner': 'cylc',
Expand All @@ -177,7 +177,7 @@ async def test_stop_workflow(
contact with no contact data."""
w_id = 'user|workflow_id'
api_version = 1
await data_store_mgr.register_workflow(w_id=w_id)
await data_store_mgr.register_workflow(w_id=w_id, is_active=False)
contact_data = {
'name': 'workflow_id',
'owner': 'cylc',
Expand Down
2 changes: 1 addition & 1 deletion cylc/uiserver/tests/test_workflows_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ async def test_unregister(
workflow_name = 'unregister-me'
workflow_id = f'{getuser()}{ID_DELIM}{workflow_name}'
uiserver = CylcUIServer()
await uiserver.workflows_mgr._register(workflow_id, None)
await uiserver.workflows_mgr._register(workflow_id, None, is_active=False)

uiserver.workflows_mgr._scan_pipe = empty_aiter()
uiserver.workflows_mgr.inactive.add(workflow_id)
Expand Down
8 changes: 4 additions & 4 deletions cylc/uiserver/workflows_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,9 @@ async def _workflow_state_changes(self):
for wid in inactive_before - (active | inactive):
yield (wid, 'inactive', None, None)

async def _register(self, wid, flow):
async def _register(self, wid, flow, is_active):
"""Register a new workflow with the data store."""
await self.uiserver.data_store_mgr.register_workflow(wid)
await self.uiserver.data_store_mgr.register_workflow(wid, is_active)

async def _connect(self, wid, flow):
"""Open a connection to a running workflow."""
Expand Down Expand Up @@ -240,11 +240,11 @@ async def update(self):
await self._unregister(wid)

elif before is None and after == 'active':
await self._register(wid, flow)
await self._register(wid, flow, is_active=True)
await self._connect(wid, flow)

elif before is None and after == 'inactive':
await self._register(wid, flow)
await self._register(wid, flow, is_active=False)

elif before == 'inactive' and after == 'active':
await self._connect(wid, flow)
Expand Down

0 comments on commit b4f6711

Please sign in to comment.