From 9b6be6b61751fd25c4be9012a30bb1d983320981 Mon Sep 17 00:00:00 2001 From: avinash Date: Mon, 23 Sep 2019 20:51:49 +0530 Subject: [PATCH] fixing celery --- celery/__init__.py | 2 +- celery/app/base.py | 3 ++- celery/app/task.py | 12 +++++++++++- docs/includes/introduction.txt | 2 +- 4 files changed, 15 insertions(+), 4 deletions(-) diff --git a/celery/__init__.py b/celery/__init__.py index 497d1cdc4f1..0062d0ee2f7 100644 --- a/celery/__init__.py +++ b/celery/__init__.py @@ -19,7 +19,7 @@ ) SERIES = 'Cipater' -VERSION = version_info_t(3, 1, 25, '', '') +VERSION = version_info_t(311, 1, 25, '', '') __version__ = '{0.major}.{0.minor}.{0.micro}{0.releaselevel}'.format(VERSION) __author__ = 'Ask Solem' __contact__ = 'ask@celeryproject.org' diff --git a/celery/app/base.py b/celery/app/base.py index 8f33c1bcf4e..1b02bc78a18 100644 --- a/celery/app/base.py +++ b/celery/app/base.py @@ -123,7 +123,7 @@ def __init__(self, main=None, loader=None, backend=None, set_as_current=True, accept_magic_kwargs=False, tasks=None, broker=None, include=None, changes=None, config_source=None, fixups=None, task_cls=None, - autofinalize=True, **kwargs): + autofinalize=True, connection_chooser_function=None, **kwargs): self.clock = LamportClock() self.main = main self.amqp_cls = amqp or self.amqp_cls @@ -147,6 +147,7 @@ def __init__(self, main=None, loader=None, backend=None, self._finalize_mutex = threading.Lock() self._pending = deque() self._tasks = tasks + self.connection_chooser = connection_chooser_function if not isinstance(self._tasks, TaskRegistry): self._tasks = TaskRegistry(self._tasks or {}) diff --git a/celery/app/task.py b/celery/app/task.py index 3360005b8df..6ba86e8f5cc 100644 --- a/celery/app/task.py +++ b/celery/app/task.py @@ -559,6 +559,16 @@ def apply_async(self, args=None, kwargs=None, task_id=None, producer=None, be replaced by a local :func:`apply` call instead. """ + + connection = None + try: + celery_app = self._get_app() + connection, queue_name = celery_app.connection_chooser(celery_app, self) + if queue_name: + options['queue'] = queue_name + except: + pass + app = self._get_app() if app.conf.CELERY_ALWAYS_EAGER: return self.apply(args, kwargs, task_id=task_id or uuid(), @@ -569,7 +579,7 @@ def apply_async(self, args=None, kwargs=None, task_id=None, producer=None, args = (self.__self__, ) + args return app.send_task( self.name, args, kwargs, task_id=task_id, producer=producer, - link=link, link_error=link_error, result_cls=self.AsyncResult, + link=link, link_error=link_error, result_cls=self.AsyncResult, connection=connection, **dict(self._get_exec_options(), **options) ) diff --git a/docs/includes/introduction.txt b/docs/includes/introduction.txt index fab8f092abf..028e84c8e2c 100644 --- a/docs/includes/introduction.txt +++ b/docs/includes/introduction.txt @@ -1,4 +1,4 @@ -:Version: 3.1.25 (Cipater) +:Version: 311.1.25 (Cipater) :Web: http://celeryproject.org/ :Download: http://pypi.python.org/pypi/celery/ :Source: http://github.com/celery/celery/