Skip to content

Commit

Permalink
Merge branch 'master' into centralised-reg-parse
Browse files Browse the repository at this point in the history
  • Loading branch information
MetRonnie committed Aug 12, 2021
2 parents d1d44c9 + 4686ba6 commit 32f8f64
Show file tree
Hide file tree
Showing 19 changed files with 458 additions and 185 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/bash.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ jobs:
run: |
docker cp bash:/root/cylc-run .
- name: Upload
- name: Upload artifact
if: failure()
uses: actions/upload-artifact@v2
with:
Expand Down
18 changes: 11 additions & 7 deletions .github/workflows/test_fast.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ jobs:
steps:
- name: Checkout
uses: actions/checkout@v2
with:
fetch-depth: 2 # required by codecov

- name: Configure Python
uses: actions/setup-python@v2
Expand All @@ -49,17 +47,17 @@ jobs:
- name: Install
run: |
pip install ."[all]"
pip install -e ."[all]"
- name: Configure git # Needed by the odd test
uses: cylc/release-actions/configure-git@v1

- name: style
- name: Style
run: |
flake8
etc/bin/shellchecker
- name: typing
- name: Typing
run: mypy

- name: Doctests
Expand All @@ -74,7 +72,13 @@ jobs:
run: |
pytest --cov --cov-append -n 5 tests/integration
- name: Coverage
- name: Coverage report
run: |
coverage xml
coverage report
bash <(curl -s https://codecov.io/bash)
- name: Codecov upload
uses: codecov/codecov-action@v2
with:
name: '"${{ github.workflow }} ${{ matrix.os }} py-${{ matrix.python-version }}"'
fail_ci_if_error: true
14 changes: 10 additions & 4 deletions .github/workflows/test_functional.yml
Original file line number Diff line number Diff line change
Expand Up @@ -221,15 +221,15 @@ jobs:
find "${TMPDIR:-/tmp}/${USER}/cylctb-"* -type f \
-exec echo '====== {} ======' \; -exec cat '{}' \;
- name: Set Upload Name
- name: Set artifact upload name
if: failure() && steps.test.outcome == 'failure'
id: uploadname
run: |
# artifact name cannot contain '/' characters
CID="$(sed 's|/|-|g' <<< "${{ matrix.name || matrix.chunk }}")"
echo "::set-output name=uploadname::$CID"
- name: Upload
- name: Upload artifact
if: failure() && steps.test.outcome == 'failure'
uses: actions/upload-artifact@v2
with:
Expand Down Expand Up @@ -262,8 +262,14 @@ jobs:
run: |
etc/bin/swarm kill
- name: Coverage
- name: Combine coverage & report
run: |
coverage combine -a
coverage xml
coverage report
bash <(curl -s https://codecov.io/bash)
- name: Codecov upload
uses: codecov/codecov-action@v2
with:
name: '"${{ github.workflow }} ${{ matrix.name }} ${{ matrix.chunk }}"'
fail_ci_if_error: true
9 changes: 8 additions & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,21 @@ erroneous use of both `expr => bar` and `expr => !bar` in the same graph.
infers the latest numbered run of the workflow for most commands (e.g. you can
run `cylc pause foo` instead of having to type out `foo/run3`).

[#4346](https://github.com/cylc/cylc-flow/pull/4346) -
Use natural sort order for the `cylc scan --sort` option.

### Fixes

[#4310](https://github.com/cylc/cylc-flow/pull/4310) -
[#4341](https://github.com/cylc/cylc-flow/pull/4341) -
Remove obsolete Cylc 7 `[scheduling]spawn to max active cycle points` config.

[#4319](https://github.com/cylc/cylc-flow/pull/4319) -
Update `cylc reinstall` to skip the `work` and `share` directories.

[#4289](https://github.com/cylc/cylc-flow/pull/4289) - Make `cylc clean`
safer by preventing cleaning of dirs that contain more than one workflow
run dir (use `--force` to override this safeguard).

-------------------------------------------------------------------------------
## __cylc-8.0b2 (<span actions:bind='release-date'>Released 2021-07-28</span>)__

Expand Down
5 changes: 3 additions & 2 deletions cylc/flow/async_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import asyncio
from functools import partial
from pathlib import Path
from typing import List, Union

import pyuv

Expand Down Expand Up @@ -401,9 +402,9 @@ def _scandir(future, path, request):
])


async def scandir(path):
async def scandir(path: Union[Path, str]) -> List[Path]:
"""Asynchronous directory listing using pyuv."""
ret = asyncio.Future()
ret: asyncio.Future[List[Path]] = asyncio.Future()

loop = pyuv.Loop.default_loop()
pyuv.fs.scandir(loop, str(path), callback=partial(_scandir, ret, path))
Expand Down
11 changes: 8 additions & 3 deletions cylc/flow/loggingutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,21 @@ class CylcLogFormatter(logging.Formatter):
# daemonise script (url, pid) are not wrapped
MAX_WIDTH = 999

def __init__(self, timestamp=True, color=False, max_width=None):
def __init__(
self, timestamp=True, color=False, max_width=None, dev_info=False
):
self.timestamp = None
self.color = None
self.max_width = self.MAX_WIDTH
self.wrapper = None
self.configure(timestamp, color, max_width)
# You may find adding %(filename)s %(lineno)d are useful when debugging
prefix = '%(asctime)s %(levelname)-2s - '
if dev_info is True:
prefix += '[%(module)s:%(lineno)d] - '

logging.Formatter.__init__(
self,
'%(asctime)s %(levelname)-2s - %(message)s',
prefix + '%(message)s',
'%Y-%m-%dT%H:%M:%S%Z')

def configure(self, timestamp=None, color=None, max_width=None):
Expand Down
63 changes: 33 additions & 30 deletions cylc/flow/network/scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
import asyncio
from pathlib import Path
import re
from typing import AsyncGenerator, Iterable
from typing import AsyncGenerator, Dict, Iterable, List, Optional, Tuple, Union

from pkg_resources import (
parse_requirements,
Expand Down Expand Up @@ -85,7 +85,7 @@
WorkflowFiles.Service.DIRNAME,
WorkflowFiles.SUITE_RC, # cylc7 flow definition file name
WorkflowFiles.FLOW_FILE, # cylc8 flow definition file name
'log'
WorkflowFiles.LOG_DIR
}

EXCLUDE_FILES = {
Expand All @@ -94,7 +94,7 @@
}


def dir_is_flow(listing):
def dir_is_flow(listing: Iterable[Path]) -> bool:
"""Return True if a Path contains a flow at the top level.
Args:
Expand All @@ -106,11 +106,8 @@ def dir_is_flow(listing):
bool - True if the listing indicates that this is a flow directory.
"""
listing = {
path.name
for path in listing
}
return bool(FLOW_FILES & listing)
names = {path.name for path in listing}
return bool(FLOW_FILES & names)


@pipe
Expand Down Expand Up @@ -138,20 +135,24 @@ async def scan_multi(


@pipe
async def scan(run_dir=None, scan_dir=None, max_depth=MAX_SCAN_DEPTH):
async def scan(
run_dir: Optional[Path] = None,
scan_dir: Optional[Path] = None,
max_depth: int = MAX_SCAN_DEPTH
) -> AsyncGenerator[Dict[str, Union[str, Path]], None]:
"""List flows installed on the filesystem.
Args:
run_dir (pathlib.Path):
run_dir:
The run dir to look for workflows in, defaults to ~/cylc-run.
All workflow registrations will be given relative to this path.
scan_dir(pathlib.Path):
scan_dir:
The directory to scan for workflows in.
Use in combination with run_dir if you want to scan a subdir
within the run_dir.
max_depth (int):
max_depth:
The maximum number of levels to descend before bailing.
* ``max_depth=1`` will pick up top-level workflows (e.g. ``foo``).
Expand All @@ -161,26 +162,35 @@ async def scan(run_dir=None, scan_dir=None, max_depth=MAX_SCAN_DEPTH):
dict - Dictionary containing information about the flow.
"""
cylc_run_dir = Path(get_cylc_run_dir())
if not run_dir:
run_dir = Path(get_cylc_run_dir())
run_dir = cylc_run_dir
if not scan_dir:
scan_dir = run_dir

running = []
running: List[asyncio.tasks.Task] = []

# wrapper for scandir to preserve context
async def _scandir(path, depth):
async def _scandir(path: Path, depth: int) -> Tuple[Path, int, List[Path]]:
contents = await scandir(path)
return path, depth, contents

# perform the first directory listing
for subdir in await scandir(scan_dir):
if subdir.is_dir():
running.append(
asyncio.create_task(
_scandir(subdir, 1)
def _scan_subdirs(listing: List[Path], depth: int) -> None:
for subdir in listing:
if subdir.is_dir() and subdir.stem not in EXCLUDE_FILES:
running.append(
asyncio.create_task(
_scandir(subdir, depth + 1)
)
)
)

# perform the first directory listing
scan_dir_listing = await scandir(scan_dir)
if scan_dir != cylc_run_dir and dir_is_flow(scan_dir_listing):
# If the scan_dir itself is a workflow run dir, yield nothing
return

_scan_subdirs(scan_dir_listing, depth=0)

# perform all further directory listings
while running:
Expand All @@ -200,14 +210,7 @@ async def _scandir(path, depth):
}
elif depth < max_depth:
# we may have a nested flow, lets see...
for subdir in contents:
if (subdir.is_dir()
and subdir.stem not in EXCLUDE_FILES):
running.append(
asyncio.create_task(
_scandir(subdir, depth + 1)
)
)
_scan_subdirs(contents, depth)
# don't allow this to become blocking
await asyncio.sleep(0)

Expand Down
4 changes: 3 additions & 1 deletion cylc/flow/option_parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,9 @@ def parse_args(self, api_args, remove_opts=None):
LOG.removeHandler(LOG.handlers[0])
errhandler = logging.StreamHandler(sys.stderr)
errhandler.setFormatter(CylcLogFormatter(
timestamp=options.log_timestamp))
timestamp=options.log_timestamp,
dev_info=bool(options.verbosity > 2)
))
LOG.addHandler(errhandler)

return (options, args)
Expand Down
34 changes: 34 additions & 0 deletions cylc/flow/pathutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,40 @@ def remove_dir_or_file(path: Union[Path, str]) -> None:
rmtree(path, onerror=handle_rmtree_err)


def remove_empty_parents(
    path: Union[Path, str], tail: Union[Path, str]
) -> None:
    """Climb up the tail of path, deleting each parent dir that is empty.

    Stops at the first parent that cannot be removed (non-empty or
    otherwise un-removable).

    Args:
        path: Absolute path to the directory, e.g. /foo/bar/a/b/c
        tail: The tail of the path to work our way up, e.g. a/b/c

    Raises:
        ValueError: If path is not absolute, tail is absolute, or path
            does not end with tail.

    Example:
        remove_empty_parents('/foo/bar/a/b/c', 'a/b/c') would remove
        /foo/bar/a/b (assuming it's empty), then /foo/bar/a (assuming
        it's empty).
    """
    abs_path = Path(path)
    if not abs_path.is_absolute():
        raise ValueError('path must be absolute')
    rel_tail = Path(tail)
    if rel_tail.is_absolute():
        raise ValueError('tail must not be an absolute path')
    if not str(abs_path).endswith(str(rel_tail)):
        raise ValueError(f"path '{abs_path}' does not end with '{rel_tail}'")
    # Only the parents covered by the tail are candidates for removal
    # (the final component of the tail is path itself, hence the -1).
    n_removable = len(rel_tail.parts) - 1
    for ancestor in list(abs_path.parents)[:n_removable]:
        if not ancestor.is_dir():
            # Nothing to remove at this level; try the next one up.
            continue
        try:
            ancestor.rmdir()
            LOG.debug(f'Removing directory: {ancestor}')
        except OSError:
            # Non-empty (or otherwise un-removable) - stop climbing.
            break


def get_next_rundir_number(run_path):
"""Return the new run number"""
run_n_path = os.path.expanduser(os.path.join(run_path, "runN"))
Expand Down
19 changes: 14 additions & 5 deletions cylc/flow/scripts/clean.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ def get_option_parser():

parser.add_option(
'--rm', metavar='DIR[:DIR:...]',
help="Only clean the specified subdirectories (or files) in the "
"run directory, rather than the whole run directory. "
"Accepts quoted globs.",
help=("Only clean the specified subdirectories (or files) in the "
"run directory, rather than the whole run directory. "
"Accepts quoted globs."),
action='append', dest='rm_dirs', default=[]
)

Expand All @@ -94,10 +94,19 @@ def get_option_parser():
action='store_true', dest='remote_only'
)

parser.add_option(
'--force',
help=("Allow cleaning of directories that contain workflow run dirs "
"(e.g. 'cylc clean foo' when foo contains run1, run2 etc.). "
"Warning: this might lead to remote installations and "
"symlink dir targets not getting removed."),
action='store_true', dest='force'
)

parser.add_option(
'--timeout',
help="The number of seconds to wait for cleaning to take place on "
"remote hosts before cancelling.",
help=("The number of seconds to wait for cleaning to take place on "
"remote hosts before cancelling."),
action='store', default='120', dest='remote_timeout'
)

Expand Down
3 changes: 1 addition & 2 deletions cylc/flow/scripts/report_timings.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ def main(parser: COP, options: 'Values', workflow: str) -> None:

run_db = _get_dao(workflow)
row_buf = format_rows(*run_db.select_task_times())

with smart_open(options.output_filename) as output:
if options.show_raw:
output.write(row_buf.getvalue())
Expand Down Expand Up @@ -218,7 +217,7 @@ def write_summary(self, buf=None):
if buf is None:
buf = sys.stdout
self.write_summary_header(buf)
for group, df in self.by_host_and_runner:
for group, df in self.by_host_and_job_runner:
self.write_group_header(buf, group)
df_reshape = self._reshape_timings(df)
df_describe = df.groupby(level='name').describe()
Expand Down
Loading

0 comments on commit 32f8f64

Please sign in to comment.