Skip to content

Commit

Permalink
RFC: dagstermill io use io manager for output notebook (#4490)
Browse files Browse the repository at this point in the history
depends on #4500

prior context: https://dagster.phacility.com/D6145

**User-facing changes:**

(1) Deprecating `output_notebook` in favor of `output_notebook_name`. 

- Using the old property `output_notebook` would require "file_manager" and result in a FileHandle output - no breaking change to existing users.
- Using the new property `output_notebook_name` would result in a bytes output and require "output_notebook_io_manager", see details in (2)

(2) The new param requires a dedicated IO manager for the output notebook. When `output_notebook_name` is specified, "output_notebook_io_manager" is required as a resource (the deprecated `output_notebook` keeps requiring "file_manager", as described in (1)).

- we provide a built-in `local_output_notebook_io_manager` for handling local output notebook materialization.
- **New capabilities:** Users can now customize their own "output_notebook_io_manager" by extending `OutputNotebookIOManager`. This enables use cases such as:
    - users who want better asset key control can override `get_output_asset_key`.
    - users who want precise IO control over the output notebook can customize their own `handle_output` and `load_input`, e.g. they can control the file name of the output notebook.
    - users who want to attach more meaningful metadata can yield EventMetadataEntry in their own `handle_output` method

This also fixes #3380.
  • Loading branch information
yuhan authored Sep 16, 2021
1 parent ecec5c4 commit b678a82
Show file tree
Hide file tree
Showing 6 changed files with 293 additions and 140 deletions.
173 changes: 106 additions & 67 deletions python_modules/libraries/dagstermill/dagstermill/examples/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from dagster.core.types.dagster_type import DagsterType
from dagster.core.types.marshal import SerializationStrategy
from dagster.utils import PICKLE_PROTOCOL, file_relative_path
from dagstermill.io_managers import local_output_notebook_io_manager

try:
from dagster_pandas import DataFrame
Expand Down Expand Up @@ -66,7 +67,7 @@ def test_nb_solid(name, **kwargs):
return dagstermill.define_dagstermill_solid(
name=name,
notebook_path=nb_test_path(name),
output_notebook="notebook",
output_notebook_name="notebook",
output_defs=output_defs,
**kwargs,
)
Expand All @@ -75,7 +76,7 @@ def test_nb_solid(name, **kwargs):
default_mode_defs = [
ModeDefinition(
resource_defs={
"file_manager": local_file_manager,
"output_notebook_io_manager": local_output_notebook_io_manager,
"io_manager": fs_io_manager,
}
)
Expand All @@ -93,7 +94,7 @@ def hello_world_pipeline():
hello_world_with_custom_tags_and_description = dagstermill.define_dagstermill_solid(
name="hello_world_custom",
notebook_path=nb_test_path("hello_world"),
output_notebook="notebook",
output_notebook_name="notebook",
tags={"foo": "bar"},
description="custom description",
)
Expand All @@ -113,7 +114,7 @@ def hello_world_with_custom_tags_and_description_pipeline():
goodbye_config = dagstermill.define_dagstermill_solid(
name="goodbye_config",
notebook_path=nb_test_path("print_dagstermill_context_solid_config"),
output_notebook="notebook",
output_notebook_name="notebook",
config_schema={"farewell": Field(String, is_required=False, default_value="goodbye")},
)

Expand All @@ -130,9 +131,9 @@ def alias_config_pipeline():
goodbye_config.alias("aliased_goodbye")()


@solid(input_defs=[InputDefinition("notebook", dagster_type=FileHandle)])
@solid(input_defs=[InputDefinition("notebook")])
def load_notebook(notebook):
return os.path.exists(notebook.path_desc)
return notebook


@pipeline(mode_defs=default_mode_defs)
Expand All @@ -155,7 +156,6 @@ def hello_world_no_output_notebook_no_file_manager_pipeline():
hello_world_no_output_notebook = dagstermill.define_dagstermill_solid(
name="hello_world_no_output_notebook",
notebook_path=nb_test_path("hello_world"),
required_resource_keys={"file_manager"},
)


Expand Down Expand Up @@ -355,14 +355,14 @@ def filepicklelist_resource(init_context):
resource_defs={
"list": ResourceDefinition(lambda _: []),
"io_manager": fs_io_manager,
"file_manager": local_file_manager,
"output_notebook_io_manager": local_output_notebook_io_manager,
},
),
ModeDefinition(
name="prod",
resource_defs={
"list": filepicklelist_resource,
"file_manager": local_file_manager,
"output_notebook_io_manager": local_output_notebook_io_manager,
"io_manager": fs_io_manager,
},
),
Expand All @@ -377,7 +377,7 @@ def resource_pipeline():
ModeDefinition(
resource_defs={
"list": filepicklelist_resource,
"file_manager": local_file_manager,
"output_notebook_io_manager": local_output_notebook_io_manager,
"io_manager": fs_io_manager,
}
)
Expand Down Expand Up @@ -438,6 +438,99 @@ def failure_pipeline():
test_nb_solid("yield_failure")()


# Notebook-backed test solid: one string input named "obj" and one declared
# string output named "result".
yield_something = test_nb_solid(
    name="yield_something",
    output_defs=[OutputDefinition(dagster_type=str, name="result")],
    input_defs=[InputDefinition("obj", dagster_type=str)],
)


# Combines its two inputs into a single string separated by one space.
@solid
def fan_in(a, b):
    return "{} {}".format(a, b)


# Runs two aliased copies of `yield_something` and feeds the first output of
# each into `fan_in`, using the filesystem IO manager for regular outputs.
@pipeline(
    mode_defs=[
        ModeDefinition(
            resource_defs={
                "io_manager": fs_io_manager,
                "output_notebook_io_manager": local_output_notebook_io_manager,
            },
        ),
    ],
)
def fan_in_notebook_pipeline():
    # Each invocation unpacks into two outputs; only the first is wired downstream.
    first_result, _ = yield_something.alias("solid_1")()
    second_result, _ = yield_something.alias("solid_2")()
    fan_in(a=first_result, b=second_result)


# Same graph as `fan_in_notebook_pipeline`, but the mode only configures the
# output-notebook IO manager (no explicit "io_manager" resource).
@pipeline(
    mode_defs=[
        ModeDefinition(
            resource_defs={
                "output_notebook_io_manager": local_output_notebook_io_manager,
            },
        ),
    ],
)
def fan_in_notebook_pipeline_in_mem():
    # Each invocation unpacks into two outputs; only the first is wired downstream.
    first_result, _ = yield_something.alias("solid_1")()
    second_result, _ = yield_something.alias("solid_2")()
    fan_in(a=first_result, b=second_result)


# Composite wrapper whose only child is the `yield_something` notebook solid;
# none of the child's outputs are mapped out of the composite.
@composite_solid
def outer():
    yield_something()


# Pipeline that executes the notebook solid from inside the `outer` composite.
@pipeline(
    mode_defs=[
        ModeDefinition(
            resource_defs={
                "io_manager": fs_io_manager,
                "output_notebook_io_manager": local_output_notebook_io_manager,
            },
        ),
    ],
)
def composite_pipeline():
    outer()


###################################################################################################
# Back compat
###################################################################################################

# Back-compat fixture: still uses the deprecated `output_notebook` argument
# instead of `output_notebook_name`.
hello_world_legacy = dagstermill.define_dagstermill_solid(
    name="hello_world_legacy",
    output_notebook="notebook",
    notebook_path=nb_test_path("hello_world"),
)


# Legacy consumer: receives the notebook as a FileHandle and reports whether
# the location named by its `path_desc` exists on disk.
@solid(
    input_defs=[
        InputDefinition("notebook", dagster_type=FileHandle),
    ]
)
def load_notebook_legacy(notebook):
    notebook_location = notebook.path_desc
    return os.path.exists(notebook_location)


# Back-compat pipeline: the legacy notebook solid's output is handed straight
# to the legacy loader; the mode supplies "file_manager" rather than
# "output_notebook_io_manager".
@pipeline(
    mode_defs=[
        ModeDefinition(
            resource_defs={
                "io_manager": fs_io_manager,
                "file_manager": local_file_manager,
            },
        ),
    ],
)
def hello_world_with_output_notebook_pipeline_legacy():
    load_notebook_legacy(hello_world_legacy())


class ASerializationStrategy(SerializationStrategy):
def __init__(self, name="foo"):
super(ASerializationStrategy, self).__init__(name)
Expand Down Expand Up @@ -509,62 +602,6 @@ def composite_pipeline_legacy():
outer_legacy()


yield_something = test_nb_solid(
"yield_something",
input_defs=[InputDefinition("obj", str)],
output_defs=[OutputDefinition(str, "result")],
)


@solid
def fan_in(a, b):
return f"{a} {b}"


@pipeline(
mode_defs=[
ModeDefinition(
resource_defs={"io_manager": fs_io_manager, "file_manager": local_file_manager}
)
]
)
def fan_in_notebook_pipeline():
a, _ = yield_something.alias("solid_1")()
b, _ = yield_something.alias("solid_2")()
fan_in(a, b)


@pipeline(
mode_defs=[
ModeDefinition(
resource_defs={
"file_manager": local_file_manager,
}
)
]
)
def fan_in_notebook_pipeline_in_mem():
a, _ = yield_something.alias("solid_1")()
b, _ = yield_something.alias("solid_2")()
fan_in(a, b)


@composite_solid
def outer():
yield_something()


@pipeline(
mode_defs=[
ModeDefinition(
resource_defs={"io_manager": fs_io_manager, "file_manager": local_file_manager}
)
]
)
def composite_pipeline():
outer()


@repository
def notebook_repo():
pipelines = [
Expand All @@ -575,6 +612,7 @@ def notebook_repo():
hello_world_config_pipeline,
hello_world_explicit_yield_pipeline,
hello_world_output_pipeline,
hello_world_with_output_notebook_pipeline,
hello_logging_pipeline,
resource_pipeline,
resource_with_exception_pipeline,
Expand All @@ -585,10 +623,11 @@ def notebook_repo():
yield_obj_pipeline,
retries_pipeline,
failure_pipeline,
fan_in_notebook_pipeline_legacy,
fan_in_notebook_pipeline_in_mem,
fan_in_notebook_pipeline,
hello_world_no_output_notebook_no_file_manager_pipeline,
hello_world_with_output_notebook_pipeline_legacy,
fan_in_notebook_pipeline_legacy,
]
if DAGSTER_PANDAS_PRESENT and SKLEARN_PRESENT and MATPLOTLIB_PRESENT:
pipelines += [tutorial_pipeline]
Expand Down
74 changes: 74 additions & 0 deletions python_modules/libraries/dagstermill/dagstermill/io_managers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import os
from pathlib import Path
from typing import Any, List, Optional

from dagster import check
from dagster.config.field import Field
from dagster.core.definitions.event_metadata import EventMetadataEntry
from dagster.core.definitions.events import AssetKey
from dagster.core.execution.context.input import InputContext
from dagster.core.execution.context.output import OutputContext
from dagster.core.storage.io_manager import IOManager, io_manager
from dagster.utils import mkdir_p


class OutputNotebookIOManager(IOManager):
    """Base class for IO managers that store dagstermill output notebooks.

    ``handle_output`` and ``load_input`` raise ``NotImplementedError`` here and
    are meant to be overridden by subclasses. ``get_output_asset_key`` may be
    overridden to control the asset key attached to the output notebook.

    Args:
        asset_key_prefix: Components placed in front of the generated asset
            key. A single string is also tolerated and is treated as a
            one-element prefix.
    """

    def __init__(self, asset_key_prefix: Optional[List[str]] = None):
        # A bare string (e.g. coming from a string-typed config field) would be
        # unpacked character by character when the asset key is built, so wrap
        # it in a list first. An empty string still means "no prefix".
        if isinstance(asset_key_prefix, str):
            asset_key_prefix = [asset_key_prefix] if asset_key_prefix else []
        self.asset_key_prefix = asset_key_prefix if asset_key_prefix else []

    def get_output_asset_key(self, context: OutputContext):
        """Return an ``AssetKey`` of the prefix followed by ``<step_key>_output_notebook``."""
        return AssetKey([*self.asset_key_prefix, f"{context.step_key}_output_notebook"])

    def handle_output(self, context: OutputContext, obj: bytes):
        """Persist the notebook bytes ``obj``; must be implemented by subclasses."""
        raise NotImplementedError

    def load_input(self, context: InputContext) -> Any:
        """Load a previously stored notebook; must be implemented by subclasses."""
        raise NotImplementedError


class LocalOutputNotebookIOManager(OutputNotebookIOManager):
    """IO manager that keeps output notebooks as ``.ipynb`` files on the local filesystem.

    Args:
        base_dir: Directory under which the notebook files are placed.
        asset_key_prefix: Forwarded unchanged to ``OutputNotebookIOManager``.
    """

    def __init__(self, base_dir: str, asset_key_prefix: Optional[List[str]] = None):
        super().__init__(asset_key_prefix=asset_key_prefix)
        self.base_dir = base_dir
        # Notebooks are written and read as raw bytes, hence the binary modes.
        self.write_mode = "wb"
        self.read_mode = "rb"

    def _get_path(self, context: OutputContext) -> str:
        """Build the ``.ipynb`` file path for ``context`` underneath ``base_dir``."""
        identifier = context.get_run_scoped_output_identifier()
        notebook_file = Path(self.base_dir, *identifier).with_suffix(".ipynb")
        return str(notebook_file)

    def handle_output(self, context: OutputContext, obj: bytes):
        """Write the notebook bytes to disk and yield a metadata entry holding the file path."""
        check.inst_param(context, "context", OutputContext)

        destination = self._get_path(context)
        # Create the parent directory before opening the file for writing.
        mkdir_p(os.path.dirname(destination))
        with open(destination, self.write_mode) as notebook_file:
            notebook_file.write(obj)
        yield EventMetadataEntry.fspath(path=destination, label="path")

    def load_input(self, context) -> bytes:
        """Return the bytes of the notebook file written for the upstream output."""
        check.inst_param(context, "context", InputContext)
        source = self._get_path(context.upstream_output)
        with open(source, self.read_mode) as notebook_file:
            return notebook_file.read()


@io_manager(
    config_schema={
        "asset_key_prefix": Field(str, is_required=False),
        "base_dir": Field(str, is_required=False),
    },
)
def local_output_notebook_io_manager(init_context):
    """Built-in IO Manager that handles output notebooks.

    Resource config:
        base_dir: Optional directory for the notebook files. When omitted, the
            instance's storage directory is used.
        asset_key_prefix: Optional single string used as a one-element asset
            key prefix.
    """
    resource_config = init_context.resource_config

    # Only consult the instance when no base_dir was configured, so an explicit
    # base_dir does not depend on the instance at all.
    base_dir = resource_config.get("base_dir")
    if base_dir is None:
        base_dir = init_context.instance.storage_directory()

    # The schema declares the prefix as one string while the manager works with
    # a list of key components; wrap it so it is not split into characters.
    prefix = resource_config.get("asset_key_prefix")
    return LocalOutputNotebookIOManager(
        base_dir=base_dir,
        asset_key_prefix=[prefix] if prefix else [],
    )
Loading

1 comment on commit b678a82

@vercel
Copy link

@vercel vercel bot commented on b678a82 Sep 16, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.