Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support multiple job targets for sensors #4745

Merged
merged 14 commits into from
Sep 16, 2021
Original file line number Diff line number Diff line change
@@ -1,36 +1,31 @@
import inspect
from functools import update_wrapper
from typing import TYPE_CHECKING, Callable, Generator, List, Optional, Union
from typing import TYPE_CHECKING, Callable, Generator, List, Optional, Sequence, Union

from dagster import check
from dagster.core.definitions.sensor import RunRequest, SensorDefinition, SkipReason
from dagster.core.errors import DagsterInvariantViolationError

from ....seven import funcsigs
from ...decorator_utils import get_function_params
from ...errors import DagsterInvariantViolationError
from ..events import AssetKey
from ..graph import GraphDefinition
from ..pipeline import PipelineDefinition
from ..job import JobDefinition
from ..sensor import AssetSensorDefinition, RunRequest, SensorDefinition, SkipReason

if TYPE_CHECKING:
from ..sensor import SensorEvaluationContext
from ...events.log import EventLogEntry


def is_context_provided(params: List[funcsigs.Parameter]) -> bool:
return len(params) == 1


def sensor(
pipeline_name: Optional[str] = None,
name: Optional[str] = None,
solid_selection: Optional[List[str]] = None,
mode: Optional[str] = None,
minimum_interval_seconds: Optional[int] = None,
description: Optional[str] = None,
job: Optional[Union[PipelineDefinition, GraphDefinition]] = None,
job: Optional[Union[GraphDefinition, JobDefinition]] = None,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we asspire to remove job if we have jobs ? both isnt the worst i guess

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, I think it's just a convenience argument...

jobs: Optional[Sequence[Union[GraphDefinition, JobDefinition]]] = None,
) -> Callable[
[
Callable[
Expand Down Expand Up @@ -64,7 +59,8 @@ def sensor(
minimum_interval_seconds (Optional[int]): The minimum number of seconds that will elapse
between sensor evaluations.
description (Optional[str]): A human-readable description of the sensor.
job (Optional[PipelineDefinition]): Experimental
job (Optional[Union[GraphDefinition, JobDefinition]]): Experimental
jobs (Optional[Sequence[Union[GraphDefinition, JobDefinition]]]): Experimental
"""
check.opt_str_param(name, "name")

Expand All @@ -77,34 +73,16 @@ def inner(
check.callable_param(fn, "fn")
sensor_name = name or fn.__name__

def _wrapped_fn(context):
result = fn(context) if is_context_provided(get_function_params(fn)) else fn()

if inspect.isgenerator(result):
for item in result:
yield item
elif isinstance(result, (SkipReason, RunRequest)):
yield result

elif result is not None:
raise DagsterInvariantViolationError(
(
"Error in sensor {sensor_name}: Sensor unexpectedly returned output "
"{result} of type {type_}. Should only return SkipReason or "
"RunRequest objects."
).format(sensor_name=sensor_name, result=result, type_=type(result))
)

sensor_def = SensorDefinition(
name=sensor_name,
pipeline_name=pipeline_name,
evaluation_fn=_wrapped_fn,
evaluation_fn=fn,
solid_selection=solid_selection,
mode=mode,
minimum_interval_seconds=minimum_interval_seconds,
description=description,
job=job,
decorated_fn=fn,
jobs=jobs,
)

update_wrapper(sensor_def, wrapped=fn)
Expand All @@ -122,7 +100,7 @@ def asset_sensor(
mode: Optional[str] = None,
minimum_interval_seconds: Optional[int] = None,
description: Optional[str] = None,
job: Optional[Union[PipelineDefinition, GraphDefinition]] = None,
job: Optional[Union[GraphDefinition, JobDefinition]] = None,
) -> Callable[
[
Callable[
Expand Down
26 changes: 16 additions & 10 deletions python_modules/dagster/dagster/core/definitions/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@
}

RepositoryLevelDefinition = TypeVar(
"RepositoryLevelDefinition", PipelineDefinition, PartitionSetDefinition, ScheduleDefinition
"RepositoryLevelDefinition",
PipelineDefinition,
PartitionSetDefinition,
ScheduleDefinition,
SensorDefinition,
)


Expand Down Expand Up @@ -523,9 +527,10 @@ def from_list(cls, repository_definitions):
)
jobs[definition.name] = definition
sensors[definition.name] = definition
if definition.has_loadable_target():
target = definition.load_target()
pipelines[target.name] = target
if definition.has_loadable_targets():
targets = definition.load_targets()
for target in targets:
pipelines[target.name] = target
elif isinstance(definition, ScheduleDefinition):
if definition.name in jobs:
raise DagsterInvalidDefinitionError(
Expand Down Expand Up @@ -750,15 +755,16 @@ def _validate_schedule(self, schedule):

def _validate_sensor(self, sensor):
pipelines = self.get_pipeline_names()
if sensor.pipeline_name is None:
if len(sensor.targets) == 0:
# skip validation when the sensor does not target a pipeline
return sensor

if sensor.pipeline_name not in pipelines:
raise DagsterInvalidDefinitionError(
f'SensorDefinition "{sensor.name}" targets pipeline "{sensor.pipeline_name}" '
"which was not found in this repository."
)
for target in sensor.targets:
if target.pipeline_name not in pipelines:
raise DagsterInvalidDefinitionError(
f'SensorDefinition "{sensor.name}" targets pipeline "{target.pipeline_name}" '
"which was not found in this repository."
)

return sensor

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def __new__(cls, skip_message=None):


@whitelist_for_serdes
class RunRequest(namedtuple("_RunRequest", "run_key run_config tags")):
class RunRequest(namedtuple("_RunRequest", "run_key run_config tags job_name")):
"""
Represents all the information required to launch a single run. Must be returned by a
SensorDefinition or ScheduleDefinition's evaluation function for a run to be launched.
Expand All @@ -46,14 +46,17 @@ class RunRequest(namedtuple("_RunRequest", "run_key run_config tags")):
be launched, as a dict.
tags (Optional[Dict[str, str]]): A dictionary of tags (string key-value pairs) to attach
to the launched run.
job_name (Optional[str]): (Experimental) The name of the job this run request will launch.
Required for sensors that target multiple jobs.
"""

def __new__(cls, run_key, run_config=None, tags=None):
def __new__(cls, run_key, run_config=None, tags=None, job_name=None):
return super(RunRequest, cls).__new__(
cls,
run_key=check.opt_str_param(run_key, "run_key"),
run_config=check.opt_dict_param(run_config, "run_config"),
tags=check.opt_dict_param(tags, "tags"),
job_name=check.opt_str_param(job_name, "job_name"),
)


Expand Down
Loading