Skip to content

Commit

Permalink
Merge branch 'master' into centralised-reg-parse
Browse files Browse the repository at this point in the history
  • Loading branch information
MetRonnie committed Jul 23, 2021
2 parents faa9bcc + 5408474 commit fd1d6cc
Show file tree
Hide file tree
Showing 41 changed files with 1,641 additions and 164 deletions.
31 changes: 28 additions & 3 deletions cylc/flow/cfgspec/globalcfg.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,9 +320,10 @@
Configuration of the Cylc Scheduler's main loop.
'''):
Conf('plugins', VDR.V_STRING_LIST,
['health check', 'prune flow labels'], desc='''
Configure the default main loop plugins to use when
starting new workflows.
['health check', 'prune flow labels', 'reset bad hosts'],
desc='''
Configure the default main loop plugins to use when
starting new workflows.
''')

with Conf('<plugin name>', desc='''
Expand All @@ -346,6 +347,14 @@
The interval with which this plugin is run.
''')

with Conf('reset bad hosts', meta=MainLoopPlugin, desc='''
Periodically clear the scheduler list of unreachable (bad)
hosts.
'''):
Conf('interval', VDR.V_INTERVAL, DurationFloat(1800), desc='''
How often (in seconds) to run this plugin.
''')

with Conf('logging', desc='''
The workflow event log, held under the workflow run directory, is
maintained as a rolling archive. Logs are rolled over (backed up
Expand Down Expand Up @@ -754,6 +763,22 @@
systems so for safety there is an upper limit on the number
of job submissions which can be batched together.
''')
with Conf('selection'):
Conf(
'method', VDR.V_STRING, default='random',
options=['random', 'definition order'],
desc='''
Host selection method for the platform. Available
options:
- random: Suitable for an identical pool of hosts.
- definition order: Take the first host in the list
unless that host has been unreachable. In many cases
this is likely to cause load imbalances, but might
be appropriate if your hosts were
``main, backup, failsafe``.
'''
)
with Conf('localhost', meta=Platform):
Conf('hosts', VDR.V_STRING_LIST, ['localhost'])

Expand Down
22 changes: 22 additions & 0 deletions cylc/flow/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,3 +253,25 @@ def __str__(self):
for key, value in data.items():
ret += f'\n {key}: {value}'
return ret


class NoHostsError(CylcError):
    """No valid host could be found for a given platform.

    Attributes:
        platform_n: The value of the platform's ``'name'`` entry.
    """

    def __init__(self, platform):
        """Record the platform name and build the error message.

        Args:
            platform: Platform configuration mapping; only its ``'name'``
                key is read here.
        """
        name = platform['name']
        self.platform_n = name
        super().__init__(f'Unable to find valid host for {name}')


class CylcVersionError(CylcError):
    """The contact file belongs to a Cylc version this script cannot handle.

    Attributes:
        version: The Cylc version reported for the workflow, or ``None``
            when it is not known.
    """

    def __init__(self, version=None):
        """Store the offending version.

        Args:
            version: Version of Cylc the workflow was installed with, if
                known.
        """
        self.version = version

    def __str__(self):
        """Return the error message, naming the version when it is known."""
        if self.version is None:
            return "Installed workflow is not compatible with Cylc 8."
        return (
            f'Installed Cylc {self.version} workflow is not '
            'compatible with Cylc 8.'
        )
24 changes: 24 additions & 0 deletions cylc/flow/main_loop/reset_bad_hosts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Reset the list of bad hosts."""

from cylc.flow.main_loop import periodic


@periodic
async def reset_bad_hosts(scheduler, _):
    """Clear the scheduler's record of bad hosts.

    Delegates to ``scheduler.task_events_mgr.reset_bad_hosts()``. The
    second positional argument is accepted but not used.
    """
    events_mgr = scheduler.task_events_mgr
    events_mgr.reset_bad_hosts()
9 changes: 8 additions & 1 deletion cylc/flow/network/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from cylc.flow.exceptions import (
ClientError,
CylcError,
CylcVersionError,
ServiceFileError,
WorkflowStopped
)
Expand Down Expand Up @@ -71,6 +72,7 @@ def get_location(workflow: str):
Tuple[str, int, int]: tuple with the host name and port numbers.
Raises:
ClientError: if the workflow is not running.
CylcVersionError: if target is a Cylc 7 (or earlier) workflow.
"""
try:
contact = load_contact_file(workflow)
Expand All @@ -80,7 +82,12 @@ def get_location(workflow: str):
host = contact[ContactFileFields.HOST]
host = get_fqdn_by_host(host)
port = int(contact[ContactFileFields.PORT])
pub_port = int(contact[ContactFileFields.PUBLISH_PORT])
if ContactFileFields.PUBLISH_PORT in contact:
pub_port = int(contact[ContactFileFields.PUBLISH_PORT])
else:
version = (
contact['CYLC_VERSION'] if 'CYLC_VERSION' in contact else None)
raise CylcVersionError(version=version)
return host, port, pub_port


Expand Down
56 changes: 32 additions & 24 deletions cylc/flow/platforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,16 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Tests for the platform lookup.
"""Functions relating to (job) platforms."""

import random
import re
from copy import deepcopy
from typing import (
Any, Dict, Iterable, List, Optional, Tuple, Union, overload)
Any, Dict, Iterable, List, Optional, Tuple, Union, Set, overload
)

from cylc.flow.exceptions import PlatformLookupError
from cylc.flow.exceptions import PlatformLookupError, CylcError, NoHostsError
from cylc.flow.cfgspec.glbl_cfg import glbl_cfg
from cylc.flow.hostuserutil import is_remote_host

Expand All @@ -39,6 +40,11 @@
HOST_REC_COMMAND = re.compile(r'(`|\$\()\s*(.*)\s*([`)])$')
PLATFORM_REC_COMMAND = re.compile(r'(\$\()\s*(.*)\s*([)])$')

# Host selection strategies, keyed by the name of the selection method.
# Each callable receives the list of candidate hosts and returns one of
# them.
HOST_SELECTION_METHODS = {
    # Always take the first host in the list.
    'definition order': lambda hosts: hosts[0],
    # Pick a host from the list at random.
    'random': random.choice,
}


@overload
def get_platform(
Expand Down Expand Up @@ -382,35 +388,37 @@ def generic_items_match(
return True


def get_host_from_platform(platform, method='random'):
def get_host_from_platform(
platform: Dict[str, Any], bad_hosts: Optional[Set[str]] = None
) -> str:
"""Placeholder for a more sophisticated function which returns a host
given a platform dictionary.
Args:
platform (dict):
A dict representing a platform.
method (str):
Name a function to use when selecting hosts from list provided
by platform.
- 'random' (default): Pick a random host from list
- 'first': Return the first host in the list
platform: A dict representing a platform.
bad_hosts: A set of hosts Cylc knows to be unreachable.
Returns:
hostname (str):
TODO:
Other host selection methods:
- Random Selection with check for host availability
hostname: The name of a host.
"""
if method == 'random':
return random.choice(platform['hosts'])
elif method == 'first': # noqa: SIM106
return platform['hosts'][0]
# Get list of goodhosts:
if bad_hosts:
goodhosts = [i for i in platform['hosts'] if i not in bad_hosts]
else:
raise NotImplementedError(
f'method {method} is not a valid input for get_host_from_platform'
)
goodhosts = platform['hosts']

# Get the selection method
method = platform['selection']['method']
if not goodhosts:
raise NoHostsError(platform)
else:
if method not in HOST_SELECTION_METHODS:
raise CylcError(
f'method \"{method}\" is not a supported host '
'selection method.'
)
else:
return HOST_SELECTION_METHODS[method](goodhosts)


def fail_if_platform_and_host_conflict(task_conf, task_name):
Expand Down
33 changes: 21 additions & 12 deletions cylc/flow/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from subprocess import Popen, PIPE, DEVNULL
import sys
from time import sleep
from typing import Any, Dict, List, Tuple

import cylc.flow.flags
from cylc.flow import __version__ as CYLC_VERSION
Expand Down Expand Up @@ -162,20 +163,26 @@ def get_includes_to_rsync(rsync_includes=None):


def construct_rsync_over_ssh_cmd(
src_path, dst_path, platform, rsync_includes=None):
src_path: str, dst_path: str, platform: Dict[str, Any],
rsync_includes=None, bad_hosts=None
) -> Tuple[List[str], str]:
"""Constructs the rsync command used for remote file installation.
Includes as standard the directories: app, bin, etc, lib; and the server
key, used for ZMQ authentication.
Args:
src_path(string): source path
dst_path(string): path of target
platform(dict)): contains info relating to platform
rsync_includes(list): files and directories to be included in the rsync
src_path: source path
dst_path: path of target
platform: contains info relating to platform
rsync_includes: files and directories to be included in the rsync
Developer Warning:
The Cylc Subprocess Pool method ``rsync_255_fail`` relies on
``rsync_cmd[0] == 'rsync'``. Please check that changes to this function
do not break ``rsync_255_fail``.
"""
dst_host = get_host_from_platform(platform)
dst_host = get_host_from_platform(platform, bad_hosts=bad_hosts)
ssh_cmd = platform['ssh command']
rsync_cmd = [
"rsync",
Expand All @@ -202,10 +209,12 @@ def construct_rsync_over_ssh_cmd(
rsync_cmd.append("--exclude=*") # exclude everything else
rsync_cmd.append(f"{src_path}/")
rsync_cmd.append(f"{dst_host}:{dst_path}/")
return rsync_cmd
return rsync_cmd, dst_host


def construct_ssh_cmd(raw_cmd, platform, **kwargs):
def construct_ssh_cmd(
raw_cmd, platform, host, **kwargs
):
"""Build an SSH command for execution on a remote platform.
Constructs the SSH command according to the platform configuration.
Expand All @@ -214,7 +223,7 @@ def construct_ssh_cmd(raw_cmd, platform, **kwargs):
"""
return _construct_ssh_cmd(
raw_cmd,
host=get_host_from_platform(platform),
host=host,
ssh_cmd=platform['ssh command'],
remote_cylc_path=platform['cylc path'],
ssh_login_shell=platform['use login shell'],
Expand Down Expand Up @@ -331,7 +340,7 @@ def _construct_ssh_cmd(
return command


def remote_cylc_cmd(cmd, platform, **kwargs):
def remote_cylc_cmd(cmd, platform, bad_hosts=None, **kwargs):
"""Execute a Cylc command on a remote platform.
Uses the platform configuration to construct the command.
Expand All @@ -340,7 +349,7 @@ def remote_cylc_cmd(cmd, platform, **kwargs):
"""
return _remote_cylc_cmd(
cmd,
host=get_host_from_platform(platform),
host=get_host_from_platform(platform, bad_hosts=bad_hosts),
ssh_cmd=platform['ssh command'],
remote_cylc_path=platform['cylc path'],
ssh_login_shell=platform['use login shell'],
Expand Down
9 changes: 6 additions & 3 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from threading import Barrier
from time import sleep, time
import traceback
from typing import Iterable, NoReturn, Optional, List
from typing import Iterable, NoReturn, Optional, List, Set
from uuid import uuid4
import zmq
from zmq.auth.thread import ThreadAuthenticator
Expand Down Expand Up @@ -180,6 +180,7 @@ class Scheduler:
id: Optional[str] = None # noqa: A003 (instance attr not local)
uuid_str: Optional[str] = None
contact_data: Optional[dict] = None
bad_hosts: Optional[Set[str]] = None

# run options
is_restart: Optional[bool] = None
Expand Down Expand Up @@ -253,6 +254,7 @@ def __init__(self, reg, options):
self._profile_amounts = {}
self._profile_update_times = {}
self.pre_submit_tasks = []
self.bad_hosts: Set[str] = set()

self.restored_stop_task_id = None

Expand Down Expand Up @@ -343,7 +345,6 @@ async def initialise(self):
self, context=self.zmq_context, barrier=self.barrier)
self.publisher = WorkflowPublisher(
self.workflow, context=self.zmq_context, barrier=self.barrier)

self.proc_pool = SubProcPool()
self.command_queue = Queue()
self.message_queue = Queue()
Expand All @@ -368,6 +369,7 @@ async def initialise(self):
self.xtrigger_mgr,
self.data_store_mgr,
self.options.log_timestamp,
self.bad_hosts,
self.reset_inactivity_timer
)
self.task_events_mgr.uuid_str = self.uuid_str
Expand All @@ -377,7 +379,8 @@ async def initialise(self):
self.proc_pool,
self.workflow_db_mgr,
self.task_events_mgr,
self.data_store_mgr
self.data_store_mgr,
self.bad_hosts
)
self.task_job_mgr.task_remote_mgr.uuid_str = self.uuid_str

Expand Down
1 change: 1 addition & 0 deletions cylc/flow/scripts/cat_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,7 @@ def main(
cmd.append('--remote-arg=%s' % shlex.quote(batchview_cmd))
cmd.append(workflow_name)
is_edit_mode = (mode == 'edit')
# TODO: Add Intelligent Host selection to this
try:
proc = remote_cylc_cmd(
cmd,
Expand Down
12 changes: 10 additions & 2 deletions cylc/flow/scripts/check_versions.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
from cylc.flow.cylc_subproc import procopen, PIPE, DEVNULL
from cylc.flow import __version__ as CYLC_VERSION
from cylc.flow.config import WorkflowConfig
from cylc.flow.platforms import get_platform
from cylc.flow.platforms import get_platform, get_host_from_platform
from cylc.flow.remote import construct_ssh_cmd
from cylc.flow.workflow_files import parse_reg
from cylc.flow.templatevars import load_template_vars
Expand Down Expand Up @@ -91,7 +91,15 @@ def main(_, options: 'Values', reg: str) -> None:
versions = {}
for platform_name in sorted(platforms):
platform = get_platform(platform_name)
cmd = construct_ssh_cmd(['version'], platform)
host = get_host_from_platform(
platform,
bad_hosts=None
)
cmd = construct_ssh_cmd(
['version'],
platform,
host
)
if verbose:
print(cmd)
proc = procopen(cmd, stdin=DEVNULL, stdout=PIPE, stderr=PIPE)
Expand Down
Loading

0 comments on commit fd1d6cc

Please sign in to comment.