From b678a824aa48901e2f610a46698377082cfcf92e Mon Sep 17 00:00:00 2001 From: Yuhan Luo <4531914+yuhan@users.noreply.github.com> Date: Thu, 16 Sep 2021 21:16:48 +0800 Subject: [PATCH] RFC: dagstermill io use io manager for output notebook (#4490) depends on https://github.com/dagster-io/dagster/pull/4500 prior context: https://dagster.phacility.com/D6145 **User-facing changes:** (1) Deprecating `output_notebook` in favor of `output_notebook_name`. - Using the old property `output_notebook` would require "file_manager" and result in a FileHandle output - no breaking change to existing users. - Using the new property `output_notebook_name` would result in a bytes output and require "output_notebook_io_manager", see details in (2) (2) With the new param, it requires a dedicated IO manager for output notebook. When `output_notebook` or `output_notebook_name` is specified, "output_notebook_io_manager" is required as a resource. - we provide a built-in `local_output_notebook_io_manager` for handling local output notebook materialization. - **New capabilities:** Users now can customize their own "output_notebook_io_manager" by extending `OutputNotebookIOManager`. This enables use cases: - users who want better asset key control can override `get_output_asset_key`. - users who want precise IO control over the output notebook can customize their own `handle_output` and `load_input`, e.g. they can control the file name of the output notebook. - users who want to attach more meaningful metadata can yield EventMetadataEntry in their own `handle_output` method this is also fixing https://github.com/dagster-io/dagster/issues/3380 --- .../dagstermill/examples/repository.py | 173 +++++++++++------- .../dagstermill/dagstermill/io_managers.py | 74 ++++++++ .../dagstermill/dagstermill/solids.py | 121 +++++++----- .../dagstermill/dagstermill_tests/test_io.py | 56 ++++-- .../dagstermill_tests/test_logging.py | 4 +- .../dagstermill_tests/test_manager.py | 5 +- 6 files changed, 293 insertions(+), 140 deletions(-) create mode 100644 python_modules/libraries/dagstermill/dagstermill/io_managers.py diff --git a/python_modules/libraries/dagstermill/dagstermill/examples/repository.py b/python_modules/libraries/dagstermill/dagstermill/examples/repository.py index f6fe96d5db2a8..15683c7e9707a 100644 --- a/python_modules/libraries/dagstermill/dagstermill/examples/repository.py +++ b/python_modules/libraries/dagstermill/dagstermill/examples/repository.py @@ -25,6 +25,7 @@ from dagster.core.types.dagster_type import DagsterType from dagster.core.types.marshal import SerializationStrategy from dagster.utils import PICKLE_PROTOCOL, file_relative_path +from dagstermill.io_managers import local_output_notebook_io_manager try: from dagster_pandas import DataFrame @@ -66,7 +67,7 @@ def test_nb_solid(name, **kwargs): return dagstermill.define_dagstermill_solid( name=name, notebook_path=nb_test_path(name), - output_notebook="notebook", + output_notebook_name="notebook", output_defs=output_defs, **kwargs, ) @@ -75,7 +76,7 @@ def test_nb_solid(name, **kwargs): default_mode_defs = [ ModeDefinition( resource_defs={ - "file_manager": local_file_manager, + "output_notebook_io_manager": local_output_notebook_io_manager, "io_manager": fs_io_manager, } ) @@ -93,7 +94,7 @@ def hello_world_pipeline(): hello_world_with_custom_tags_and_description = dagstermill.define_dagstermill_solid( name="hello_world_custom", notebook_path=nb_test_path("hello_world"), - output_notebook="notebook", + output_notebook_name="notebook", tags={"foo": "bar"}, description="custom description", ) @@ -113,7 +114,7 @@ def hello_world_with_custom_tags_and_description_pipeline(): goodbye_config = dagstermill.define_dagstermill_solid( name="goodbye_config", notebook_path=nb_test_path("print_dagstermill_context_solid_config"), - output_notebook="notebook", + output_notebook_name="notebook", config_schema={"farewell": Field(String, is_required=False, default_value="goodbye")}, ) @@ -130,9 +131,9 @@ def alias_config_pipeline(): goodbye_config.alias("aliased_goodbye")() -@solid(input_defs=[InputDefinition("notebook", dagster_type=FileHandle)]) +@solid(input_defs=[InputDefinition("notebook")]) def load_notebook(notebook): - return os.path.exists(notebook.path_desc) + return notebook @pipeline(mode_defs=default_mode_defs) @@ -155,7 +156,6 @@ def hello_world_no_output_notebook_no_file_manager_pipeline(): hello_world_no_output_notebook = dagstermill.define_dagstermill_solid( name="hello_world_no_output_notebook", notebook_path=nb_test_path("hello_world"), - required_resource_keys={"file_manager"}, ) @@ -355,14 +355,14 @@ def filepicklelist_resource(init_context): resource_defs={ "list": ResourceDefinition(lambda _: []), "io_manager": fs_io_manager, - "file_manager": local_file_manager, + "output_notebook_io_manager": local_output_notebook_io_manager, }, ), ModeDefinition( name="prod", resource_defs={ "list": filepicklelist_resource, - "file_manager": local_file_manager, + "output_notebook_io_manager": local_output_notebook_io_manager, "io_manager": fs_io_manager, }, ), @@ -377,7 +377,7 @@ def resource_pipeline(): ModeDefinition( resource_defs={ "list": filepicklelist_resource, - "file_manager": local_file_manager, + "output_notebook_io_manager": local_output_notebook_io_manager, "io_manager": fs_io_manager, } ) @@ -438,6 +438,99 @@ def failure_pipeline(): test_nb_solid("yield_failure")() +yield_something = test_nb_solid( + "yield_something", + input_defs=[InputDefinition("obj", str)], + output_defs=[OutputDefinition(str, "result")], +) + + +@solid +def fan_in(a, b): + return f"{a} {b}" + + +@pipeline( + mode_defs=[ + ModeDefinition( + resource_defs={ + "io_manager": fs_io_manager, + "output_notebook_io_manager": local_output_notebook_io_manager, + } + ) + ] +) +def fan_in_notebook_pipeline(): + a, _ = yield_something.alias("solid_1")() + b, _ = yield_something.alias("solid_2")() + fan_in(a, b) + + +@pipeline( + mode_defs=[ + ModeDefinition( + resource_defs={ + "output_notebook_io_manager": local_output_notebook_io_manager, + } + ) + ] +) +def fan_in_notebook_pipeline_in_mem(): + a, _ = yield_something.alias("solid_1")() + b, _ = yield_something.alias("solid_2")() + fan_in(a, b) + + +@composite_solid +def outer(): + yield_something() + + +@pipeline( + mode_defs=[ + ModeDefinition( + resource_defs={ + "io_manager": fs_io_manager, + "output_notebook_io_manager": local_output_notebook_io_manager, + } + ) + ] +) +def composite_pipeline(): + outer() + + +################################################################################################### +# Back compat +################################################################################################### + +hello_world_legacy = dagstermill.define_dagstermill_solid( + name="hello_world_legacy", + notebook_path=nb_test_path("hello_world"), + output_notebook="notebook", +) + + +@solid(input_defs=[InputDefinition("notebook", dagster_type=FileHandle)]) +def load_notebook_legacy(notebook): + return os.path.exists(notebook.path_desc) + + +@pipeline( + mode_defs=[ + ModeDefinition( + resource_defs={ + "io_manager": fs_io_manager, + "file_manager": local_file_manager, + } + ) + ] +) +def hello_world_with_output_notebook_pipeline_legacy(): + notebook = hello_world_legacy() + load_notebook_legacy(notebook) + + class ASerializationStrategy(SerializationStrategy): def __init__(self, name="foo"): super(ASerializationStrategy, self).__init__(name) @@ -509,62 +602,6 @@ def composite_pipeline_legacy(): outer_legacy() -yield_something = test_nb_solid( - "yield_something", - input_defs=[InputDefinition("obj", str)], - output_defs=[OutputDefinition(str, "result")], -) - - -@solid -def fan_in(a, b): - return f"{a} {b}" - - -@pipeline( - mode_defs=[ - ModeDefinition( - resource_defs={"io_manager": fs_io_manager, "file_manager": local_file_manager} - ) - ] -) -def fan_in_notebook_pipeline(): - a, _ = yield_something.alias("solid_1")() - b, _ = yield_something.alias("solid_2")() - fan_in(a, b) - - -@pipeline( - mode_defs=[ - ModeDefinition( - resource_defs={ - "file_manager": local_file_manager, - } - ) - ] -) -def fan_in_notebook_pipeline_in_mem(): - a, _ = yield_something.alias("solid_1")() - b, _ = yield_something.alias("solid_2")() - fan_in(a, b) - - -@composite_solid -def outer(): - yield_something() - - -@pipeline( - mode_defs=[ - ModeDefinition( - resource_defs={"io_manager": fs_io_manager, "file_manager": local_file_manager} - ) - ] -) -def composite_pipeline(): - outer() - - @repository def notebook_repo(): pipelines = [ @@ -575,6 +612,7 @@ def notebook_repo(): hello_world_config_pipeline, hello_world_explicit_yield_pipeline, hello_world_output_pipeline, + hello_world_with_output_notebook_pipeline, hello_logging_pipeline, resource_pipeline, resource_with_exception_pipeline, @@ -585,10 +623,11 @@ def notebook_repo(): yield_obj_pipeline, retries_pipeline, failure_pipeline, - fan_in_notebook_pipeline_legacy, fan_in_notebook_pipeline_in_mem, fan_in_notebook_pipeline, hello_world_no_output_notebook_no_file_manager_pipeline, + hello_world_with_output_notebook_pipeline_legacy, + fan_in_notebook_pipeline_legacy, ] if DAGSTER_PANDAS_PRESENT and SKLEARN_PRESENT and MATPLOTLIB_PRESENT: pipelines += [tutorial_pipeline] diff --git a/python_modules/libraries/dagstermill/dagstermill/io_managers.py b/python_modules/libraries/dagstermill/dagstermill/io_managers.py new file mode 100644 index 0000000000000..5553593a78f53 --- /dev/null +++ b/python_modules/libraries/dagstermill/dagstermill/io_managers.py @@ -0,0 +1,74 @@ +import os +from pathlib import Path +from typing import Any, List, Optional + +from dagster import check +from dagster.config.field import Field +from dagster.core.definitions.event_metadata import EventMetadataEntry +from dagster.core.definitions.events import AssetKey +from dagster.core.execution.context.input import InputContext +from dagster.core.execution.context.output import OutputContext +from dagster.core.storage.io_manager import IOManager, io_manager +from dagster.utils import mkdir_p + + +class OutputNotebookIOManager(IOManager): + def __init__(self, asset_key_prefix: Optional[List[str]] = None): + self.asset_key_prefix = asset_key_prefix if asset_key_prefix else [] + + def get_output_asset_key(self, context: OutputContext): + return AssetKey([*self.asset_key_prefix, f"{context.step_key}_output_notebook"]) + + def handle_output(self, context: OutputContext, obj: bytes): + raise NotImplementedError + + def load_input(self, context: InputContext) -> Any: + raise NotImplementedError + + +class LocalOutputNotebookIOManager(OutputNotebookIOManager): + """Built-in IO Manager for handling output notebook.""" + + def __init__(self, base_dir: str, asset_key_prefix: Optional[List[str]] = None): + super(LocalOutputNotebookIOManager, self).__init__(asset_key_prefix=asset_key_prefix) + self.base_dir = base_dir + self.write_mode = "wb" + self.read_mode = "rb" + + def _get_path(self, context: OutputContext) -> str: + """Automatically construct filepath.""" + keys = context.get_run_scoped_output_identifier() + return str(Path(self.base_dir, *keys).with_suffix(".ipynb")) + + def handle_output(self, context: OutputContext, obj: bytes): + """obj: bytes""" + check.inst_param(context, "context", OutputContext) + + # the output notebook itself is stored at output_file_path + output_notebook_path = self._get_path(context) + mkdir_p(os.path.dirname(output_notebook_path)) + with open(output_notebook_path, self.write_mode) as dest_file_obj: + dest_file_obj.write(obj) + yield EventMetadataEntry.fspath(path=output_notebook_path, label="path") + + def load_input(self, context) -> bytes: + check.inst_param(context, "context", InputContext) + # pass output notebook to downstream solids as File Object + with open(self._get_path(context.upstream_output), self.read_mode) as file_obj: + return file_obj.read() + + +@io_manager( + config_schema={ + "asset_key_prefix": Field(str, is_required=False), + "base_dir": Field(str, is_required=False), + }, +) +def local_output_notebook_io_manager(init_context): + """Built-in IO Manager that handles output notebooks.""" + return LocalOutputNotebookIOManager( + base_dir=init_context.resource_config.get( + "base_dir", init_context.instance.storage_directory() + ), + asset_key_prefix=init_context.resource_config.get("asset_key_prefix", []), + ) diff --git a/python_modules/libraries/dagstermill/dagstermill/solids.py b/python_modules/libraries/dagstermill/dagstermill/solids.py index 48993386d2828..7c9d5f80b9842 100644 --- a/python_modules/libraries/dagstermill/dagstermill/solids.py +++ b/python_modules/libraries/dagstermill/dagstermill/solids.py @@ -7,17 +7,9 @@ import nbformat import papermill -from dagster import ( - AssetMaterialization, - EventMetadataEntry, - InputDefinition, - Output, - OutputDefinition, - SolidDefinition, - check, - seven, -) -from dagster.core.definitions.events import Failure, RetryRequested +from dagster import InputDefinition, Output, OutputDefinition, SolidDefinition, check, seven +from dagster.core.definitions.event_metadata import EventMetadataEntry +from dagster.core.definitions.events import AssetMaterialization, Failure, RetryRequested from dagster.core.definitions.reconstructable import ReconstructablePipeline from dagster.core.definitions.utils import validate_tags from dagster.core.execution.context.compute import SolidExecutionContext @@ -28,6 +20,7 @@ from dagster.serdes import pack_value from dagster.seven import get_system_temp_directory from dagster.utils import mkdir_p, safe_tempfile_path +from dagster.utils.backcompat import rename_warning from dagster.utils.error import serializable_error_info_from_exc_info from papermill.engines import papermill_engines from papermill.iorw import load_notebook_node, write_ipynb @@ -144,11 +137,14 @@ def get_papermill_parameters(step_context, inputs, output_log_path): return parameters -def _dm_solid_compute(name, notebook_path, output_notebook=None, asset_key_prefix=None): +def _dm_solid_compute( + name, notebook_path, output_notebook_name=None, asset_key_prefix=None, output_notebook=None +): check.str_param(name, "name") check.str_param(notebook_path, "notebook_path") - check.opt_str_param(output_notebook, "output_notebook") + check.opt_str_param(output_notebook_name, "output_notebook_name") check.opt_list_param(asset_key_prefix, "asset_key_prefix") + check.opt_str_param(output_notebook, "output_notebook") def _t_fn(step_context, inputs): check.inst_param(step_context, "step_context", SolidExecutionContext) @@ -212,40 +208,47 @@ def _t_fn(step_context, inputs): executed_notebook_path=executed_notebook_path, ) ) - - executed_notebook_file_handle = None - try: - # use binary mode when when moving the file since certain file_managers such as S3 - # may try to hash the contents + if output_notebook_name is not None: + # yield output notebook binary stream as a solid output with open(executed_notebook_path, "rb") as fd: - executed_notebook_file_handle = step_context.resources.file_manager.write( - fd, mode="wb", ext="ipynb" + yield Output(fd.read(), output_notebook_name) + else: + # backcompat + executed_notebook_file_handle = None + try: + # use binary mode when when moving the file since certain file_managers such as S3 + # may try to hash the contents + with open(executed_notebook_path, "rb") as fd: + executed_notebook_file_handle = step_context.resources.file_manager.write( + fd, mode="wb", ext="ipynb" + ) + executed_notebook_materialization_path = ( + executed_notebook_file_handle.path_desc + ) + + yield AssetMaterialization( + asset_key=(asset_key_prefix + [f"{name}_output_notebook"]), + description="Location of output notebook in file manager", + metadata_entries=[ + EventMetadataEntry.fspath(executed_notebook_materialization_path) + ], ) - executed_notebook_materialization_path = executed_notebook_file_handle.path_desc - - yield AssetMaterialization( - asset_key=(asset_key_prefix + [f"{name}_output_notebook"]), - description="Location of output notebook in file manager", - metadata_entries=[ - EventMetadataEntry.fspath(executed_notebook_materialization_path) - ], - ) - except Exception: # pylint: disable=broad-except - # if file manager writing errors, e.g. file manager is not provided, we throw a warning - # and fall back to the previously stored temp executed notebook. - step_context.log.warning( - "Error when attempting to materialize executed notebook using file manager: " - f"{str(serializable_error_info_from_exc_info(sys.exc_info()))}" - f"\nNow falling back to local: notebook execution was temporarily materialized at {executed_notebook_path}" - "\nIf you have supplied a file manager and expect to use it for materializing the " - 'notebook, please include "file_manager" in the `required_resource_keys` argument ' - "to `define_dagstermill_solid`" - ) - executed_notebook_materialization_path = executed_notebook_path + except Exception: # pylint: disable=broad-except + # if file manager writing errors, e.g. file manager is not provided, we throw a warning + # and fall back to the previously stored temp executed notebook. + step_context.log.warning( + "Error when attempting to materialize executed notebook using file manager: " + f"{str(serializable_error_info_from_exc_info(sys.exc_info()))}" + f"\nNow falling back to local: notebook execution was temporarily materialized at {executed_notebook_path}" + "\nIf you have supplied a file manager and expect to use it for materializing the " + 'notebook, please include "file_manager" in the `required_resource_keys` argument ' + "to `define_dagstermill_solid`" + ) + executed_notebook_materialization_path = executed_notebook_path - if output_notebook is not None: - yield Output(executed_notebook_file_handle, output_notebook) + if output_notebook is not None: + yield Output(executed_notebook_file_handle, output_notebook) # deferred import for perf import scrapbook @@ -287,6 +290,7 @@ def define_dagstermill_solid( config_schema=None, required_resource_keys=None, output_notebook=None, + output_notebook_name=None, asset_key_prefix=None, description=None, tags=None, @@ -307,6 +311,11 @@ def define_dagstermill_solid( the pipeline resources via the "file_manager" resource key, so, e.g., if :py:class:`~dagster_aws.s3.s3_file_manager` is configured, the output will be a : py:class:`~dagster_aws.s3.S3FileHandle`. + output_notebook_name: (Optional[str]): If set, will be used as the name of an injected output + of type of :py:class:`~dagster.BufferedIOBase` that is the file object of the executed + notebook (in addition to the :py:class:`~dagster.AssetMaterialization` that is always + created). It allows the downstream solids to access the executed notebook via a file + object. asset_key_prefix (Optional[Union[List[str], str]]): If set, will be used to prefix the asset keys for materialized notebooks. description (Optional[str]): If set, description used for solid. @@ -324,8 +333,21 @@ def define_dagstermill_solid( required_resource_keys = check.opt_set_param( required_resource_keys, "required_resource_keys", of_type=str ) + + extra_output_defs = [] + if output_notebook_name is not None: + required_resource_keys.add("output_notebook_io_manager") + extra_output_defs.append( + OutputDefinition(name=output_notebook_name, io_manager_key="output_notebook_io_manager") + ) + # backcompact if output_notebook is not None: + rename_warning( + new_name="output_notebook_name", old_name="output_notebook", breaking_version="0.14.0" + ) required_resource_keys.add("file_manager") + extra_output_defs.append(OutputDefinition(dagster_type=FileHandle, name=output_notebook)) + if isinstance(asset_key_prefix, str): asset_key_prefix = [asset_key_prefix] @@ -350,14 +372,13 @@ def define_dagstermill_solid( name=name, input_defs=input_defs, compute_fn=_dm_solid_compute( - name, notebook_path, output_notebook, asset_key_prefix=asset_key_prefix - ), - output_defs=output_defs - + ( - [OutputDefinition(dagster_type=FileHandle, name=output_notebook)] - if output_notebook - else [] + name, + notebook_path, + output_notebook_name, + asset_key_prefix=asset_key_prefix, + output_notebook=output_notebook, # backcompact ), + output_defs=output_defs + extra_output_defs, config_schema=config_schema, required_resource_keys=required_resource_keys, description=description, diff --git a/python_modules/libraries/dagstermill/dagstermill_tests/test_io.py b/python_modules/libraries/dagstermill/dagstermill_tests/test_io.py index 3196535996ada..55e25dcd42b89 100644 --- a/python_modules/libraries/dagstermill/dagstermill_tests/test_io.py +++ b/python_modules/libraries/dagstermill/dagstermill_tests/test_io.py @@ -14,37 +14,35 @@ def test_yes_output_notebook_yes_file_manager(): # * persist the output notebook in a temp file # * yield AssetMaterialization # * yield Output(value=FileHandle(...), name=output_notebook) - with exec_for_test("hello_world_with_output_notebook_pipeline") as result: + with exec_for_test("hello_world_with_output_notebook_pipeline_legacy") as result: assert result.success materializations = [ x for x in result.event_list if x.event_type_value == "ASSET_MATERIALIZATION" ] assert len(materializations) == 1 - assert result.result_for_solid("hello_world").success - assert "notebook" in result.result_for_solid("hello_world").output_values + assert result.result_for_solid("hello_world_legacy").success + assert "notebook" in result.result_for_solid("hello_world_legacy").output_values assert isinstance( - result.result_for_solid("hello_world").output_values["notebook"], FileHandle + result.result_for_solid("hello_world_legacy").output_values["notebook"], FileHandle ) assert os.path.exists( - result.result_for_solid("hello_world").output_values["notebook"].path_desc + result.result_for_solid("hello_world_legacy").output_values["notebook"].path_desc ) assert ( materializations[0] .event_specific_data.materialization.metadata_entries[0] .entry_data.path - == result.result_for_solid("hello_world").output_values["notebook"].path_desc + == result.result_for_solid("hello_world_legacy").output_values["notebook"].path_desc ) - assert result.result_for_solid("load_notebook").success - assert result.result_for_solid("load_notebook").output_value() is True + assert result.result_for_solid("load_notebook_legacy").success + assert result.result_for_solid("load_notebook_legacy").output_value() is True def test_yes_output_notebook_no_file_manager(): - with pytest.raises( - DagsterInvalidDefinitionError, match='Resource key "file_manager" is required' - ): + with pytest.raises(DagsterInvalidDefinitionError): @pipeline def _pipe(): @@ -55,18 +53,12 @@ def _pipe(): def test_no_output_notebook_yes_file_manager(): # when output_notebook is not set and file_manager is provided: # * persist output notebook (but no solid output) - # * yield AssetMaterialization with exec_for_test("hello_world_no_output_notebook_pipeline") as result: assert result.success materializations = [ x for x in result.event_list if x.event_type_value == "ASSET_MATERIALIZATION" ] - assert len(materializations) == 1 - assert os.path.exists( - materializations[0] - .event_specific_data.materialization.metadata_entries[0] - .entry_data.path - ) + assert len(materializations) == 0 assert result.result_for_solid("hello_world_no_output_notebook").success assert not result.result_for_solid("hello_world_no_output_notebook").output_values @@ -87,3 +79,31 @@ def test_no_output_notebook_no_file_manager(): assert not result.result_for_solid( "hello_world_no_output_notebook_no_file_manager" ).output_values + + +@pytest.mark.notebook_test +def test_yes_output_notebook_yes_io_manager(): + # when output_notebook is set and io manager is provided: + # * persist the output notebook via notebook_io_manager + # * yield AssetMaterialization + # * yield Output(value=bytes, name=output_notebook) + with exec_for_test("hello_world_with_output_notebook_pipeline") as result: + assert result.success + materializations = [ + x for x in result.event_list if x.event_type_value == "ASSET_MATERIALIZATION" + ] + assert len(materializations) == 1 + + assert result.result_for_solid("hello_world").success + assert "notebook" in result.result_for_solid("hello_world").output_values + + output_path = ( + materializations[0] + .event_specific_data.materialization.metadata_entries[0] + .entry_data.path + ) + assert os.path.exists(output_path) + + assert result.result_for_solid("load_notebook").success + with open(output_path, "rb") as f: + assert f.read() == result.result_for_solid("load_notebook").output_value() diff --git a/python_modules/libraries/dagstermill/dagstermill_tests/test_logging.py b/python_modules/libraries/dagstermill/dagstermill_tests/test_logging.py index 9b5317d1202ed..b9ebb4a7bf969 100644 --- a/python_modules/libraries/dagstermill/dagstermill_tests/test_logging.py +++ b/python_modules/libraries/dagstermill/dagstermill_tests/test_logging.py @@ -6,7 +6,6 @@ ModeDefinition, String, execute_pipeline, - local_file_manager, logger, pipeline, reconstructable, @@ -15,6 +14,7 @@ from dagster.core.test_utils import instance_for_test from dagster.utils import safe_tempfile_path from dagstermill.examples.repository import hello_logging +from dagstermill.io_managers import local_output_notebook_io_manager class LogTestFileHandler(logging.Handler): @@ -50,7 +50,7 @@ def test_file_logger(init_context): "critical": test_file_logger, }, resource_defs={ - "file_manager": local_file_manager, + "output_notebook_io_manager": local_output_notebook_io_manager, }, ) ] diff --git a/python_modules/libraries/dagstermill/dagstermill_tests/test_manager.py b/python_modules/libraries/dagstermill/dagstermill_tests/test_manager.py index ad60636907294..c4c8a6599abbb 100644 --- a/python_modules/libraries/dagstermill/dagstermill_tests/test_manager.py +++ b/python_modules/libraries/dagstermill/dagstermill_tests/test_manager.py @@ -144,9 +144,8 @@ def test_out_of_pipeline_yield_event(): def test_in_pipeline_manager_resources(): with in_pipeline_manager() as manager: - assert "file_manager" in manager.context.resources._asdict() - assert "io_manager" in manager.context.resources._asdict() - assert len(manager.context.resources._asdict()) == 2 + assert "output_notebook_io_manager" in manager.context.resources._asdict() + assert len(manager.context.resources._asdict()) == 1 def test_in_pipeline_manager_solid_config():