Skip to content

Commit

Permalink
fixing celery
Browse files Browse the repository at this point in the history
  • Loading branch information
avinashdunzo committed Sep 23, 2019
1 parent d08b105 commit 9b6be6b
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 4 deletions.
2 changes: 1 addition & 1 deletion celery/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
)

SERIES = 'Cipater'
VERSION = version_info_t(3, 1, 25, '', '')
VERSION = version_info_t(311, 1, 25, '', '')
__version__ = '{0.major}.{0.minor}.{0.micro}{0.releaselevel}'.format(VERSION)
__author__ = 'Ask Solem'
__contact__ = 'ask@celeryproject.org'
Expand Down
3 changes: 2 additions & 1 deletion celery/app/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ def __init__(self, main=None, loader=None, backend=None,
set_as_current=True, accept_magic_kwargs=False,
tasks=None, broker=None, include=None, changes=None,
config_source=None, fixups=None, task_cls=None,
autofinalize=True, **kwargs):
autofinalize=True, connection_chooser_function=None, **kwargs):
self.clock = LamportClock()
self.main = main
self.amqp_cls = amqp or self.amqp_cls
Expand All @@ -147,6 +147,7 @@ def __init__(self, main=None, loader=None, backend=None,
self._finalize_mutex = threading.Lock()
self._pending = deque()
self._tasks = tasks
self.connection_chooser = connection_chooser_function
if not isinstance(self._tasks, TaskRegistry):
self._tasks = TaskRegistry(self._tasks or {})

Expand Down
12 changes: 11 additions & 1 deletion celery/app/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,16 @@ def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
be replaced by a local :func:`apply` call instead.
"""

connection = None
try:
celery_app = self._get_app()
connection, queue_name = celery_app.connection_chooser(celery_app, self)
if queue_name:
options['queue'] = queue_name
except:
pass

app = self._get_app()
if app.conf.CELERY_ALWAYS_EAGER:
return self.apply(args, kwargs, task_id=task_id or uuid(),
Expand All @@ -569,7 +579,7 @@ def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
args = (self.__self__, ) + args
return app.send_task(
self.name, args, kwargs, task_id=task_id, producer=producer,
link=link, link_error=link_error, result_cls=self.AsyncResult,
link=link, link_error=link_error, result_cls=self.AsyncResult, connection=connection,
**dict(self._get_exec_options(), **options)
)

Expand Down
2 changes: 1 addition & 1 deletion docs/includes/introduction.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
:Version: 3.1.25 (Cipater)
:Version: 311.1.25 (Cipater)
:Web: http://celeryproject.org/
:Download: http://pypi.python.org/pypi/celery/
:Source: http://github.com/celery/celery/
Expand Down

0 comments on commit 9b6be6b

Please sign in to comment.