-
Notifications
You must be signed in to change notification settings - Fork 9
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add generic rule: models should implement uniqueness test for their PK #90
base: master
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💪
16bca96
to
8461cf2
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good! Some minor comments
@rule(rule_filters={is_table()}) | ||
def has_uniqueness_test(model: Model) -> RuleViolation | None: | ||
"""Model has uniqueness test for primary key.""" | ||
# ruff: noqa: C901 [too-complex] | ||
# ruff: noqa: PLR0912 [too-many-branches] | ||
|
||
# Extract PK | ||
pk_columns = None | ||
# At column level? | ||
for column in model.columns: | ||
for column_constraint in column.constraints: | ||
if column_constraint.type == "primary_key": | ||
pk_columns = [column.name] | ||
break | ||
else: | ||
continue | ||
break | ||
# Or at table level? | ||
if pk_columns is None: | ||
for model_constraint in model.constraints: | ||
if model_constraint.type == "primary_key": | ||
pk_columns = model_constraint.columns | ||
break | ||
|
||
if pk_columns is None: # No PK, no need for uniqueness test | ||
return None | ||
|
||
# Look for matching uniqueness test | ||
if len(pk_columns) == 1: | ||
for column in model.columns: | ||
if column.name == pk_columns[0]: | ||
for data_test in column.tests: | ||
if data_test.type == "unique": | ||
return None | ||
|
||
for data_test in model.tests: | ||
if data_test.type == "unique_combination_of_columns": | ||
if set(data_test.kwargs.get("combination_of_columns")) == set(pk_columns): # type: ignore | ||
return None | ||
|
||
return RuleViolation("There is no uniqueness test defined and matching the PK.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@rule(rule_filters={is_table()}) | |
def has_uniqueness_test(model: Model) -> RuleViolation | None: | |
"""Model has uniqueness test for primary key.""" | |
# ruff: noqa: C901 [too-complex] | |
# ruff: noqa: PLR0912 [too-many-branches] | |
# Extract PK | |
pk_columns = None | |
# At column level? | |
for column in model.columns: | |
for column_constraint in column.constraints: | |
if column_constraint.type == "primary_key": | |
pk_columns = [column.name] | |
break | |
else: | |
continue | |
break | |
# Or at table level? | |
if pk_columns is None: | |
for model_constraint in model.constraints: | |
if model_constraint.type == "primary_key": | |
pk_columns = model_constraint.columns | |
break | |
if pk_columns is None: # No PK, no need for uniqueness test | |
return None | |
# Look for matching uniqueness test | |
if len(pk_columns) == 1: | |
for column in model.columns: | |
if column.name == pk_columns[0]: | |
for data_test in column.tests: | |
if data_test.type == "unique": | |
return None | |
for data_test in model.tests: | |
if data_test.type == "unique_combination_of_columns": | |
if set(data_test.kwargs.get("combination_of_columns")) == set(pk_columns): # type: ignore | |
return None | |
return RuleViolation("There is no uniqueness test defined and matching the PK.") | |
pk_columns = next( | |
( | |
[column.name] | |
for column in model.columns | |
for constraint in column.constraints | |
if constraint.type == "primary_key" | |
), | |
[], | |
) | |
if not pk_columns: | |
pk_columns = next( | |
( | |
constraint.columns | |
for constraint in model.constraints | |
if constraint.columns and constraint.type == "primary_key" | |
), | |
[], | |
) | |
if len(pk_columns) == 1: | |
pk_column = next( | |
column for column in model.columns if column.name == pk_columns[0] | |
) | |
if not any(test.type == "unique" for test in pk_column.tests): | |
return RuleViolation("There is no uniqueness test defined.") | |
elif len(pk_columns) > 1: | |
constraint_test = next( | |
( | |
test | |
for test in model.tests | |
if test.type == "unique_combination_of_columns" | |
), | |
None, | |
) | |
if not constraint_test: | |
return RuleViolation("There is no uniqueness test defined.") | |
if set(constraint_test.kwargs.get("combination_of_columns", [])) == set( | |
pk_columns | |
): | |
return RuleViolation("Uniqueness test do not match the PK.") |
I tried to get rid of lint exceptions and make it a bit more flat - feel free to use fully or partially
For models materialized as
table
orincremental
:Consider both table- and column-level constraints and tests.