Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ENH] Add function to enable smart path search #1098 #1103

Merged
merged 22 commits into from
Nov 22, 2022
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
261 changes: 212 additions & 49 deletions mne_bids/path.py
Original file line number Diff line number Diff line change
Expand Up @@ -789,14 +789,16 @@ def update(self, *, check=None, **kwargs):
raise e
return self

def match(self, check=False):
def match(self, ignore_json=True, check=False):
"""Get a list of all matching paths in the root directory.

Performs a recursive search, starting in ``.root`` (if set), based on
`BIDSPath.entities` object. Ignores ``.json`` files.

Parameters
----------
ignore_json : bool
If ``True``, ignores json files. Defaults to ``True``.
check : bool
If ``True``, only returns paths that conform to BIDS. If ``False``
(default), the ``.check`` attribute of the returned
Expand All @@ -814,45 +816,14 @@ def match(self, check=False):
'BIDS root directory path to `root` via '
'BIDSPath.update().')

# allow searching by datatype
# all other entities are filtered below
if self.datatype is not None:
search_str = f'*/{self.datatype}/*'
else:
search_str = '*.*'
paths = _return_root_paths(self.root, datatype=self.datatype,
ignore_json=ignore_json)

paths = self.root.rglob(search_str)
# Only keep files (not directories), and omit the JSON sidecars.
paths = [p for p in paths
if p.is_file() and p.suffix != '.json']
fnames = _filter_fnames(paths, suffix=self.suffix,
extension=self.extension,
**self.entities)

bids_paths = []
for fname in fnames:
# Form the BIDSPath object.
# To check whether the BIDSPath is conforming to BIDS if
# check=True, we first instantiate without checking and then run
# the check manually, allowing us to be more specific about the
# exception to catch
datatype = _infer_datatype_from_path(fname)
bids_path = get_bids_path_from_fname(fname, check=False)
bids_path.root = self.root
bids_path.datatype = datatype
bids_path.check = True

try:
bids_path._check()
except ValueError:
# path is not BIDS-compatible
if check: # skip!
continue
else:
bids_path.check = False

bids_paths.append(bids_path)

bids_paths = _fnames_to_bidspaths(fnames, self.root, check=check)
return bids_paths

def _check(self):
Expand Down Expand Up @@ -1935,6 +1906,8 @@ def _filter_fnames(fnames, *, subject=None, session=None, task=None,
extension=None):
"""Filter a list of BIDS filenames / paths based on BIDS entity values.

Input can be str or list of str.

Parameters
----------
fnames : iterable of pathlib.Path | iterable of str
Expand All @@ -1944,25 +1917,48 @@ def _filter_fnames(fnames, *, subject=None, session=None, task=None,
list of pathlib.Path

"""
subject = _ensure_tuple(subject)
session = _ensure_tuple(session)
task = _ensure_tuple(task)
acquisition = _ensure_tuple(acquisition)
run = _ensure_tuple(run)
processing = _ensure_tuple(processing)
space = _ensure_tuple(space)
recording = _ensure_tuple(recording)
split = _ensure_tuple(split)
description = _ensure_tuple(description)
suffixe = _ensure_tuple(suffix)
extension = _ensure_tuple(extension)

leading_path_str = r'.*\/?' # nothing or something ending with a `/`
sub_str = f'sub-{subject}' if subject else r'sub-([^_]+)'
ses_str = f'_ses-{session}' if session else r'(|_ses-([^_]+))'
task_str = f'_task-{task}' if task else r'(|_task-([^_]+))'
acq_str = f'_acq-{acquisition}' if acquisition else r'(|_acq-([^_]+))'
run_str = f'_run-{run}' if run else r'(|_run-([^_]+))'
proc_str = f'_proc-{processing}' if processing else r'(|_proc-([^_]+))'
rec_str = f'_rec-{recording}' if recording else r'(|_rec-([^_]+))'
space_str = f'_space-{space}' if space else r'(|_space-([^_]+))'
split_str = f'_split-{split}' if split else r'(|_split-([^_]+))'
desc_str = f'_desc-{description}' if description else r'(|_desc-([^_]+))'
suffix_str = (f'_{suffix}' if suffix
else r'_(' + '|'.join(ALLOWED_FILENAME_SUFFIX) + ')')
ext_str = extension if extension else r'.([^_]+)'
sub_str = (r'sub-(' + '|'.join(subject) + ')'
if subject else r'sub-([^_]+)')
ses_str = (r'_ses-(' + '|'.join(session) + ')'
if session else r'(|_ses-([^_]+))')
task_str = (r'_task-(' + '|'.join(task) + ')'
if task else r'(|_task-([^_]+))')
acq_str = (r'_acq-(' + '|'.join(acquisition) + ')'
if acquisition else r'(|_acq-([^_]+))')
run_str = (r'_run-(' + '|'.join(run) + ')'
if run else r'(|_run-([^_]+))')
proc_str = (r'_proc-(' + '|'.join(processing) + ')'
if processing else r'(|_proc-([^_]+))')
space_str = (r'_space-(' + '|'.join(space) + ')'
if space else r'(|_space-([^_]+))')
rec_str = (r'_rec-(' + '|'.join(recording) + ')'
if recording else r'(|_rec-([^_]+))')
split_str = (r'_split-(' + '|'.join(split) + ')'
if split else r'(|_split-([^_]+))')
desc_str = (r'_desc-(' + '|'.join(description) + ')'
if description else r'(|_desc-([^_]+))')
suffix_str = (r'_(' + '|'.join(suffixe) + ')' if suffix
else (r'_(' + '|'.join(ALLOWED_FILENAME_SUFFIX) + ')'))
ext_str = r'(' + '|'.join(extension) + ')' if extension else r'.([^_]+)'

regexp = (
leading_path_str +
sub_str + ses_str + task_str + acq_str + run_str + proc_str +
rec_str + space_str + split_str + desc_str + suffix_str + ext_str
Copy link
Contributor Author

@moritz-gerster moritz-gerster Nov 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here I fixed a small bug. "space" must be searched before "recording", according to their order in the basename.

space_str + rec_str + split_str + desc_str + suffix_str + ext_str
)

# Convert to str so we can apply the regexp ...
Expand All @@ -1974,3 +1970,170 @@ def _filter_fnames(fnames, *, subject=None, session=None, task=None,
# ... and return Paths.
fnames_filtered = [Path(f) for f in fnames_filtered]
return fnames_filtered


def find_matching_paths(root, subjects=None, sessions=None, tasks=None,
acquisitions=None, runs=None, processings=None,
recordings=None, spaces=None, splits=None,
descriptions=None, suffixes=None, extensions=None,
datatypes=None, check=True):
"""Get list of all matching paths for all matching entity values.

Input can be str or list of str. None matches all found values.

Performs a recursive search, starting in ``.root`` (if set), based on
`BIDSPath.entities` object.

Parameters
----------
root : pathlib.Path | str
The root of the BIDS path.
subjects : str | array-like of str | None
The subject ID. Corresponds to "sub".
sessions : str | array-like of str | None
The acquisition session. Corresponds to "ses".
tasks : str | array-like of str | None
The experimental task. Corresponds to "task".
acquisitions: str | array-like of str | None
The acquisition parameters. Corresponds to "acq".
runs : str | array-like of str | None
The run number. Corresponds to "run".
processings : str | array-like of str | None
The processing label. Corresponds to "proc".
recordings : str | array-like of str | None
The recording name. Corresponds to "rec".
spaces : str | array-like of str | None
The coordinate space for anatomical and sensor location
files (e.g., ``*_electrodes.tsv``, ``*_markers.mrk``).
Corresponds to "space".
Note that valid values for ``space`` must come from a list
of BIDS keywords as described in the BIDS specification.
splits : str | array-like of str | None
The split of the continuous recording file for ``.fif`` data.
Corresponds to "split".
descriptions : str | array-like of str | None
This corresponds to the BIDS entity ``desc``. It is used to provide
additional information for derivative data, e.g., preprocessed data
may be assigned ``description='cleaned'``.

.. versionadded:: 0.11
suffixes : str | array-like of str | None
The filename suffix. This is the entity after the
last ``_`` before the extension. E.g., ``'channels'``.
The following filename suffix's are accepted:
'meg', 'markers', 'eeg', 'ieeg', 'T1w',
'participants', 'scans', 'electrodes', 'coordsystem',
'channels', 'events', 'headshape', 'digitizer',
'beh', 'physio', 'stim'
extensions : str | array-like of str | None
The extension of the filename. E.g., ``'.json'``.
datatypes : str | array-like of str | None
The BIDS data type, e.g., ``'anat'``, ``'func'``, ``'eeg'``, ``'meg'``,
``'ieeg'``.
check : bool
If ``True``, only returns paths that conform to BIDS. If ``False``
(default), the ``.check`` attribute of the returned
`mne_bids.BIDSPath` object will be set to ``True`` for paths that
do conform to BIDS, and to ``False`` for those that don't.

Returns
-------
bids_paths : list of mne_bids.BIDSPath
The matching paths.

"""
fpaths = _return_root_paths(root, datatype=datatypes, ignore_json=False)

fpaths_filtered = _filter_fnames(fpaths,
subject=subjects,
session=sessions,
task=tasks,
acquisition=acquisitions,
run=runs,
processing=processings,
recording=recordings,
space=spaces,
split=splits,
description=descriptions,
suffix=suffixes,
extension=extensions)

bids_paths = _fnames_to_bidspaths(fpaths_filtered, root, check=check)
return bids_paths


def _return_root_paths(root, datatype=None, ignore_json=True):
"""Return all paths in root.

Can be filtered by datatype (which is present in the path but not in
the BIDSPath basename). Can also be list of datatypes.
root : pathlib.Path | str
The root of the BIDS path.
datatype : str | array-like of str | None
The BIDS data type, e.g., ``'anat'``, ``'func'``, ``'eeg'``, ``'meg'``,
``'ieeg'``.
"""
root = Path(root) # if root is str

if datatype is not None:
datatype = _ensure_tuple(datatype)
search_str = f'*/{"|".join(datatype)}/*'
else:
search_str = '*.*'

paths = root.rglob(search_str)
# Only keep files (not directories), and omit the JSON sidecars
# if ignore_json is True.
if ignore_json:
paths = [p for p in paths
if p.is_file() and p.suffix != '.json']
else:
paths = [p for p in paths if p.is_file()]

return paths


def _fnames_to_bidspaths(fnames, root, check=False):
"""Make BIDSPaths from file names.

To check whether the BIDSPath is conforming to BIDS if check=True, we
first instantiate without checking and then run the check manually,
allowing us to be more specific about the exception to catch.

Parameters
----------
fnames : list of str
Filenames as list of strings.
root : path-like | None
The root directory of the BIDS dataset.
check : bool
If ``True``, only returns paths that conform to BIDS. If ``False``
(default), the ``.check`` attribute of the returned
`mne_bids.BIDSPath` object will be set to ``True`` for paths that
do conform to BIDS, and to ``False`` for those that don't.

Returns
-------
bids_paths : list of mne_bids.BIDSPath
Bids paths.

"""
bids_paths = []
for fname in fnames:
datatype = _infer_datatype_from_path(fname)
bids_path = get_bids_path_from_fname(fname, check=False)
bids_path.root = root
bids_path.datatype = datatype
bids_path.check = True

try:
bids_path._check()
except ValueError:
# path is not BIDS-compatible
if check: # skip!
continue
else:
bids_path.check = False

bids_paths.append(bids_path)
return bids_paths
66 changes: 64 additions & 2 deletions mne_bids/tests/test_path.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from mne_bids.path import (_parse_ext, get_entities_from_fname,
_find_best_candidates,
_filter_fnames, search_folder_for_text,
get_bids_path_from_fname)
get_bids_path_from_fname, find_matching_paths)
from mne_bids.config import ALLOWED_PATH_ENTITIES_SHORT

from test_read import _read_raw_fif, warning_str
Expand Down Expand Up @@ -742,7 +742,8 @@ def test_make_filenames():
(dict(suffix='meg'), 4),
(dict(acquisition='lowres'), 1),
(dict(task='test', processing='ica', suffix='eeg'), 2),
(dict(subject='5', task='test', processing='ica', suffix='eeg'), 1)
(dict(subject='5', task='test', processing='ica', suffix='eeg'), 1),
(dict(subject=['01', '02']), 3), # test multiple input
])
def test_filter_fnames(entities, expected_n_matches):
"""Test filtering filenames based on BIDS entities works."""
Expand Down Expand Up @@ -851,6 +852,67 @@ def test_match(return_bids_test_dir):
assert bids_path_01.match(check=False)[0].fpath.name == 'sub-01_foo.eeg'


@testing.requires_testing_data
def test_find_matching_paths(return_bids_test_dir):
"""We test by yielding the same results as BIDSPath.match() which
is extensively tested above."""
bids_root = Path(return_bids_test_dir)

# Check a few exemplary entities
bids_path_01 = BIDSPath(root=bids_root)
paths_match = bids_path_01.match(ignore_json=False)
paths_find = find_matching_paths(bids_root)
assert paths_match == paths_find

# Datatype is important because handled differently
bids_path_01 = BIDSPath(root=bids_root, datatype="meg")
paths_match = bids_path_01.match(ignore_json=False)
paths_find = find_matching_paths(bids_root, datatypes="meg")
assert paths_match == paths_find

bids_path_01 = BIDSPath(root=bids_root, run="02")
paths_match = bids_path_01.match(ignore_json=False)
paths_find = find_matching_paths(bids_root, runs="02")
assert paths_match == paths_find

# Check list of str as input
bids_path_01 = BIDSPath(root=bids_root, extension=".tsv")
bids_path_02 = BIDSPath(root=bids_root, extension=".json")
paths_match1 = bids_path_01.match(ignore_json=False)
paths_match2 = bids_path_02.match(ignore_json=False)
paths_match = paths_match1 + paths_match2
paths_match = sorted([str(f.fpath) for f in paths_match])
paths_find = find_matching_paths(bids_root, extensions=[".tsv", ".json"])
paths_find = sorted([str(f.fpath) for f in paths_find])
assert paths_match == paths_find

# Test ignore_json parameter
bids_path_01 = BIDSPath(root=bids_root)
paths_match = bids_path_01.match(ignore_json=True)
paths_find = find_matching_paths(bids_root,
extensions=[".tsv", ".fif", ".dat"])
assert paths_match == paths_find

# Test `check` parameter
bids_path_01 = _bids_path.copy()
bids_path_01.update(
root=bids_root, session=None, task=None, run=None,
suffix='foo', extension='.eeg', check=False
)
bids_path_01.fpath.touch()
paths_match = bids_path_01.match(check=True)
paths_find = find_matching_paths(bids_root, sessions=None, tasks=None,
runs=None, suffixes='foo',
extensions='.eeg', check=True)
assert paths_match == paths_find

paths_match = bids_path_01.match(check=False)
paths_find = find_matching_paths(bids_root, sessions=None, tasks=None,
runs=None, suffixes='foo',
extensions='.eeg', check=False)
assert paths_match == paths_find


@pytest.mark.filterwarnings(warning_str['meas_date_set_to_none'])
@pytest.mark.filterwarnings(warning_str['channel_unit_changed'])
@testing.requires_testing_data
Expand Down