diff --git a/python_modules/dagster/dagster/core/execution/context/__init__.py b/python_modules/dagster/dagster/core/execution/context/__init__.py index 11ae9c3bd0f42..7bfa75f9484cc 100644 --- a/python_modules/dagster/dagster/core/execution/context/__init__.py +++ b/python_modules/dagster/dagster/core/execution/context/__init__.py @@ -15,13 +15,7 @@ | |--> SystemStepExecutionContext (produced by .for_step() on pipeline context) | -<<<<<<< HEAD - |--> SystemComputeExecutionContext (produced by .for_compute() on step context) - | - |--> SystemExpectationExecutionContext (produced by .for_expectation() on step context) -======= |--> SystemComputeExecutionContext (produced by .for_transform() on step context) ->>>>>>> [expectations] Delete IOExpectationDefinition and related classes ==================================================================================================== In the system contexts, immutable state is stored in SystemPipelineExecutionContextData, and any diff --git a/python_modules/dagster/dagster/core/execution/plan/expectations.py b/python_modules/dagster/dagster/core/execution/plan/expectations.py deleted file mode 100644 index e6e18c98a3506..0000000000000 --- a/python_modules/dagster/dagster/core/execution/plan/expectations.py +++ /dev/null @@ -1,173 +0,0 @@ -from dagster import check -from dagster.core.definitions import ( - DagsterIOExpectationFailedError, - ExpectationResult, - InputDefinition, - IOExpectationDefinition, - Output, - OutputDefinition, - Solid, - SolidHandle, -) -from dagster.core.errors import DagsterInvariantViolationError -from dagster.core.execution.context.expectation import ExpectationExecutionContext -from dagster.core.execution.context.system import SystemStepExecutionContext -from dagster.core.system_config.objects import EnvironmentConfig - -from .objects import ( - ExecutionStep, - ExecutionValueSubplan, - StepInput, - StepKind, - StepOutput, - StepOutputHandle, -) -from .utility import create_joining_subplan - -EXPECTATION_INPUT = 'expectation_input' -EXPECTATION_VALUE_OUTPUT = 'expectation_value' - - -def _create_expectation_lambda(solid, inout_def, expectation_def, internal_output_name): - check.inst_param(solid, 'solid', Solid) - check.inst_param(inout_def, 'inout_def', (InputDefinition, OutputDefinition)) - check.inst_param(expectation_def, 'expectations_def', IOExpectationDefinition) - check.str_param(internal_output_name, 'internal_output_name') - - def _do_expectation(expectation_context, inputs): - check.inst_param(expectation_context, 'step_context', SystemStepExecutionContext) - value = inputs[EXPECTATION_INPUT] - expectation_context = expectation_context.for_expectation(inout_def, expectation_def) - expt_result = expectation_def.expectation_fn( - ExpectationExecutionContext(expectation_context), value - ) - - if not isinstance(expt_result, ExpectationResult): - raise DagsterInvariantViolationError( - ( - 'Expectation for solid {solid_name} on {desc_key} {inout_name} ' - 'did not return an ExpectationResult'.format( - solid_name=solid.name, - desc_key=inout_def.descriptive_key, - inout_name=inout_def.name, - ) - ) - ) - - if expt_result.success: - yield expt_result - yield Output(output_name=internal_output_name, value=inputs[EXPECTATION_INPUT]) - else: - expectation_context.log.debug( - 'Expectation {key} failed on {value}.'.format( - key=expectation_context.step.key, value=value - ) - ) - raise DagsterIOExpectationFailedError(expectation_context, value) - - return _do_expectation - - -def create_expectations_subplan(pipeline_name, solid, inout_def, step_input, kind, handle): - check.str_param(pipeline_name, 'pipeline_name') - check.inst_param(solid, 'solid', Solid) - check.inst_param(inout_def, 'inout_def', (InputDefinition, OutputDefinition)) - check.inst_param(step_input, 'step_input', StepInput) - check.inst_param(kind, 'kind', StepKind) - - input_expect_steps = [] - for expectation_def in inout_def.expectations: - expect_step = create_expectation_step( - pipeline_name=pipeline_name, - solid=solid, - expectation_def=expectation_def, - key_suffix='{desc_key}.{inout_name}.expectation.{expectation_name}'.format( - desc_key=inout_def.descriptive_key, - inout_name=inout_def.name, - expectation_name=expectation_def.name, - ), - kind=kind, - step_input=step_input, - inout_def=inout_def, - handle=handle, - ) - input_expect_steps.append(expect_step) - - return create_joining_subplan( - pipeline_name, - solid, - '{desc_key}.{inout_name}.expectations.join'.format( - desc_key=inout_def.descriptive_key, inout_name=inout_def.name - ), - input_expect_steps, - EXPECTATION_VALUE_OUTPUT, - handle, - ) - - -def create_expectation_step( - pipeline_name, solid, expectation_def, key_suffix, kind, step_input, inout_def, handle -): - check.str_param(pipeline_name, 'pipeline_name') - check.inst_param(solid, 'solid', Solid) - check.inst_param(expectation_def, 'expectation_def', IOExpectationDefinition) - check.str_param(key_suffix, 'key_suffix') - check.inst_param(kind, 'kind', StepKind) - check.inst_param(step_input, 'step_input', StepInput) - check.inst_param(inout_def, 'inout_def', (InputDefinition, OutputDefinition)) - check.opt_inst_param(handle, 'handle', SolidHandle) - - value_type = inout_def.runtime_type - - return ExecutionStep( - pipeline_name=pipeline_name, - key_suffix=key_suffix, - step_inputs=[ - StepInput( - name=EXPECTATION_INPUT, - runtime_type=value_type, - prev_output_handle=step_input.prev_output_handle, - config_data=step_input.config_data, - ) - ], - step_outputs=[ - # Expectation value output is optional since we omit if the expectation fails - StepOutput(name=EXPECTATION_VALUE_OUTPUT, runtime_type=value_type, optional=True) - ], - compute_fn=_create_expectation_lambda( - solid, inout_def, expectation_def, EXPECTATION_VALUE_OUTPUT - ), - kind=kind, - solid_handle=handle, - logging_tags={ - 'expectation': expectation_def.name, - inout_def.descriptive_key: inout_def.name, - }, - ) - - -def decorate_with_expectations( - pipeline_name, environment_config, solid, compute_step, output_def, handle -): - check.str_param(pipeline_name, 'pipeline_name') - check.inst_param(environment_config, 'environment_config', EnvironmentConfig) - check.inst_param(solid, 'solid', Solid) - check.inst_param(compute_step, 'compute_step', ExecutionStep) - check.inst_param(output_def, 'output_def', OutputDefinition) - check.opt_inst_param(handle, 'handle', SolidHandle) - - # resolve to the output def from the compute_step's solid - output_def = solid.definition.resolve_output_to_origin(output_def.name) - terminal_step_output_handle = StepOutputHandle.from_step(compute_step, output_def.name) - - if environment_config.expectations.evaluate and output_def.expectations: - return create_expectations_subplan( - pipeline_name, - solid, - output_def, - StepInput(output_def.name, output_def.runtime_type, terminal_step_output_handle), - kind=StepKind.OUTPUT_EXPECTATION, - handle=handle, - ) - else: - return ExecutionValueSubplan.empty(terminal_step_output_handle) diff --git a/python_modules/dagster/dagster/core/execution/plan/plan.py b/python_modules/dagster/dagster/core/execution/plan/plan.py index 6dbc859a6d9bc..0c5c797793afc 100644 --- a/python_modules/dagster/dagster/core/execution/plan/plan.py +++ b/python_modules/dagster/dagster/core/execution/plan/plan.py @@ -156,7 +156,7 @@ def _build_from_sorted_solids( # Create and add execution plan steps (and output handles) for solid outputs for name, output_def in solid.definition.output_dict.items(): subplan = create_subplan_for_output( - self.pipeline_name, solid, terminal_transform_step, output_def + self.pipeline_name, solid, terminal_compute_step, output_def ) self.add_steps(subplan.steps) @@ -166,14 +166,14 @@ def _build_from_sorted_solids( return terminal_compute_step -def create_subplan_for_output(pipeline_name, solid, solid_transform_step, output_def): +def create_subplan_for_output(pipeline_name, solid, solid_compute_step, output_def): check.str_param(pipeline_name, 'pipeline_name') check.inst_param(solid, 'solid', Solid) check.inst_param(solid_compute_step, 'solid_compute_step', ExecutionStep) check.inst_param(output_def, 'output_def', OutputDefinition) output_def = solid.definition.resolve_output_to_origin(output_def.name) - terminal_step_output_handle = StepOutputHandle.from_step(solid_transform_step, output_def.name) + terminal_step_output_handle = StepOutputHandle.from_step(solid_compute_step, output_def.name) return ExecutionValueSubplan.empty(terminal_step_output_handle) diff --git a/python_modules/dagster/dagster_tests/core_tests/test_multiple_outputs.py b/python_modules/dagster/dagster_tests/core_tests/test_multiple_outputs.py index 60bee0c4ae727..25495335b95e5 100644 --- a/python_modules/dagster/dagster_tests/core_tests/test_multiple_outputs.py +++ b/python_modules/dagster/dagster_tests/core_tests/test_multiple_outputs.py @@ -4,7 +4,6 @@ DagsterInvariantViolationError, DagsterStepOutputNotFoundError, DependencyDefinition, - ExpectationResult, InputDefinition, Output, OutputDefinition, @@ -41,41 +40,6 @@ def _t_fn(*_args): solid_result.output_value('not_defined') -def test_multiple_outputs_expectations(): - called = {} - - def _expect_fn_one(*_args, **_kwargs): - called['expectation_one'] = True - return True - - def _expect_fn_two(*_args, **_kwargs): - called['expectation_two'] = True - return True - - def _compute_fn(*_args, **_kwargs): - output_one_val = 'foo' - output_two_val = 'bar' - yield ExpectationResult(label='some_expectation', success=_expect_fn_one(output_one_val)) - yield ExpectationResult(label='some_expectation', success=_expect_fn_two(output_two_val)) - yield Output(output_one_val, 'output_one') - yield Output(output_two_val, 'output_two') - - solid = SolidDefinition( - name='multiple_outputs', - input_defs=[], - output_defs=[OutputDefinition(name='output_one'), OutputDefinition(name='output_two')], - compute_fn=_compute_fn, - ) - - pipeline = PipelineDefinition(solid_defs=[solid]) - - result = execute_pipeline(pipeline) - - assert result.success - assert called['expectation_one'] - assert called['expectation_two'] - - def test_wrong_multiple_output(): def _t_fn(*_args): yield Output(output_name='mismatch', value='foo') diff --git a/python_modules/libraries/dagster-ge/dagster_ge/__init__.py b/python_modules/libraries/dagster-ge/dagster_ge/__init__.py index e69de29bb2d1d..23738078b1b3a 100644 --- a/python_modules/libraries/dagster-ge/dagster_ge/__init__.py +++ b/python_modules/libraries/dagster-ge/dagster_ge/__init__.py @@ -0,0 +1,2 @@ +# TODO: This module should be rebuilt on top of ExpectationResult +# see: https://github.com/dagster-io/dagster/issues/1546