support multiple job targets for sensors #4745
This pull request is being automatically deployed with Vercel (learn more). 🔍 Inspect: https://vercel.com/elementl/dagster/4QnJ6bw3R1CmxURr1UDas41ymKfF
I think this looks pretty great. I'd like @prha to take a look as well. We could consider having a subclass of RunRequest that has the job name? Though not sure that's better.
python_modules/dagster/dagster/core/definitions/decorators/sensor.py
python_modules/dagster/dagster/core/definitions/pipeline_sensor.py
My gut reaction is that it is not clear to me that targeting 0, 1, or N things should be represented by distinct definition types.
Having separate decorators feels more ok, but I would lean towards just having @sensor without a deeper understanding of the exact trade-offs.
python_modules/dagster/dagster/core/host_representation/external_data.py
@multi_job_sensor(jobs=[the_job, config_graph.to_job()])
def two_job_sensor(context):
    counter = int(context.cursor) if context.cursor else 0
    if counter % 2 == 0:
        yield RunRequest(run_key=str(counter), job_name="the_graph")
    else:
        yield RunRequest(
            run_key=str(counter),
            job_name="config_graph",
            run_config={"solids": {"config_solid": {"config": {"foo": "blah"}}}},
        )
There's something odd about pointing at the job objects in the decorator, and then aligning with their name string property from the sensor. I guess we could update the examples to do job_name=the_job.name or something.
When we discussed this in the past I pitched having targets be a dict and then aligning on the key. Awkward in its own way, but it has the upside of preventing a change to the graph/job name from breaking the sensor at run time if it's using string literals for job_name.
An extra layer of indirection adds a lot of pain / opportunities for error, so I think we should be pretty confident it's necessary if we're going to add it. If someone is worried about the job name changing, they could do RunRequest(job_name=my_job.name).
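To make the trade-off in this thread concrete, here is a minimal sketch that uses stand-in classes rather than dagster's real ones. `FakeJob`, `FakeRunRequest`, and `resolve_target` are hypothetical names invented for illustration; the actual lookup inside the sensor machinery may differ. It shows a string literal going stale after a rename while `job.name` keeps tracking the object.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass(frozen=True)
class FakeJob:
    """Stand-in for a job definition; only the name matters here."""

    name: str


@dataclass(frozen=True)
class FakeRunRequest:
    """Stand-in for RunRequest with just the fields discussed in this thread."""

    run_key: str
    job_name: str


def resolve_target(jobs: List[FakeJob], request: FakeRunRequest) -> FakeJob:
    """Hypothetical helper: match a request to one of the declared jobs by name."""
    by_name: Dict[str, FakeJob] = {job.name: job for job in jobs}
    if request.job_name not in by_name:
        raise ValueError(
            f"job_name {request.job_name!r} is not one of {sorted(by_name)}"
        )
    return by_name[request.job_name]


# Imagine the underlying graph was renamed after the sensor was written.
the_job = FakeJob(name="the_graph_renamed")

# Referencing the object's name survives the rename.
ok = resolve_target([the_job], FakeRunRequest(run_key="0", job_name=the_job.name))

# A string literal written before the rename fails only when the sensor runs.
try:
    resolve_target([the_job], FakeRunRequest(run_key="1", job_name="the_graph"))
    stale_literal_failed = False
except ValueError:
    stale_literal_failed = True
```

Under these assumptions, using `the_job.name` gives the same protection against renames as the dict-keyed proposal, without adding a second set of keys to keep aligned.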
Maybe the_job.create_run_request(...) could be interesting, might be able to give a better error experience.
The fundamental awkwardness is some amount of duping, since we want the ahead-of-time declaration and then need to align amongst the multiple targets.
I found RunRequest(job_name=job.name) to be pretty good... not sure why, but I feel hesitant to create methods on JobDefinition that are thin wrappers around RunRequest
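For comparison, a rough sketch of what the suggested thin wrapper might look like. This again uses hypothetical stand-ins (`FakeJob`, `FakeRunRequest`), and `create_run_request` is only the method name floated above, not an existing dagster API.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass(frozen=True)
class FakeRunRequest:
    """Stand-in for RunRequest; not dagster's real class."""

    run_key: str
    job_name: str
    run_config: Dict[str, Any] = field(default_factory=dict)


@dataclass(frozen=True)
class FakeJob:
    """Stand-in for JobDefinition carrying the proposed wrapper method."""

    name: str

    def create_run_request(
        self, run_key: str, run_config: Optional[Dict[str, Any]] = None
    ) -> FakeRunRequest:
        # The wrapper only fills in job_name from the job itself.
        return FakeRunRequest(
            run_key=run_key, job_name=self.name, run_config=run_config or {}
        )


the_job = FakeJob(name="the_graph")

via_wrapper = the_job.create_run_request(run_key="0")
via_name = FakeRunRequest(run_key="0", job_name=the_job.name)
```

In this sketch both spellings build the same request, which is the sense in which the method is a thin wrapper: it saves one keyword argument and could host a friendlier error message, at the cost of extra surface area on the job class.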
to your queue
you can make a cross-compat test by copy-pasting out the existing external sensor stuff and using the manual serdes whitelist stuff to simulate loading, in old code, data written by new code
 def sensor(
     pipeline_name: Optional[str] = None,
     name: Optional[str] = None,
     solid_selection: Optional[List[str]] = None,
     mode: Optional[str] = None,
     minimum_interval_seconds: Optional[int] = None,
     description: Optional[str] = None,
-    job: Optional[Union[PipelineDefinition, GraphDefinition]] = None,
+    job: Optional[Union[GraphDefinition, JobDefinition]] = None,
should we aspire to remove job if we have jobs? both isn't the worst i guess
yeah, I think it's just a convenience argument...
python_modules/dagster/dagster/core/host_representation/external.py
python_modules/dagster/dagster/core/host_representation/external_data.py
@prha this is looking great to me!
I like it
I'm not certain we need to care about the reading-new-from-old case, since I think that would mean old dagit and newer user code, which is less critical than new dagit with old user code. But the inline comments are about the fact that we don't handle that.
def test_back_compat_external_sensor():
    SERIALIZED_0_12_10_SENSOR = '{"__class__": "ExternalSensorData", "description": null, "min_interval": null, "mode": "default", "name": "my_sensor", "pipeline_name": "my_pipeline", "solid_selection": null}'
    external_sensor_data = deserialize_json_to_dagster_namedtuple(SERIALIZED_0_12_10_SENSOR)
    assert isinstance(external_sensor_data, ExternalSensorData)
    assert len(external_sensor_data.target_dict) == 1
    assert "my_pipeline" in external_sensor_data.target_dict
    target = external_sensor_data.target_dict["my_pipeline"]
    assert isinstance(target, ExternalTargetData)
    assert target.pipeline_name == "my_pipeline"
this tests reading old in new, but should we test reading new from old?
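The two directions in question can be sketched with plain JSON and stand-in types. Everything here is an assumption for illustration: `TargetData`, `SensorData`, `read_in_new`, `write_in_new`, and `read_in_old` are hypothetical names, the real serdes layer works differently, and the old reader is assumed to ignore keys it does not know about.

```python
import json
from typing import Dict, NamedTuple, Optional


class TargetData(NamedTuple):
    """Stand-in for ExternalTargetData; only two fields are modeled."""

    pipeline_name: str
    mode: str


class SensorData(NamedTuple):
    """Stand-in for ExternalSensorData with legacy and new fields side by side."""

    name: str
    pipeline_name: Optional[str]
    mode: Optional[str]
    target_dict: Dict[str, TargetData]


def read_in_new(serialized: str) -> SensorData:
    """New reader: accept payloads with or without target_dict."""
    raw = json.loads(serialized)
    if raw.get("target_dict") is None:
        # Old payload: synthesize a single target from the legacy fields.
        target = TargetData(raw["pipeline_name"], raw["mode"])
        target_dict = {target.pipeline_name: target}
    else:
        target_dict = {k: TargetData(**v) for k, v in raw["target_dict"].items()}
    return SensorData(raw["name"], raw.get("pipeline_name"), raw.get("mode"), target_dict)


def write_in_new(name: str, target_dict: Dict[str, TargetData]) -> str:
    """New writer: also populate the legacy fields from the first target."""
    first = next(iter(target_dict.values()))
    return json.dumps(
        {
            "name": name,
            "pipeline_name": first.pipeline_name,
            "mode": first.mode,
            "target_dict": {k: v._asdict() for k, v in target_dict.items()},
        }
    )


def read_in_old(serialized: str) -> Dict[str, Optional[str]]:
    """Old reader (assumed): looks only at the legacy fields, ignores the rest."""
    raw = json.loads(serialized)
    return {key: raw.get(key) for key in ("name", "pipeline_name", "mode")}


# Reading old in new: a trimmed version of the 0.12.10 payload from the test above.
OLD_PAYLOAD = '{"mode": "default", "name": "my_sensor", "pipeline_name": "my_pipeline"}'
upgraded = read_in_new(OLD_PAYLOAD)

# Reading new from old: the old reader only ever sees the first target.
new_payload = write_in_new(
    "my_sensor",
    {"job_a": TargetData("job_a", "default"), "job_b": TargetData("job_b", "default")},
)
downgraded = read_in_old(new_payload)
```

In this simplified model the old reader keeps working but silently drops every target after the first, which is one way to see why that direction is harder to support fully.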
python_modules/dagster/dagster/core/host_representation/external_data.py
* MultiJobSensorDefinition and ISensorDefinition
* format, single target properties
* mypy
* base sensor def
* get rid of base
* make external repr backcompat
* fix tests
* fix test error message
* List -> Sequence for jobs mypy type
* alex comments
* ensure backcompat deserialization from 0.12.10
* fix mypy error
* make sure pipeline name, mode are non-null in sensor targets
* populate legacy fields for external sensor data
Co-authored-by: prha <prha@elementl.com>
Any chance this PR would cause
Hi @hebo-yang yes, this is related to the error.
Thanks! Yes. It's just that we weren't expecting breaking changes from patch releases. I will lock the dependency version to avoid such errors in the future.
Summary
resolves #4590
This PR changes SensorDefinition to accept multiple targets. It doesn't handle solid_selection; my plan is to punt on it and consolidate it with the op selection effort later.
Test Plan
bk