Adds initial data quality decorator
This is the first take at the data quality decorator.

A few components:

1. The check_output decorator -- this enables us to run a few default validators (see the usage sketch below)
2. The DataValidator base class -- this allows us to have extensible data validators
3. The BaseDefaultValidator base class -- this allows us to have a few default validators that map to args of check_output
4. Some basic default data validators

Everything so far is tested.
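
As a quick illustration, here is a hedged usage sketch (the decorator and the `range`/`importance` kwargs are per this commit, but the exact API may still evolve per the TODOs below; the function and values are illustrative only):

from hamilton.function_modifiers import check_output

@check_output(range=(0, 100), importance='warn')
def average_order_value(total_revenue: float, num_orders: float) -> float:
    """Should always land between 0 and 100 -- validated via DataInRangeValidatorPrimitives."""
    return total_revenue / num_orders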

Upcoming:

1. Round out the list of default data validators
2. Add documentation for check_output
3. Add end-to-end tests
4. Configure log/warn levels
5. Add documentation for extending validators
elijahbenizzy committed May 9, 2022
1 parent 80c55c1 commit 5eb2e98
Showing 7 changed files with 554 additions and 1 deletion.
84 changes: 84 additions & 0 deletions hamilton/data_quality/base.py
@@ -0,0 +1,84 @@
import abc
from typing import Type, Any, List, Dict

import dataclasses


class DataValidationError(Exception):
    pass


@dataclasses.dataclass
class ValidationResult:
passes: bool # Whether or not this passed the validation
message: str # Error message or success message
diagnostics: Dict[str, Any] = dataclasses.field(default_factory=dict) # Any extra diagnostics information needed, free-form


class DataValidator(abc.ABC):
"""Base class for a data quality operator. This will be used by the `data_quality` operator"""
# Importance levels
WARN = 'warn'
FAIL = 'fail'

VALID_IMPORTANCES = {WARN, FAIL} # TODO -- think through the API

def __init__(self, importance: str):
self._importance = importance

@property
def importance(self) -> str:
return self._importance

@staticmethod
def validate_importance_level(importance: str):
if importance not in DataValidator.VALID_IMPORTANCES:
raise ValueError(f'Importance level must be one of: {DataValidator.VALID_IMPORTANCES}')

@abc.abstractmethod
    def applies_to(self, datatype: Type[Any]) -> bool:
        """Whether or not this data validator can apply to data of the specified type.
        :param datatype: Candidate type of the data to validate.
        :return: True if it can be run on the specified type, False otherwise.
        """
pass

@abc.abstractmethod
def description(self) -> str:
"""Gives a description of this validator. E.G.
`Checks whether the entire dataset lies between 0 and 1.`
Note it should be able to access internal state (E.G. constructor arguments).
:return: The description of the validator as a string
"""
pass

@abc.abstractmethod
def name(self) -> str:
"""Returns the name for this validator."""

@abc.abstractmethod
def validate(self, dataset: Any) -> ValidationResult:
"""Actually performs the validation. Note when you
:param dataset:
:return:
"""
pass

def required_config(self) -> List[str]:
"""Gets the required configuration items. These are likely passed in in construction
(E.G. in the constructor parameters).
:return: A list of required configurations
"""
return []

def dependencies(self) -> List[str]:
"""Nodes upon which this depends. For example,
this might depend on a node that provides the output from the
last run of this DAG to execute an auto-correlation.
:return: The list of node-name dependencies.
"""
return []
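
As a sketch of the extension point, a hypothetical custom validator built on this base class might look like the following (NonNullValidator is illustrative only, not part of this commit; note that a custom validator need not implement arg(), which belongs to the default-validator abstraction below):

import pandas as pd
from typing import Any, Type

from hamilton.data_quality.base import DataValidator, ValidationResult


class NonNullValidator(DataValidator):
    """Hypothetical validator: checks that a pandas series contains no nulls."""

    def applies_to(self, datatype: Type[Any]) -> bool:
        return issubclass(datatype, pd.Series)

    def description(self) -> str:
        return 'Checks that the series contains no null values.'

    def name(self) -> str:
        return 'non_null_validator'

    def validate(self, dataset: pd.Series) -> ValidationResult:
        null_count = int(dataset.isnull().sum())
        return ValidationResult(
            passes=null_count == 0,
            message=f'Series contains {null_count} null value(s).',
            diagnostics={'null_count': null_count, 'data_size': len(dataset)})

# Instantiated like any validator: NonNullValidator(importance='warn')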
156 changes: 156 additions & 0 deletions hamilton/data_quality/default_validators.py
@@ -0,0 +1,156 @@
import abc
import numbers
from typing import Any, Type, List, Optional, Tuple

from hamilton.data_quality.base import DataValidator, ValidationResult
import pandas as pd


class BaseDefaultValidator(DataValidator, abc.ABC):
"""Base class for a default validator.
These are all validators that utilize a single argument to be passed to the decorator check_output.
check_output can thus delegate to multiple of these. This is an internal abstraction to allow for easy
creation of validators.
"""

@classmethod
@abc.abstractmethod
    def applies_to(cls, datatype: Type[Any]) -> bool:
pass

@abc.abstractmethod
def description(self) -> str:
pass

@abc.abstractmethod
def validate(self, data: Any) -> ValidationResult:
pass

@classmethod
@abc.abstractmethod
def arg(cls) -> str:
"""Yields a string that represents this validator's argument.
@check_output() will be passed a series of kwargs, each one of which will correspond to
one of these default validators. Note that we have the limitation of allowing just a single
argument.
:return: The argument that this needs.
"""
pass


class DataInRangeValidatorPandas(BaseDefaultValidator):

    def __init__(self, range: Tuple[float, float], importance: str):
        """Data validator that checks whether a pandas series falls within a given range.
        :param range: Inclusive (min, max) range the data should fall within.
        :param importance: Importance level ('warn' or 'fail') of this validator.
        """
        super().__init__(importance=importance)
        self.range = range

    def name(self) -> str:
        return 'data_in_range_validator'

@classmethod
def arg(cls) -> str:
return 'range'

@classmethod
    def applies_to(cls, datatype: Type[Any]) -> bool:
return issubclass(datatype, pd.Series) # TODO -- handle dataframes?

    def description(self) -> str:
        return f'Validates that all data points fall within the inclusive range [{self.range[0]}, {self.range[1]}]'

    def validate(self, data: pd.Series) -> ValidationResult:
        min_, max_ = self.range
        between = data.between(min_, max_, inclusive='both')  # 'both' = inclusive bounds (pandas >= 1.3)
        counts = between.value_counts()
        in_range = int(counts.get(True, 0))  # .get avoids a KeyError when one bucket is empty
        out_range = int(counts.get(False, 0))
        passes = out_range == 0
        message = f'Series contains {in_range} values in range [{min_}, {max_}], and {out_range} outside.'
        return ValidationResult(
            passes=passes,
            message=message,
            diagnostics={
                'range': self.range,
                'in_range': in_range,
                'out_range': out_range,
                'data_size': len(data)
            }
        )


class DataInRangeValidatorPrimitives(BaseDefaultValidator):
    def __init__(self, range: Tuple[float, float], importance: str):
        """Data validator that checks whether primitive data (ints, floats) falls within a given range.
        :param range: Inclusive (min, max) range the data should fall within.
        :param importance: Importance level ('warn' or 'fail') of this validator.
        """
        super().__init__(importance=importance)
        self.range = range

    @classmethod
    def applies_to(cls, datatype: Type[Any]) -> bool:
        return issubclass(datatype, numbers.Real)

    def description(self) -> str:
        return f'Validates that the data point falls within the inclusive range [{self.range[0]}, {self.range[1]}]'

def validate(self, data: numbers.Real) -> ValidationResult:
        min_, max_ = self.range
        passes = min_ <= data <= max_
        message = (f'Data point {data} falls within acceptable range: [{min_}, {max_}]' if passes
                   else f'Data point {data} does not fall within acceptable range: [{min_}, {max_}]')
return ValidationResult(
passes=passes,
message=message,
diagnostics={
'range': self.range,
'value': data
}
)

@classmethod
def arg(cls) -> str:
return 'range'

def name(self) -> str:
return 'data_in_range_validator'


AVAILABLE_DEFAULT_VALIDATORS = [
DataInRangeValidatorPandas,
DataInRangeValidatorPrimitives,
]


def resolve_default_validators(
        output_type: Type[Any],
        importance: str,
        available_validators: Optional[List[Type[BaseDefaultValidator]]] = None,
        **default_validator_kwargs) -> List[BaseDefaultValidator]:
    """Resolves default validators given a set of parameters and the type to which they apply.
Note that each (kwarg, type) combination should map to a validator
@param importance: importance level of the validator to instantiate
@param output_type: The type to which the validator should apply
@param available_validators: The available validators to choose from
@param default_validator_kwargs: Kwargs to use
@return: A list of validators to use
"""
if available_validators is None:
available_validators = AVAILABLE_DEFAULT_VALIDATORS
validators = []
for key in default_validator_kwargs.keys():
for validator_cls in available_validators:
if key == validator_cls.arg() and validator_cls.applies_to(output_type):
validators.append(validator_cls(**{key: default_validator_kwargs[key], 'importance': importance}))
break
else:
            raise ValueError(f'No registered subclass of BaseDefaultValidator is available '
                             f'for arg: {key} and type {output_type}. This either means (a) this arg-type '
                             f"combination isn't supported or (b) it has not been added yet (but should be). "
                             f'In the case of (b), we welcome contributions. Get started at github.com/stitchfix/hamilton')
return validators
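
A minimal sketch of how resolution behaves, given the validators above: a `range=` kwarg combined with a `pd.Series` output type selects DataInRangeValidatorPandas (values here are illustrative):

import pandas as pd

from hamilton.data_quality.default_validators import resolve_default_validators

validators = resolve_default_validators(
    output_type=pd.Series,
    importance='warn',
    range=(0, 1))  # matches DataInRangeValidatorPandas.arg() == 'range'
print([v.name() for v in validators])  # ['data_in_range_validator']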
116 changes: 115 additions & 1 deletion hamilton/function_modifiers.py
@@ -8,7 +8,9 @@
import typing_inspect

from hamilton import node
from hamilton.function_modifiers_base import NodeCreator, NodeResolver, NodeExpander, sanitize_function_name, NodeDecorator
from hamilton.data_quality.base import DataValidator, ValidationResult
from hamilton.data_quality.default_validators import resolve_default_validators, BaseDefaultValidator
from hamilton.function_modifiers_base import NodeCreator, NodeResolver, NodeExpander, sanitize_function_name, NodeDecorator, NodeTransformer
from hamilton.models import BaseModel

logger = logging.getLogger(__name__)
@@ -735,3 +737,115 @@ def validate(self, fn: Callable):
'Paths components also cannot be empty. '
'The value can be anything. Note that the following top-level prefixes are '
f'reserved as well: {self.RESERVED_TAG_NAMESPACES}')


class check_output(NodeTransformer):
    def __init__(self,
                 importance: str = DataValidator.WARN,
                 default_decorator_candidates: typing.List[Type[BaseDefaultValidator]] = None,
                 **default_validator_kwargs: Any):
        """Creates the check_output validator. This constructs the default validator classes.
        Note that this creates a whole set of default validators.
        TODO -- enable construction of custom validators using check_output.custom(*validators)
        :param importance: For the default validators, how important it is that they pass.
        :param default_validator_kwargs: Keyword arguments to be passed to the default validators.
        """
self.importance = importance
self.default_validator_kwargs = default_validator_kwargs
self.default_decorator_candidates = default_decorator_candidates
# We need to wait until we actually have the function in order to construct the validators
# So, we'll just store the constructor arguments for now and check it in validation

@staticmethod
def _validate_constructor_args(*validator: DataValidator, importance: str = None, **default_validator_kwargs: Any):
if len(validator) != 0:
if importance is not None or len(default_validator_kwargs) > 0:
raise ValueError(
f'Can provide *either* a list of custom validators or arguments for the default validator. '
f'Instead received both.')
else:
if importance is None:
                raise ValueError('Must supply an importance level if using the default validator.')

    def _resolve_validators(self, return_type: Type[Any]) -> typing.Collection[DataValidator]:
return resolve_default_validators(
return_type,
importance=self.importance,
available_validators=self.default_decorator_candidates,
**self.default_validator_kwargs)

def transform_node(self, node_: node.Node, config: Dict[str, Any], fn: Callable) -> Collection[node.Node]:
"""Transforms the node into a subdag that does validation and still returns the node's result.
        Say we have a node n and two validators (V1 and V2). The subdag will look like:

            n_raw -> V1 -> n
            n_raw -> V2 -> n
            n_raw -> n

        where
        - n_raw is a clone of the original node n
        - V1 and V2 are nodes that run the validators over n_raw's output and return ValidationResults
        - n is the final node -- it takes in the original result along with any number of data quality results, drops the latter, and returns the original result
Note that n takes in params it does not use -- this is to ensure execution order of the DAG.
:param node_: Node to transform.
:param config: Configuration used to transform TODO -- add configuration elements for turning on/off dq actions
:param fn: Function that was being decorated
:return: A new list of nodes specifying the original DAG.
"""
raw_node = node.Node(
name=node_.name + '_raw', # TODO -- make this unique -- this will break with multiple validation decorators, which we *don't* want
typ=node_.type,
doc_string=node_.documentation,
callabl=node_.callable,
node_source=node_.node_source,
input_types=node_.input_types,
tags=node_.tags)
validators = self._resolve_validators(node_.type)
validator_nodes = []
for validator in validators:
def validation_function(validator_to_call: DataValidator = validator, **kwargs):
result = list(kwargs.values())[0] # This should just have one kwarg
return validator_to_call.validate(result)

validator_node = node.Node(
name=node_.name + '_' + validator.name(), # TODO -- determine a good approach towards naming this
typ=ValidationResult,
doc_string=validator.description(),
callabl=validation_function,
node_source=node.NodeSource.STANDARD,
input_types={raw_node.name: (node_.type, node.DependencyType.REQUIRED)},
)
validator_nodes.append(validator_node)

def final_node_callable(**kwargs):
return kwargs[raw_node.name]

final_node = node.Node(
name=node_.name,
typ=node_.type,
doc_string=node_.documentation,
callabl=final_node_callable,
node_source=node_.node_source,
input_types={
raw_node.name: (node_.type, node.DependencyType.REQUIRED),
**{validator_node.name: (validator_node.type, node.DependencyType.REQUIRED) for validator_node in validator_nodes}},
tags=node_.tags)
return [*validator_nodes, final_node, raw_node]

def validate(self, fn: Callable):
"""Validates that the check_output node works on the function on which it was called
:param fn: Function to validate
:raises: InvalidDecoratorException if the decorator is not valid for the function's return type
"""
pass
# sig = inspect.signature(fn)
# return_annotation = sig.return_annotation
# We may want to use the upstream node to validate rather than the function...
# Temporarily we'll assume that they're the same, although this will have to get fixed prior to release
# This will break, for instance, with extract_columns (dataframe versus series)
# The solution is probably to apply the validator within transform_node and not validate here, *or*
# redo the framework to validate based off of dependent nodes?
# self._resolve_validators(return_annotation) # Just resolve validators to ensure that we can
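
To make the subdag rewrite concrete, here is a hedged sketch of what transform_node produces for one decorated function (node names follow the `+ '_raw'` and `+ '_' + validator.name()` logic above; the function itself is illustrative):

import pandas as pd
from hamilton.function_modifiers import check_output

@check_output(range=(0, 1), importance='warn')
def conversion_rate(signups: pd.Series, visits: pd.Series) -> pd.Series:
    return signups / visits

# transform_node replaces the single 'conversion_rate' node with three nodes:
#   conversion_rate_raw                      -- clone of the original computation
#   conversion_rate_data_in_range_validator  -- runs DataInRangeValidatorPandas on the raw output
#   conversion_rate                          -- returns the raw result, depending on both nodes above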
2 changes: 2 additions & 0 deletions hamilton/function_modifiers_base.py
@@ -178,6 +178,8 @@ def allows_multiple(cls) -> bool:


class NodeTransformer(SubDAGModifier):
NON_FINAL_TAG = 'hamilton.decorators.non_final' # TODO -- utilize this in _separate_final_nodes

def _separate_final_nodes(self, nodes: Collection[node.Node]) -> Tuple[Collection[node.Node], Collection[node.Node]]:
"""Separates out final nodes (sinks) from the nodes.