Skip to content

Commit

Permalink
support multiple job targets for sensors (#4745)
Browse files Browse the repository at this point in the history
* MultiJobSensorDefinition and ISensorDefinition

* format, single target properties

* mypy

* base sensor def

* get rid of base

* make external repr backcompat

* fix tests

* fix test error message

* List -> Sequence for jobs mypy type

* alex comments

* ensure backcompat deserialization from 0.12.10

* fix mypy error

* make sure pipeline name, mode are non-null in sensor targets

* populate legacy fields for external sensor data

Co-authored-by: prha <prha@elementl.com>
  • Loading branch information
yuhan and prha committed Sep 16, 2021
1 parent 7efd236 commit 941441b
Show file tree
Hide file tree
Showing 11 changed files with 518 additions and 172 deletions.
Original file line number Diff line number Diff line change
@@ -1,36 +1,31 @@
import inspect
from functools import update_wrapper
from typing import TYPE_CHECKING, Callable, Generator, List, Optional, Union
from typing import TYPE_CHECKING, Callable, Generator, List, Optional, Sequence, Union

from dagster import check
from dagster.core.definitions.sensor import RunRequest, SensorDefinition, SkipReason
from dagster.core.errors import DagsterInvariantViolationError

from ....seven import funcsigs
from ...decorator_utils import get_function_params
from ...errors import DagsterInvariantViolationError
from ..events import AssetKey
from ..graph import GraphDefinition
from ..pipeline import PipelineDefinition
from ..job import JobDefinition
from ..sensor import AssetSensorDefinition, RunRequest, SensorDefinition, SkipReason

if TYPE_CHECKING:
from ..sensor import SensorEvaluationContext
from ...events.log import EventLogEntry


def is_context_provided(params: List[funcsigs.Parameter]) -> bool:
    """Return True when ``params`` holds exactly one parameter.

    A single declared parameter is treated as the context argument; any other
    count (zero, or two and more) yields False.
    """
    param_count = len(params)
    return param_count == 1


def sensor(
pipeline_name: Optional[str] = None,
name: Optional[str] = None,
solid_selection: Optional[List[str]] = None,
mode: Optional[str] = None,
minimum_interval_seconds: Optional[int] = None,
description: Optional[str] = None,
job: Optional[Union[PipelineDefinition, GraphDefinition]] = None,
job: Optional[Union[GraphDefinition, JobDefinition]] = None,
jobs: Optional[Sequence[Union[GraphDefinition, JobDefinition]]] = None,
) -> Callable[
[
Callable[
Expand Down Expand Up @@ -64,7 +59,8 @@ def sensor(
minimum_interval_seconds (Optional[int]): The minimum number of seconds that will elapse
between sensor evaluations.
description (Optional[str]): A human-readable description of the sensor.
job (Optional[PipelineDefinition]): Experimental
job (Optional[Union[GraphDefinition, JobDefinition]]): Experimental
jobs (Optional[Sequence[Union[GraphDefinition, JobDefinition]]]): Experimental
"""
check.opt_str_param(name, "name")

Expand All @@ -77,34 +73,16 @@ def inner(
check.callable_param(fn, "fn")
sensor_name = name or fn.__name__

def _wrapped_fn(context):
result = fn(context) if is_context_provided(get_function_params(fn)) else fn()

if inspect.isgenerator(result):
for item in result:
yield item
elif isinstance(result, (SkipReason, RunRequest)):
yield result

elif result is not None:
raise DagsterInvariantViolationError(
(
"Error in sensor {sensor_name}: Sensor unexpectedly returned output "
"{result} of type {type_}. Should only return SkipReason or "
"RunRequest objects."
).format(sensor_name=sensor_name, result=result, type_=type(result))
)

sensor_def = SensorDefinition(
name=sensor_name,
pipeline_name=pipeline_name,
evaluation_fn=_wrapped_fn,
evaluation_fn=fn,
solid_selection=solid_selection,
mode=mode,
minimum_interval_seconds=minimum_interval_seconds,
description=description,
job=job,
decorated_fn=fn,
jobs=jobs,
)

update_wrapper(sensor_def, wrapped=fn)
Expand All @@ -122,7 +100,7 @@ def asset_sensor(
mode: Optional[str] = None,
minimum_interval_seconds: Optional[int] = None,
description: Optional[str] = None,
job: Optional[Union[PipelineDefinition, GraphDefinition]] = None,
job: Optional[Union[GraphDefinition, JobDefinition]] = None,
) -> Callable[
[
Callable[
Expand Down
26 changes: 16 additions & 10 deletions python_modules/dagster/dagster/core/definitions/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@
}

# Type variable constrained to the four definition classes listed below.
# SensorDefinition is part of the constraint set alongside pipelines,
# partition sets and schedules.
RepositoryLevelDefinition = TypeVar(
    "RepositoryLevelDefinition",
    PipelineDefinition,
    PartitionSetDefinition,
    ScheduleDefinition,
    SensorDefinition,
)


Expand Down Expand Up @@ -523,9 +527,10 @@ def from_list(cls, repository_definitions):
)
jobs[definition.name] = definition
sensors[definition.name] = definition
if definition.has_loadable_target():
target = definition.load_target()
pipelines[target.name] = target
if definition.has_loadable_targets():
targets = definition.load_targets()
for target in targets:
pipelines[target.name] = target
elif isinstance(definition, ScheduleDefinition):
if definition.name in jobs:
raise DagsterInvalidDefinitionError(
Expand Down Expand Up @@ -750,15 +755,16 @@ def _validate_schedule(self, schedule):

def _validate_sensor(self, sensor):
pipelines = self.get_pipeline_names()
if sensor.pipeline_name is None:
if len(sensor.targets) == 0:
# skip validation when the sensor does not target a pipeline
return sensor

if sensor.pipeline_name not in pipelines:
raise DagsterInvalidDefinitionError(
f'SensorDefinition "{sensor.name}" targets pipeline "{sensor.pipeline_name}" '
"which was not found in this repository."
)
for target in sensor.targets:
if target.pipeline_name not in pipelines:
raise DagsterInvalidDefinitionError(
f'SensorDefinition "{sensor.name}" targets pipeline "{target.pipeline_name}" '
"which was not found in this repository."
)

return sensor

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def __new__(cls, skip_message=None):


@whitelist_for_serdes
class RunRequest(namedtuple("_RunRequest", "run_key run_config tags")):
class RunRequest(namedtuple("_RunRequest", "run_key run_config tags job_name")):
"""
Represents all the information required to launch a single run. Must be returned by a
SensorDefinition or ScheduleDefinition's evaluation function for a run to be launched.
Expand All @@ -46,14 +46,17 @@ class RunRequest(namedtuple("_RunRequest", "run_key run_config tags")):
be launched, as a dict.
tags (Optional[Dict[str, str]]): A dictionary of tags (string key-value pairs) to attach
to the launched run.
job_name (Optional[str]): (Experimental) The name of the job this run request will launch.
Required for sensors that target multiple jobs.
"""

def __new__(cls, run_key, run_config=None, tags=None):
def __new__(cls, run_key, run_config=None, tags=None, job_name=None):
return super(RunRequest, cls).__new__(
cls,
run_key=check.opt_str_param(run_key, "run_key"),
run_config=check.opt_dict_param(run_config, "run_config"),
tags=check.opt_dict_param(tags, "tags"),
job_name=check.opt_str_param(job_name, "job_name"),
)


Expand Down
Loading

0 comments on commit 941441b

Please sign in to comment.