Data-aware scheduling and dynamic task mapping #44015
Unanswered
pdavidovicmsft
asked this question in
General
Replies: 1 comment 3 replies
-
I am not sure, but it seems you can't produce datasets with mapped tasks. As I understand it (@uranusjr @cmarteepants ?), mapped tasks are literally multiple instances of the same task, and they cannot each individually produce a dataset. You could likely have one other task that depends on those mapped tasks (taking their outputs in) and produces multiple datasets as a single set of outlets. But I am not 100% sure. If I am right, this might be a feature to add in Airflow 3, where we improve dataset support - but again - I am not a big expert on that; that's what I think is happening here.
-
Hey folks,
I'd like to use data-aware scheduling to trigger a downstream DAG based on upstream DAG file updates.
It seems like a perfect feature for my use case, but I encountered some issues. Essentially I have dynamic task mapping, and if I try to provide `outlets` as a parameter in the set of kwargs I am sending through `.expand_kwargs`, I get the following error: `unmappable or already specified argument: outlets`.
Then I tried to pass `outlets` in the `.partial` method, and it works perfectly when the values are hardcoded, but if I try to create them dynamically it doesn't work either (my understanding is that arguments we send to `.partial` should not be computed at runtime). However, this does work for `map_index_template` (e.g. `map_index_template="{{task.script_args['--provider_name']}}"`).
Does somebody know how this should be done, or is it just a limitation of Airflow?
Additional info: I am using GlueJobOperator, and here is the code:
```python
provider_ingest_jobs = GlueJobOperator.partial(
    ....
    verbose=True,
    update_config=True,
    create_job_kwargs={
        "GlueVersion": "4.0",
        "NumberOfWorkers": 2,
        "WorkerType": "G.1X",
        "Timeout": 300,
    },
    map_index_template="{{task.script_args['--provider_name']}}",
    outlets=datasets,
).expand_kwargs(provider_ingest_mappings)
```
`provider_ingest_mappings` is computed in a separate task and essentially returns a list of tasks.