Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TWTR][CX-17516] Requeue tasks in the queued state #27

Merged
merged 1 commit into from
Dec 18, 2019

Conversation

aoen
Copy link

@aoen aoen commented Dec 16, 2019

Requeue tasks in the queued state to prevent tasks from getting stuck in the queued state on Celery + MySQL Broker.

Currently testing this change on my devel instance that has a DAG with a large number of queued tasks constantly, and I'm making sure that all of the instances eventually get run and that the number of queued tasks does not grow over time and that that all of the worker slots are always running tasks (i.e. there are always 16 tasks in the running state on the workers). Also testing on our integration cluster which has been alerting due to the canary cluster (it should stop paging us if this fix works).

else:
self.log.info("could not queue task {}".format(key))
self.log.info("Adding to queue even though already queued or running {}".format(command, key))
self.queued_tasks[key] = (command, priority, queue, task_instance)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this make any effect? if queued_task has the key, you are just effectively refreshing the value, not sure about the motivation for this change.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason I did this is in case the task was in the running state. It doesn't hurt even for the queued state becase task instance/etc might have changed, and I wanted to minimize the size of the fork (i.e. could have broken out the if state == running then queue the task into a separate elif).

@aoen aoen merged commit 87fcc1c into twitter-forks:1.10+twtr Dec 18, 2019
@aoen aoen deleted the ddavydov/requeue_queued_tasks branch December 18, 2019 14:15
vshshjn7 pushed a commit to vshshjn7/airflow that referenced this pull request Mar 11, 2020
msumit pushed a commit that referenced this pull request Mar 14, 2020
* 99ee040: CP from 1.10+twtr

* 2e01c24: CP from 1.10.4 ([TWTR][AIRFLOW-4939] Fixup use of fallback kwarg in conf.getint)

* 00cb4ae: [TWTR][AIRFLOW-XXXX] Cherry-pick d4a83bc and bump version (#21)

* CP 51b1aee: Relax version requiremets (#24)

* CP 67a4d1c: [CX-16266] Change with reference to 1a4c164 commit in open source (#25)

* CP 54bd095: [TWTR][CX-17516] Queue tasks already being handled by the executor (#26)

* CP 87fcc1c: [TWTR][CX-17516] Requeue tasks in the queued state (#27)

* CP 98a1ca9: [AIRFLOW-6625] Explicitly log using utf-8 encoding (apache#7247) (#31)

* fixing models.py and jobs.py file fix after CP

* fixing typo and version bump

Co-authored-by: Vishesh Jain <visheshj@twitter.com>
ayushSethi22 pushed a commit to ayushSethi22/airflow that referenced this pull request Dec 21, 2020
CP contains [TWTR] CP from 1.10+twtr (twitter-forks#35)

* 99ee040: CP from 1.10+twtr

* 2e01c24: CP from 1.10.4 ([TWTR][AIRFLOW-4939] Fixup use of fallback kwarg in conf.getint)

* 00cb4ae: [TWTR][AIRFLOW-XXXX] Cherry-pick d4a83bc and bump version (twitter-forks#21)

* CP 51b1aee: Relax version requiremets (twitter-forks#24)

* CP 67a4d1c: [CX-16266] Change with reference to 1a4c164 commit in open source (twitter-forks#25)

* CP 54bd095: [TWTR][CX-17516] Queue tasks already being handled by the executor (twitter-forks#26)

* CP 87fcc1c: [TWTR][CX-17516] Requeue tasks in the queued state (twitter-forks#27)

* CP 98a1ca9: [AIRFLOW-6625] Explicitly log using utf-8 encoding (apache#7247) (twitter-forks#31)

* fixing models.py and jobs.py file fix after CP

* fixing typo and version bump

Co-authored-by: Vishesh Jain <visheshj@twitter.com>
msumit pushed a commit that referenced this pull request Jan 8, 2021
* EWT-569 : Initial Commit for migrations

* [EWT-569] Airflow Upgrade to 1.10.14, Cherry-Pick  76fe7ac from 1.10.4

* CP Contains fb64f2e: [TWTR][AIRFLOW-XXX] Twitter Airflow Customizations + Fixup job scheduling without explicit_defaults_for_timestamp

* [EWT-569] Airflow Upgrade to 1.10.14, Cherry-Pick 91d2b00
[CP][EWT-548][AIRFLOW-6527] Make send_task_to_executor timeout configurable (#63)

* [EWT-569] Airflow Upgrade to 1.10.14, Cherry-Pick 91d2b00
CP contains [EWT-16]: Airflow fix for manual trigger during version upgrade (#13)

* [EWT-16]: Airflow fix for manual trigger during version upgrade

* [EWT-569] Airflow Upgrade to 1.10.14, Cherry-Pick 91d2b00
[CP][EWT-548][AIRFLOW-6527] Make send_task_to_executor timeout configurable (#63)

CP of f757a54

* CP(55bb579) [AIRFLOW-5597] Linkify urls in task instance log (#16)

* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : 94cdcf6
[CP] Contains [AIRFLOW-5597] Linkify urls in task instance log

CP of f757a54

* [EWT-569] Airflow Upgrade to 1.10.14, Cherry-Pick  4ce8d4c from 1.10.4
CP contains [TWTTR] Fix for rendering code on UI (#34)

* [EWT-569] Airflow Upgrade to 1.10.14, Cherry-Pick  299b4d8 from 1.10.4
CP contains [TWTR] CP from 1.10+twtr (#35)

* 99ee040: CP from 1.10+twtr

* 2e01c24: CP from 1.10.4 ([TWTR][AIRFLOW-4939] Fixup use of fallback kwarg in conf.getint)

* 00cb4ae: [TWTR][AIRFLOW-XXXX] Cherry-pick d4a83bc and bump version (#21)

* CP 51b1aee: Relax version requiremets (#24)

* CP 67a4d1c: [CX-16266] Change with reference to 1a4c164 commit in open source (#25)

* CP 54bd095: [TWTR][CX-17516] Queue tasks already being handled by the executor (#26)

* CP 87fcc1c: [TWTR][CX-17516] Requeue tasks in the queued state (#27)

* CP 98a1ca9: [AIRFLOW-6625] Explicitly log using utf-8 encoding (apache#7247) (#31)

* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : f7050fb
CP Contains Experiment API path fix (#37)

* [EWT-569] Airflow Upgrade to 1.10.14, Cherry-Pick  8a689af from 1.10.4
CP Contains Export scheduler env variable into worker pods. (#38)

* [EWT-569] Airflow Upgrade to 1.10.14, Cherry-Pick  5875a15 from 1.10.4
Cp Contains [EWT-115][EWT-118] Initialise dag var to None and fix for DagModel.fileloc (missed in EWT-16) (#39)

* [EWT-569] Airflow Upgrade to 1.10.14, Cherry-Pick  a68e2b3 from 1.10.4
[CX-16591] Fix regex to work with impersonated clusters like airflow_scheduler_ddavydov (#42)

* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : e9642c2
[CP][EWT-128] Fetch task logs from worker pods (19ac45a) (#43)

* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : d5d0a07
[CP][AIRFLOW-6561][EWT-290]: Adding priority class and default resource for worker pod. (#47)

* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : 9b58c88
[CP][EWT-302]Patch Pool.DEFAULT_POOL_NAME in BaseOperator (apache#8587) (#49)

Open source commit id: b37ce29

* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : 7b52a71
[CP][AIRFLOW-3121] Define closed property on StreamLogWriter (apache#3955) (#52)

CP of 2d5b8a5

* [EWT-361] Fix broken regex pattern for extracting dataflow job id (#51)

Update the dataflow URL regex as per AIRFLOW-9323

* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : 4b5b977
EWT-370: Use python3 to launch the dataflow job. (#53)

* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : 596e24f
* [EWT-450] fixing sla miss triggering duplicate alerts every minute (#56)

* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : b3d7fb4
[CP] Handle IntegrityErrors for trigger dagruns & add Stacktrace when DagFileProcessorManager gets killed (#57)

CP of faaf179 - from master
CP of 2102122 - from 1.10.12

* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : bac4acd
[TWTR][EWT-472] Add lifecycle support while launching worker pods (#59)

* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : 6162402
[TWTTR] Don't enqueue tasks again if already queued for K8sExecutor(#60)

Basically reverting commit 87fcc1c  and making changes specifically into the Celery Executor class only.

* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : 1991419
[CP][TWTR][EWT-377] Fix DagBag bug when a Dag has invalid schedule_interval (#61)

CP of 5605d10 & apache#11462

* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : 48be0f9
[TWTR][EWT-350] Reverting the last commit partially (#62)

* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : d8c473e
[CP][EWT-548][AIRFLOW-6527] Make send_task_to_executor timeout configurable (#63)

CP of f757a54
Sampreeth24 pushed a commit to Sampreeth24/airflow that referenced this pull request Apr 29, 2022
* 99ee040: CP from 1.10+twtr

* 2e01c24: CP from 1.10.4 ([TWTR][AIRFLOW-4939] Fixup use of fallback kwarg in conf.getint)

* 00cb4ae: [TWTR][AIRFLOW-XXXX] Cherry-pick d4a83bc and bump version (twitter-forks#21)

* CP 51b1aee: Relax version requiremets (twitter-forks#24)

* CP 67a4d1c: [CX-16266] Change with reference to 1a4c164 commit in open source (twitter-forks#25)

* CP 54bd095: [TWTR][CX-17516] Queue tasks already being handled by the executor (twitter-forks#26)

* CP 87fcc1c: [TWTR][CX-17516] Requeue tasks in the queued state (twitter-forks#27)

* CP 98a1ca9: [AIRFLOW-6625] Explicitly log using utf-8 encoding (apache#7247) (twitter-forks#31)

* fixing models.py and jobs.py file fix after CP

* fixing typo and version bump

Co-authored-by: Vishesh Jain <visheshj@twitter.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants