Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Raise exception if loading merged schemas with duplicate tags #803

Merged
merged 1 commit into from
Nov 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions hed/errors/error_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def val_error_invalid_char(source_string, char_index):
@hed_tag_error(ValidationErrors.INVALID_TAG_CHARACTER, has_sub_tag=True,
actual_code=ValidationErrors.CHARACTER_INVALID)
def val_error_invalid_tag_character(tag, problem_tag):
return f"Invalid character '{problem_tag}' in {tag}"
return f"Invalid character '{problem_tag}' in tag '{tag}'"


@hed_error(ValidationErrors.TILDES_UNSUPPORTED)
Expand All @@ -48,6 +48,12 @@ def val_error_tildes_not_supported(source_string, char_index):
return f"Tildes not supported. Replace (a ~ b ~ c) with (a, (b, c)). '{character}' at index {char_index}'"


@hed_tag_error(ValidationErrors.CURLY_BRACE_UNSUPPORTED_HERE, has_sub_tag=True,
actual_code=SidecarErrors.SIDECAR_BRACES_INVALID)
def val_error_CURLY_BRACE_UNSUPPORTED_HERE(tag, problem_tag):
return (f"Curly braces are only permitted in sidecars, fully wrapping text in place of a tag. "
f"Invalid character '{problem_tag}' in tag '{tag}'")

@hed_error(ValidationErrors.COMMA_MISSING)
def val_error_comma_missing(tag):
return f"Comma missing after - '{tag}'"
Expand Down Expand Up @@ -227,7 +233,6 @@ def val_warning_capitalization(tag):

@hed_tag_error(ValidationErrors.UNITS_MISSING, default_severity=ErrorSeverity.WARNING)
def val_warning_default_units_used(tag, default_unit):
# todo: add a test case for no default unit.
if default_unit is None:
return f"No unit specified on - '{tag}'. No default unit is specified for type."
return f"No unit specified. Using '{default_unit}' as the default - '{tag}'"
Expand Down Expand Up @@ -383,7 +388,7 @@ def onset_wrong_placeholder(tag, has_placeholder):

@hed_error(ColumnErrors.INVALID_COLUMN_REF, actual_code=SidecarErrors.SIDECAR_BRACES_INVALID)
def invalid_column_ref(bad_ref):
return f"The column '{bad_ref}' is unknown.'"
return f"The column '{bad_ref}' is unknown or does not have HED annotations.'"


@hed_error(ColumnErrors.SELF_COLUMN_REF, actual_code=SidecarErrors.SIDECAR_BRACES_INVALID)
Expand Down
2 changes: 2 additions & 0 deletions hed/errors/error_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ class ValidationErrors:

INVALID_TAG_CHARACTER = 'invalidTagCharacter'

CURLY_BRACE_UNSUPPORTED_HERE = "CURLY_BRACE_UNSUPPORTED_HERE"



class SidecarErrors:
Expand Down
2 changes: 2 additions & 0 deletions hed/errors/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ class HedExceptions:
SCHEMA_DUPLICATE_LIBRARY = "SCHEMA_LIBRARY_INVALID"
BAD_COLUMN_NAMES = 'BAD_COLUMN_NAMES'

SCHEMA_DUPLICATE_NAMES = "SCHEMA_DUPLICATE_NAMES"


class HedFileError(Exception):
"""Exception raised when a file cannot be parsed due to being malformed, file IO, etc."""
Expand Down
14 changes: 11 additions & 3 deletions hed/schema/hed_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ def __init__(self):
A HedSchema can be used for validation, checking tag attributes, parsing tags, etc.
"""
super().__init__()
self._has_duplicate_tags = False
self.header_attributes = {}
self.filename = None
self.prologue = ""
Expand Down Expand Up @@ -305,7 +304,7 @@ def __eq__(self, other):
return False
if self.get_save_header_attributes() != other.get_save_header_attributes():
return False
if self._has_duplicate_tags != other._has_duplicate_tags:
if self.has_duplicates() != other.has_duplicates():
return False
if self.prologue != other.prologue:
return False
Expand Down Expand Up @@ -527,12 +526,21 @@ def _validate_remaining_terms(self, tag, working_tag, prefix_tag_adj, current_sl
raise self._TagIdentifyError(error)
word_start_index += len(name) + 1

def has_duplicates(self):
"""Returns the first duplicate tag/unit/etc if any section has a duplicate name"""
for section in self._sections.values():
has_duplicates = bool(section.duplicate_names)
if has_duplicates:
# Return first entry of dict
return next(iter(section.duplicate_names))

return False

# ===============================================
# Semi-private creation finalizing functions
# ===============================================
def finalize_dictionaries(self):
""" Call to finish loading. """
self._has_duplicate_tags = bool(self.tags.duplicate_names)
self._update_all_entries()

def _update_all_entries(self):
Expand Down
15 changes: 15 additions & 0 deletions hed/schema/hed_schema_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ def _load_schema_version(xml_version=None, xml_folder=None):
:raises HedFileError:
- The xml_version is not valid.
- The specified version cannot be found or loaded
- Multiple schemas are being loaded with the same prefix, and they have duplicate tags
- Other fatal errors loading the schema (These are unlikely if you are not editing them locally)
- The prefix is invalid
"""
Expand All @@ -144,8 +145,22 @@ def _load_schema_version(xml_version=None, xml_folder=None):
xml_versions = [""]

first_schema = _load_schema_version_sub(schema_namespace, xml_versions[0], xml_folder=xml_folder)
filenames = [os.path.basename(first_schema.filename)]
for version in xml_versions[1:]:
_load_schema_version_sub(schema_namespace, version, xml_folder=xml_folder, schema=first_schema)

# Detect duplicate errors when merging schemas in the same namespace
current_filename = os.path.basename(first_schema.filename)
duplicate_name = first_schema.has_duplicates()
if duplicate_name:
issues = first_schema.check_compliance(check_for_warnings=False)
filename_string = ",".join(filenames)
msg = (f"A duplicate tag, '{duplicate_name}', was detected in the schema file '{current_filename}'. "
f"Previously loaded schemas include: {filename_string}. "
f"To resolve this, consider prefixing the final schema during load: "
f"custom_prefix:schema_version.")
raise HedFileError(HedExceptions.SCHEMA_DUPLICATE_NAMES, msg, first_schema.filename, issues)
filenames.append(current_filename)
return first_schema


Expand Down
58 changes: 52 additions & 6 deletions hed/validator/tag_util/class_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

from hed.errors.error_reporter import ErrorHandler
from hed.errors.error_types import ValidationErrors
from hed.schema.hed_schema_constants import HedKey
import functools


class UnitValueValidator:
Expand All @@ -15,6 +17,7 @@ class UnitValueValidator:

DIGIT_OR_POUND_EXPRESSION = r'^(-?[\d.]+(?:e-?\d+)?|#)$'

VALUE_CLASS_ALLOWED_CACHE=20
def __init__(self, value_validators=None):
""" Validates the unit and value classes on a given tag.

Expand Down Expand Up @@ -80,16 +83,59 @@ def check_tag_value_class_valid(self, original_tag, report_as=None, error_code=N
"""
return self._check_value_class(original_tag, original_tag.extension, report_as, error_code)

# char_sets = {
# "letters": set("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"),
# "blank": set(" "),
# "digits": set("0123456789"),
# "alphanumeric": set("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789")
# }
#
# @functools.lru_cache(maxsize=VALUE_CLASS_ALLOWED_CACHE)
# def _get_allowed_characters(self, value_classes):
# # This could be pre-computed
# character_set = set()
# for value_class in value_classes:
# allowed_types = value_class.attributes.get(HedKey.AllowedCharacter, "")
# for single_type in allowed_types.split(","):
# if single_type in self.char_sets:
# character_set.update(self.char_sets[single_type])
# else:
# character_set.add(single_type)
# return character_set

def _get_problem_indexes(self, original_tag, stripped_value):
# Extra +1 for the slash
start_index = original_tag.extension.find(stripped_value) + len(original_tag.org_base_tag) + 1
if start_index == -1:
return []

problem_indexes = [(char, index + start_index) for index, char in enumerate(stripped_value) if char in "{}"]
return problem_indexes
# Partial implementation of allowedCharacter
# allowed_characters = self._get_allowed_characters(original_tag.value_classes.values())
# if allowed_characters:
# # Only test the strippedvalue - otherwise numericClass + unitClass won't validate reasonably.
# indexes = [index for index, char in enumerate(stripped_value) if char not in allowed_characters]
# pass

def _check_value_class(self, original_tag, stripped_value, report_as, error_code=None):
"""Returns any issues found if this is a value tag"""
# todo: This function needs to check for allowed characters, not just {}
validation_issues = []
if original_tag.is_takes_value_tag() and \
not self._validate_value_class_portion(original_tag, stripped_value):
if original_tag.is_takes_value_tag():
report_as = report_as if report_as else original_tag
validation_issues += ErrorHandler.format_error(ValidationErrors.VALUE_INVALID, report_as)
if error_code:
validation_issues += ErrorHandler.format_error(ValidationErrors.VALUE_INVALID,
report_as, actual_error=error_code)
problem_indexes = self._get_problem_indexes(original_tag, stripped_value)
for char, index in problem_indexes:
error_code = ValidationErrors.CURLY_BRACE_UNSUPPORTED_HERE \
if char in "{}" else ValidationErrors.INVALID_TAG_CHARACTER
validation_issues += ErrorHandler.format_error(error_code,
tag=report_as, index_in_tag=index,
index_in_tag_end=index + 1)
if not self._validate_value_class_portion(original_tag, stripped_value):
validation_issues += ErrorHandler.format_error(ValidationErrors.VALUE_INVALID, report_as)
if error_code:
validation_issues += ErrorHandler.format_error(ValidationErrors.VALUE_INVALID,
report_as, actual_error=error_code)
return validation_issues

@staticmethod
Expand Down
2 changes: 1 addition & 1 deletion tests/schema/test_hed_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ def test_get_hed_xml_version(self):
self.assertEqual(get_hed_xml_version(self.hed_xml_3g), "8.0.0")

def test_has_duplicate_tags(self):
self.assertFalse(self.hed_schema_3g._has_duplicate_tags)
self.assertFalse(self.hed_schema_3g.has_duplicates())

def test_short_tag_mapping(self):
self.assertEqual(len(self.hed_schema_3g.tags.keys()), 1110)
Expand Down
15 changes: 15 additions & 0 deletions tests/schema/test_hed_schema_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,18 @@ def setUpClass(cls):
cls.saved_cache_folder = hed_cache.HED_CACHE_DIRECTORY
schema.set_cache_directory(cls.hed_cache_dir)

# Copy source as dupe into cache for easily testing dupe detection
cls.dupe_library_name = "testscoredupe_1.1.0"
cls.source_library_name = "score_1.1.0"

for filename in os.listdir(hed_cache.INSTALLED_CACHE_LOCATION):
loaded_schema = schema.load_schema(os.path.join(hed_cache.INSTALLED_CACHE_LOCATION, filename))
loaded_schema.save_as_xml(os.path.join(cls.hed_cache_dir, filename), save_merged=False)
if filename == f"HED_{cls.source_library_name}.xml":
new_filename = f"HED_{cls.dupe_library_name}.xml"
loaded_schema.save_as_xml(os.path.join(cls.hed_cache_dir, new_filename), save_merged=False)



@classmethod
def tearDownClass(cls):
Expand Down Expand Up @@ -241,6 +250,12 @@ def test_load_schema_version_merged(self):
self.assertEqual(schemas3._namespace, "", "load_schema_version has the right version with namespace")
self.assertEqual(len(issues), 11)

def test_load_schema_version_merged_duplicates(self):
ver4 = ["score_1.1.0", "testscoredupe_1.1.0"]
with self.assertRaises(HedFileError) as context:
load_schema_version(ver4)
self.assertEqual(len(context.exception.issues), 597)

def test_load_and_verify_tags(self):
# Load 'testlib' by itself
testlib = load_schema_version('testlib_2.0.0')
Expand Down