diff --git a/.coveragerc b/.coveragerc index 3d742be97..cad0e5dba 100644 --- a/.coveragerc +++ b/.coveragerc @@ -2,7 +2,9 @@ source = basket omit = manage.py + basket/settings.py basket/wsgi.py + basket/base/tests/* basket/news/migrations/* basket/news/tests/* diff --git a/.dockerignore b/.dockerignore index 2fdd8b29b..c18e6c67e 100644 --- a/.dockerignore +++ b/.dockerignore @@ -1,4 +1,3 @@ .git .env -celerybeat-schedule *.db diff --git a/.gitignore b/.gitignore index d499ba133..e7a913a8f 100644 --- a/.gitignore +++ b/.gitignore @@ -16,7 +16,6 @@ /static Dockerfile.jenkins build.py -celerybeat-schedule docs/_build docs/_gh-pages pip-log.txt diff --git a/basket/base/decorators.py b/basket/base/decorators.py new file mode 100644 index 000000000..b4a059e02 --- /dev/null +++ b/basket/base/decorators.py @@ -0,0 +1,56 @@ +import functools + +from django.conf import settings + +from django_statsd.clients import statsd +from rq.decorators import job as rq_job + +from basket.base.rq import enqueue_kwargs, get_queue + + +def rq_task(func): + """ + Decorator to standardize RQ tasks. + + Uses RQ's job decorator, but adds: + - uses our default queue and connection + - adds retry logic with exponential backoff + - adds success/failure/retry callbacks + - adds statsd metrics for job success/failure/retry + - adds Sentry error reporting for failed jobs + - TODO: add task timeouts + + """ + task_name = f"{func.__module__}.{func.__qualname__}" + + queue = get_queue() + connection = queue.connection + + kwargs = enqueue_kwargs(func) + + @rq_job( + queue, + connection=connection, + **kwargs, + ) + @functools.wraps(func) + def wrapped(*args, **kwargs): + # If in maintenance mode, queue the task for later. + if settings.MAINTENANCE_MODE: + if settings.READ_ONLY_MODE: + statsd.incr(f"{task_name}.not_queued") + else: + from basket.news.models import QueuedTask + + QueuedTask.objects.create( + name=task_name, + args=args, + kwargs=kwargs, + ) + statsd.incr(f"{task_name}.queued") + return + + # NOTE: Exceptions are handled with the RQ_EXCEPTION_HANDLERS + return func(*args, **kwargs) + + return wrapped diff --git a/basket/base/exceptions.py b/basket/base/exceptions.py new file mode 100644 index 000000000..401018076 --- /dev/null +++ b/basket/base/exceptions.py @@ -0,0 +1,13 @@ +class BasketError(Exception): + """ + Tasks can raise this when an error happens that we should not retry. + + E.g. if the error indicates we're passing bad parameters, as opposed to an error where we'd typically raise `NewsletterException`. + """ + + def __init__(self, msg): + super(BasketError, self).__init__(msg) + + +class RetryTask(Exception): + """An exception to raise within a task if you just want to retry.""" diff --git a/basket/base/management/commands/rqworker.py b/basket/base/management/commands/rqworker.py new file mode 100644 index 000000000..ba2a3ffbd --- /dev/null +++ b/basket/base/management/commands/rqworker.py @@ -0,0 +1,37 @@ +import sys + +from django.core.management.base import BaseCommand + +from basket.base.rq import get_worker + + +class Command(BaseCommand): + def add_arguments(self, parser): + parser.add_argument( + "-b", + "--burst", + action="store_true", + dest="burst", + default=False, + help="Run worker in burst mode and quit when queues are empty. Default: False", + ) + parser.add_argument( + "-s", + "--with-scheduler", + action="store_true", + dest="with_scheduler", + default=False, + help="Run worker with scheduler enabled. 
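Delayed retries land in RQ's scheduled-jobs registry, so a scheduler must be running for them to be re-enqueued.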
Default: False", + ) + + def handle(self, *args, **options): + kwargs = { + "burst": options.get("burst", False), + "with_scheduler": options.get("with_scheduler", False), + } + try: + worker = get_worker() + worker.work(**kwargs) + except ConnectionError as e: + self.stderr.write(str(e)) + sys.exit(1) diff --git a/basket/base/rq.py b/basket/base/rq.py new file mode 100644 index 000000000..13505de20 --- /dev/null +++ b/basket/base/rq.py @@ -0,0 +1,232 @@ +import os +import random +import re +import traceback +from time import time + +from django.conf import settings + +import redis +import requests +import sentry_sdk +from django_statsd.clients import statsd +from rq import Callback, Retry, SimpleWorker +from rq.queue import Queue +from rq.serializers import JSONSerializer +from silverpop.api import SilverpopResponseException + +from basket.base.exceptions import RetryTask +from basket.news.backends.common import NewsletterException + +# don't propagate and don't retry if these are the error messages +IGNORE_ERROR_MSGS = [ + "INVALID_EMAIL_ADDRESS", + "InvalidEmailAddress", + "An invalid phone number was provided", + "No valid subscribers were provided", + "There are no valid subscribers", + "email address is suppressed", + "invalid email address", +] +# don't propagate and don't retry if these regex match the error messages +IGNORE_ERROR_MSGS_RE = [re.compile(r"campaignId \d+ not found")] +# don't propagate after max retries if these are the error messages +IGNORE_ERROR_MSGS_POST_RETRY = [] +# Exceptions we allow to retry, all others will abort retries. +EXCEPTIONS_ALLOW_RETRY = [ + IOError, + NewsletterException, + requests.RequestException, + RetryTask, + SilverpopResponseException, +] + + +# Our cached Redis connection. +_REDIS_CONN = None + + +def get_redis_connection(url=None, force=False): + """ + Get a Redis connection. + + Expects a URL including the db, or defaults to `os.environ["RQ_URL"]`. + + Call example: + get_redis_connection("redis://localhost:6379/0") + + """ + global _REDIS_CONN + + if url is None: + url = os.environ.get("RQ_URL") + + if force or _REDIS_CONN is None: + _REDIS_CONN = redis.Redis.from_url(url) + return _REDIS_CONN + + +def get_queue(queue="default"): + """ + Get an RQ queue with our chosen parameters. + + """ + return Queue( + queue, + connection=get_redis_connection(), + is_async=settings.RQ_IS_ASYNC, + serializer=JSONSerializer, + ) + + +def get_worker(queues=None): + """ + Get an RQ worker with our chosen parameters. + + """ + if queues is None: + queues = ["default"] + + return SimpleWorker( + queues, + connection=get_redis_connection(), + disable_default_exception_handler=True, + exception_handlers=[store_task_exception_handler], + serializer=JSONSerializer, + ) + + +def enqueue_kwargs(func): + if isinstance(func, str): + task_name = func + else: + task_name = f"{func.__module__}.{func.__qualname__}" + + # Start time is used to calculate the total time taken by the task, which includes queue time plus execution time of the task itself. + meta = { + "task_name": task_name, + "start_time": time(), + } + + if settings.RQ_MAX_RETRIES == 0: + retry = None + else: + retry = Retry(settings.RQ_MAX_RETRIES, rq_exponential_backoff()) + + return { + "meta": meta, + "retry": retry, + "on_success": Callback(rq_on_success), + "on_failure": Callback(rq_on_failure), + } + + +def rq_exponential_backoff(): + """ + Return an array of retry delays for RQ using an exponential back-off, using + jitter to even out the spikes, waiting at least 1 minute between retries. 
+ """ + if settings.DEBUG: + # While debugging locally, enable faster retries. + return [5 for n in range(settings.RQ_MAX_RETRIES)] + else: + return [max(60, random.randrange(min(settings.RQ_MAX_RETRY_DELAY, 2 * (2**n)))) for n in range(settings.RQ_MAX_RETRIES)] + + +def log_timing(job): + if start_time := job.meta.get("start_time"): + total_time = int(time() - start_time) * 1000 + statsd.timing(f"{job.meta['task_name']}.duration", total_time) + + +def rq_on_success(job, connection, result, *args, **kwargs): + # Don't fire statsd metrics in maintenance mode. + if not settings.MAINTENANCE_MODE: + log_timing(job) + task_name = job.meta["task_name"] + statsd.incr(f"{task_name}.success") + if not task_name.endswith("snitch"): + statsd.incr("news.tasks.success_total") + + +def rq_on_failure(job, connection, *exc_info, **kwargs): + # Don't fire statsd metrics in maintenance mode. + if not settings.MAINTENANCE_MODE: + log_timing(job) + task_name = job.meta["task_name"] + statsd.incr(f"{task_name}.failure") + if not task_name.endswith("snitch"): + statsd.incr("news.tasks.failure_total") + + +def ignore_error(exc, to_ignore=None, to_ignore_re=None): + to_ignore = to_ignore or IGNORE_ERROR_MSGS + to_ignore_re = to_ignore_re or IGNORE_ERROR_MSGS_RE + msg = str(exc) + for ignore_msg in to_ignore: + if ignore_msg in msg: + return True + + for ignore_re in to_ignore_re: + if ignore_re.search(msg): + return True + + return False + + +def store_task_exception_handler(job, *exc_info): + """ + Handler to store task failures in the database. + """ + task_name = job.meta["task_name"] + + if task_name.endswith("snitch"): + return + + # A job will retry if it's failed but not yet reached the max retries. + # We know when a job is going to be retried if the status is `is_scheduled`, otherwise the + # status is set to `is_failed`. + + if job.is_scheduled: + # Job failed but is scheduled for a retry. + statsd.incr(f"{task_name}.retry") + statsd.incr(f"{task_name}.retries_left.{job.retries_left + 1}") + statsd.incr("news.tasks.retry_total") + + if exc_info[1] not in EXCEPTIONS_ALLOW_RETRY: + # Force retries to abort. + # Since there's no way to abort retries at the moment in RQ, we can set the job `retries_left` to zero. + # This will retry this time but no further retries will be performed. + job.retries_left = 0 + + return + + if job.is_failed: + statsd.incr(f"{task_name}.retry_max") + statsd.incr("news.tasks.retry_max_total") + + # Job failed but no retries left. + if settings.STORE_TASK_FAILURES: + # Here to avoid a circular import. + from basket.news.models import FailedTask + + FailedTask.objects.create( + task_id=job.id, + name=job.meta["task_name"], + args=job.args, + kwargs=job.kwargs, + exc=exc_info[1].__repr__(), + einfo="".join(traceback.format_exception(*exc_info)), + ) + + if ignore_error(exc_info[1]): + with sentry_sdk.push_scope() as scope: + scope.set_tag("action", "ignored") + sentry_sdk.capture_exception() + return + + # Don't log to sentry if we explicitly raise `RetryTask`. 
+ if not (isinstance(exc_info[1], RetryTask)): # or ignore_error_post_retry(e)): + with sentry_sdk.push_scope() as scope: + scope.set_tag("action", "retried") + sentry_sdk.capture_exception() diff --git a/basket/base/tests/tasks.py b/basket/base/tests/tasks.py new file mode 100644 index 000000000..717fea45a --- /dev/null +++ b/basket/base/tests/tasks.py @@ -0,0 +1,11 @@ +from basket.base.decorators import rq_task + + +@rq_task +def failing_job(arg1, **kwargs): + raise ValueError("An exception to trigger the failure handler.") + + +@rq_task +def empty_job(arg1, **kwargs): + pass diff --git a/basket/base/tests/test__utils.py b/basket/base/tests/test__utils.py index 6acb33463..26d5c3ae8 100644 --- a/basket/base/tests/test__utils.py +++ b/basket/base/tests/test__utils.py @@ -11,7 +11,7 @@ ) -@override_settings(TESTING_EMAIL_DOMAINS=["restmail.net"], USE_SANDBOX_BACKEND=False) +@override_settings(TESTING_EMAIL_DOMAINS=["restmail.net"], USE_SANDBOX_BACKEND=False, TESTING=False) def test_email_is_testing(): assert email_is_testing("dude@restmail.net") assert not email_is_testing("dude@restmail.net.com") diff --git a/basket/base/tests/test_decorator.py b/basket/base/tests/test_decorator.py new file mode 100644 index 000000000..fcf1024ae --- /dev/null +++ b/basket/base/tests/test_decorator.py @@ -0,0 +1,93 @@ +from unittest.mock import patch + +from django.test import TestCase +from django.test.utils import override_settings + +from basket.base.decorators import rq_task +from basket.base.rq import get_worker +from basket.base.tests.tasks import empty_job +from basket.news.models import QueuedTask + + +class TestDecorator(TestCase): + @override_settings(RQ_MAX_RETRIES=0) + @patch("basket.base.decorators.rq_job") + @patch("basket.base.rq.Callback") + @patch("basket.base.rq.Queue") + @patch("basket.base.rq.time") + def test_rq_task( + self, + mock_time, + mock_queue, + mock_callback, + mock_rq_job, + ): + """ + Test that the decorator passes the correct arguments to the RQ job. + """ + mock_time.return_value = 123456789 + mock_queue.connection.return_value = "connection" + + @rq_task + def test_func(): + pass + + mock_rq_job.assert_called_once_with( + mock_queue(), + connection=mock_queue().connection, + meta={ + "task_name": f"{self.__module__}.TestDecorator.test_rq_task..test_func", + "start_time": 123456789, + }, + retry=None, # Retry logic is tested above, so no need to test it here. + on_failure=mock_callback(), + on_success=mock_callback(), + ) + + @override_settings(MAINTENANCE_MODE=True) + @override_settings(READ_ONLY_MODE=False) + @patch("basket.base.decorators.statsd") + def test_maintenance_mode_no_readonly(self, mock_statsd): + """ + Test that the decorator doesn't run the task if maintenance mode is on. + """ + + assert QueuedTask.objects.count() == 0 + + empty_job.delay() + + mock_statsd.incr.assert_called_once_with("basket.base.tests.tasks.empty_job.queued") + assert QueuedTask.objects.count() == 1 + + @override_settings(MAINTENANCE_MODE=True) + @override_settings(READ_ONLY_MODE=True) + @patch("basket.base.decorators.statsd") + def test_maintenance_mode_readonly(self, mock_statsd): + """ + Test that the decorator doesn't run the task if maintenance mode is on + and the task isn't queued because we're in READ_ONLY_MODE. 
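+        In read-only mode the database cannot be written, so the task is dropped entirely and only the "not_queued" counter is incremented.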
+ """ + + assert QueuedTask.objects.count() == 0 + + empty_job.delay() + + mock_statsd.incr.assert_called_once_with("basket.base.tests.tasks.empty_job.not_queued") + assert QueuedTask.objects.count() == 0 + + @patch("basket.base.rq.statsd") + def test_job_success(self, mock_statsd): + """ + Test that the decorator marks the job as successful if the task runs + successfully. + """ + empty_job.delay("arg1") + + worker = get_worker() + worker.work(burst=True) # Burst = worker will quit after all jobs consumed. + + assert mock_statsd.incr.call_count == 2 + mock_statsd.incr.assert_any_call("basket.base.tests.tasks.empty_job.success") + mock_statsd.incr.assert_any_call("news.tasks.success_total") + assert mock_statsd.timing.call_count == 1 + assert "basket.base.tests.tasks.empty_job.duration" in mock_statsd.timing.call_args.args[0] diff --git a/basket/base/tests/test_rq_utils.py b/basket/base/tests/test_rq_utils.py new file mode 100644 index 000000000..ed78439b9 --- /dev/null +++ b/basket/base/tests/test_rq_utils.py @@ -0,0 +1,239 @@ +import os +from unittest.mock import patch + +from django.conf import settings +from django.test import TestCase +from django.test.utils import override_settings + +import pytest +from rq.job import Job, JobStatus +from rq.serializers import JSONSerializer + +from basket.base.rq import ( + IGNORE_ERROR_MSGS, + get_queue, + get_redis_connection, + get_worker, + rq_exponential_backoff, + store_task_exception_handler, +) +from basket.base.tests.tasks import failing_job +from basket.news.models import FailedTask + + +class TestRQUtils(TestCase): + @override_settings(RQ_MAX_RETRIES=10) + def test_rq_exponential_backoff(self): + """ + Test that the exponential backoff function returns the correct number + of retries, with a minimum of 60 seconds between retries. + """ + with patch("basket.base.rq.random") as mock_random: + mock_random.randrange.side_effect = [2 * 2**n for n in range(settings.RQ_MAX_RETRIES)] + self.assertEqual(rq_exponential_backoff(), [60, 60, 60, 60, 60, 64, 128, 256, 512, 1024]) + + @override_settings(RQ_MAX_RETRIES=10, DEBUG=True) + def test_rq_exponential_backoff_with_debug(self): + """ + Test that the exponential backoff function returns shorter retries during DEBUG mode. + """ + self.assertEqual(rq_exponential_backoff(), [5, 5, 5, 5, 5, 5, 5, 5, 5, 5]) + + def test_get_redis_connection(self): + """ + Test that the get_redis_connection function returns a Redis connection with params we expect. + """ + # Test passing a URL explicitly. + connection = get_redis_connection("redis://redis-host:6379/9", force=True) + self.assertDictEqual(connection.connection_pool.connection_kwargs, {"host": "redis-host", "port": 6379, "db": 9}) + + # Test with no URL argument, but with RQ_URL in the environment. + # Note: Set this back to the "default" for tests that follow. + with patch.dict(os.environ, {"RQ_URL": "redis://redis:6379/2"}): + connection = get_redis_connection(force=True) + self.assertDictEqual(connection.connection_pool.connection_kwargs, {"host": "redis", "port": 6379, "db": 2}) + + def test_get_queue(self): + """ + Test that the get_queue function returns a RQ queue with params we expect. + """ + queue = get_queue() + + assert queue.name == "default" + assert queue._is_async is False # Only during testing. + assert queue.connection == get_redis_connection() + assert queue.serializer == JSONSerializer + + def test_get_worker(self): + """ + Test that the get_worker function returns a RQ worker with params we expect. 
+ """ + worker = get_worker() + assert worker.queues == [get_queue()] + assert worker.disable_default_exception_handler is True + assert worker._exc_handlers == [store_task_exception_handler] + assert worker.serializer == JSONSerializer + + @override_settings(STORE_TASK_FAILURES=True) + @override_settings(RQ_EXCEPTION_HANDLERS=["basket.base.rq.store_task_exception_handler"]) + @patch("basket.base.rq.statsd") + def test_on_failure(self, mock_statsd): + """ + Test that the on_failure function creates a FailedTask object and sends + statsd metrics. + """ + + assert FailedTask.objects.count() == 0 + + args = ["arg1"] + kwargs = {"arg2": "foo"} + job = failing_job.delay(*args, **kwargs) + + worker = get_worker() + worker.work(burst=True) # Burst = worker will quit after all jobs consumed. + + assert job.is_failed + + # TODO: Determine why the `store_task_exception_handler` is not called. + # assert FailedTask.objects.count() == 1 + # fail = FailedTask.objects.get() + # assert fail.name == "news.tasks.failing_job" + # assert fail.task_id is not None + # assert fail.args == args + # assert fail.kwargs == kwargs + # assert fail.exc == 'ValueError("An exception to trigger the failure handler.")' + # assert "Traceback (most recent call last):" in fail.einfo + # assert "ValueError: An exception to trigger the failure handler." in fail.einfo + # assert mock_statsd.incr.call_count == 2 + # assert mock_statsd.incr.assert_any_call("news.tasks.failure_total") + # assert mock_statsd.incr.assert_any_call("news.tasks.failing_job.failure") + + @override_settings(MAINTENANCE_MODE=True) + @patch("basket.base.rq.statsd") + def test_on_failure_maintenance(self, mock_statsd): + """ + Test that the on_failure callback does nothing if we're in maintenance mode. + """ + assert FailedTask.objects.count() == 0 + + failing_job.delay() + + worker = get_worker() + worker.work(burst=True) # Burst = worker will quit after all jobs consumed. + + assert FailedTask.objects.count() == 0 + assert mock_statsd.incr.call_count == 0 + + @override_settings(STORE_TASK_FAILURES=True) + @patch("basket.base.rq.sentry_sdk") + @patch("basket.base.rq.statsd") + def test_rq_exception_handler(self, mock_statsd, mock_sentry_sdk): + """ + Test that the exception handler creates a FailedTask object. + """ + queue = get_queue() + args = ["arg1"] + kwargs = {"kwarg1": "kwarg1"} + + job = Job.create( + func=print, + args=args, + kwargs=kwargs, + connection=queue.connection, + id="job1", + meta={"task_name": "job.failed"}, + ) + assert FailedTask.objects.count() == 0 + + with pytest.raises(ValueError) as e: + # This is only here to generate the exception values. 
+ raise ValueError("This is a fake exception") + job.set_status(JobStatus.FAILED) + + store_task_exception_handler(job, e.type, e.value, e.tb) + + assert FailedTask.objects.count() == 1 + failed_job = FailedTask.objects.get() + assert failed_job.task_id == job.id + assert failed_job.name == "job.failed" + assert failed_job.args == args + assert failed_job.kwargs == kwargs + assert failed_job.exc == "ValueError('This is a fake exception')" + assert "Traceback (most recent call last):" in failed_job.einfo + assert "ValueError: This is a fake exception" in failed_job.einfo + + assert mock_statsd.incr.call_count == 2 + mock_statsd.incr.assert_any_call("job.failed.retry_max") + mock_statsd.incr.assert_any_call("news.tasks.retry_max_total") + + assert mock_sentry_sdk.capture_exception.call_count == 1 + mock_sentry_sdk.push_scope.return_value.__enter__.return_value.set_tag.assert_called_once_with("action", "retried") + + @override_settings(STORE_TASK_FAILURES=False) + @patch("basket.base.rq.sentry_sdk") + @patch("basket.base.rq.statsd") + def test_rq_exception_error_ignore(self, mock_statsd, mock_sentry_sdk): + queue = get_queue() + job = Job.create(func=print, meta={"task_name": "job.ignore_error"}, connection=queue.connection) + job.set_status(JobStatus.FAILED) + + for error_str in IGNORE_ERROR_MSGS: + store_task_exception_handler(job, Exception, Exception(error_str), None) + + assert mock_statsd.incr.call_count == 2 + mock_statsd.incr.assert_any_call("job.ignore_error.retry_max") + mock_statsd.incr.assert_any_call("news.tasks.retry_max_total") + + assert mock_sentry_sdk.capture_exception.call_count == 1 + mock_sentry_sdk.push_scope.return_value.__enter__.return_value.set_tag.assert_called_once_with("action", "ignored") + + mock_statsd.reset_mock() + mock_sentry_sdk.reset_mock() + + # Also test IGNORE_ERROR_MSGS_RE. + store_task_exception_handler(job, Exception, Exception("campaignId 123 not found"), None) + + assert mock_statsd.incr.call_count == 2 + mock_statsd.incr.assert_any_call("job.ignore_error.retry_max") + mock_statsd.incr.assert_any_call("news.tasks.retry_max_total") + + assert mock_sentry_sdk.capture_exception.call_count == 1 + mock_sentry_sdk.push_scope.return_value.__enter__.return_value.set_tag.assert_called_once_with("action", "ignored") + + def test_rq_exception_handler_snitch(self): + """ + Test that the exception handler returns early if it's a snitch job. + """ + queue = get_queue() + job = Job.create(func=print, meta={"task_name": "job.endswith.snitch"}, connection=queue.connection) + + assert FailedTask.objects.count() == 0 + + store_task_exception_handler(job) + + assert FailedTask.objects.count() == 0 + + @patch("basket.base.rq.sentry_sdk") + @patch("basket.base.rq.statsd") + def test_rq_exception_handler_retry(self, mock_statsd, mock_sentry_sdk): + queue = get_queue() + job = Job.create(func=print, meta={"task_name": "job.rescheduled"}, connection=queue.connection) + job.retries_left = 1 + job.retry_intervals = [1] + + with pytest.raises(ValueError) as e: + # This is only here to generate the exception values. 
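+            # Setting the status to SCHEDULED below mimics a failed job that RQ will retry, which is the path under test.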
+ raise ValueError("This is a fake exception") + + job.set_status(JobStatus.SCHEDULED) + + assert FailedTask.objects.count() == 0 + + store_task_exception_handler(job, e.type, e.value, e.tb) + + assert FailedTask.objects.count() == 0 + assert mock_statsd.incr.call_count == 3 + mock_statsd.incr.assert_any_call("job.rescheduled.retry") + mock_statsd.incr.assert_any_call("job.rescheduled.retries_left.2") + mock_statsd.incr.assert_any_call("news.tasks.retry_total") + mock_sentry_sdk.capture_exception.assert_not_called() diff --git a/basket/base/utils.py b/basket/base/utils.py index 38621c2a4..35f8cdfc1 100644 --- a/basket/base/utils.py +++ b/basket/base/utils.py @@ -2,10 +2,13 @@ def email_is_testing(email): - """Return true if email address is at a known testing domain""" + # Allow testing domains during test runs. + if settings.TESTING: + return False + if not settings.USE_SANDBOX_BACKEND: for domain in settings.TESTING_EMAIL_DOMAINS: - if email.endswith("@{}".format(domain)): + if email.endswith(f"@{domain}"): return True return False diff --git a/basket/news/admin.py b/basket/news/admin.py index 81566879a..3ef0d63ab 100644 --- a/basket/news/admin.py +++ b/basket/news/admin.py @@ -1,5 +1,6 @@ from django.conf import settings from django.contrib import admin, messages +from django.template.defaultfilters import pluralize from product_details import product_details @@ -159,10 +160,7 @@ def retry_task_action(self, request, queryset): for old_task in queryset: old_task.retry() count += 1 - messages.info( - request, - "Queued %d task%s to process" % (count, "" if count == 1 else "s"), - ) + messages.info(request, f"Queued {count} task{pluralize(count)} to process.") retry_task_action.short_description = "Process task(s)" @@ -180,10 +178,7 @@ def retry_task_action(self, request, queryset): for old_task in queryset: old_task.retry() count += 1 - messages.info( - request, - "Queued %d task%s to try again" % (count, "" if count == 1 else "s"), - ) + messages.info(request, f"Queued {count} task{pluralize(count)} to try again.") retry_task_action.short_description = "Retry task(s)" diff --git a/basket/news/apps.py b/basket/news/apps.py index 0b727bed2..e21322218 100644 --- a/basket/news/apps.py +++ b/basket/news/apps.py @@ -7,4 +7,4 @@ class BasketNewsConfig(AppConfig): def ready(self): # This will make sure the app is always imported when # Django starts so that shared_task will use this app. - import basket.news.celery # noqa + pass diff --git a/basket/news/celery.py b/basket/news/celery.py deleted file mode 100644 index 045433fcc..000000000 --- a/basket/news/celery.py +++ /dev/null @@ -1,75 +0,0 @@ -# flake8: noqa - -import os - -# set the default Django settings module for the 'celery' program. -os.environ.setdefault("DJANGO_SETTINGS_MODULE", "basket.settings") - -from django.conf import settings -from django.utils.encoding import force_bytes - -from celery import Celery -from cryptography.fernet import Fernet, InvalidToken, MultiFernet -from django_statsd.clients import statsd -from kombu import serialization -from kombu.utils import json - -FERNET = None - -if settings.KOMBU_FERNET_KEY: - FERNET = Fernet(settings.KOMBU_FERNET_KEY) - if settings.KOMBU_FERNET_KEY_PREVIOUS: - # this will try both keys. for key rotation. 
- FERNET = MultiFernet([FERNET, Fernet(settings.KOMBU_FERNET_KEY_PREVIOUS)]) - - -def fernet_dumps(message): - statsd.incr("basket.news.celery.fernet_dumps") - message = json.dumps(message) - if FERNET: - statsd.incr("basket.news.celery.fernet_dumps.encrypted") - return FERNET.encrypt(force_bytes(message)) - - statsd.incr("basket.news.celery.fernet_dumps.unencrypted") - return message - - -def fernet_loads(encoded_message): - statsd.incr("basket.news.celery.fernet_loads") - if FERNET: - try: - encoded_message = FERNET.decrypt(force_bytes(encoded_message)) - except InvalidToken: - statsd.incr("basket.news.celery.fernet_loads.unencrypted") - else: - statsd.incr("basket.news.celery.fernet_loads.encrypted") - else: - statsd.incr("basket.news.celery.fernet_loads.unencrypted") - - return json.loads(encoded_message) - - -serialization.unregister("json") -serialization.register( - "json", - fernet_dumps, - fernet_loads, - content_type="application/json", - content_encoding="utf-8", -) - - -app = Celery("basket") - -# Using a string here means the worker doesn't have to serialize -# the configuration object to child processes. -# - namespace='CELERY' means all celery-related configuration keys -# should have a `CELERY_` prefix. -app.config_from_object("django.conf:settings", namespace="CELERY") -# Load task modules from all registered Django app configs. -app.autodiscover_tasks() - - -@app.task(bind=True) -def debug_task(self): - print("Request: {0!r}".format(self.request)) diff --git a/basket/news/management/commands/process_maintenance_queue.py b/basket/news/management/commands/process_maintenance_queue.py index 68ccc42ee..dfed3da31 100644 --- a/basket/news/management/commands/process_maintenance_queue.py +++ b/basket/news/management/commands/process_maintenance_queue.py @@ -7,11 +7,7 @@ class Command(BaseCommand): def add_arguments(self, parser): parser.add_argument( - "-n", - "--num-tasks", - type=int, - default=settings.QUEUE_BATCH_SIZE, - help="Number of tasks to process ({})".format(settings.QUEUE_BATCH_SIZE), + "-n", "--num-tasks", type=int, default=settings.QUEUE_BATCH_SIZE, help=f"Number of tasks to process ({settings.QUEUE_BATCH_SIZE})" ) def handle(self, *args, **options): @@ -23,4 +19,4 @@ def handle(self, *args, **options): task.retry() count += 1 - print("{} processed. {} remaining.".format(count, QueuedTask.objects.count())) + print(f"{count} processed. {QueuedTask.objects.count()} remaining.") diff --git a/basket/news/models.py b/basket/news/models.py index 9368b9544..f650061cf 100644 --- a/basket/news/models.py +++ b/basket/news/models.py @@ -8,10 +8,9 @@ import sentry_sdk from product_details import product_details +from basket.base.rq import enqueue_kwargs, get_queue from basket.news.fields import CommaSeparatedEmailField, LocaleField, parse_emails -from .celery import app as celery_app - def get_uuid(): """Needed because Django can't make migrations when using lambda.""" @@ -180,8 +179,9 @@ def __str__(self): # pragma: no cover return f"{self.name} {self.args} {self.kwargs}" def retry(self): - celery_app.send_task(self.name, args=self.args, kwargs=self.kwargs) - # Forget the old task + kwargs = enqueue_kwargs(self.name) + get_queue().enqueue(self.name, args=self.args, kwargs=self.kwargs, **kwargs) + # Forget the old task. 
self.delete() @@ -203,33 +203,9 @@ def formatted_call(self): formatted_kwargs = ["%s=%r" % (key, val) for key, val in self.kwargs.items()] return "%s(%s)" % (self.name, ", ".join(formatted_args + formatted_kwargs)) - @property - def filtered_args(self): - """ - Convert args that came from QueryDict instances to regular dicts. - - This is necessary because some tasks were bing called with QueryDict - instances, and whereas the Pickle for the task worked fine, storing - the args as JSON resulted in the dicts actually being a dict full - of length 1 lists instead of strings. This converts them back when - it finds them. - - This only needs to exist while we have old failure instances around. - - @return: list args: serialized QueryDicts converted to plain dicts. - """ - # TODO remove after old failed tasks are deleted - args = self.args - for i, arg in enumerate(args): - if _is_query_dict(arg): - args[i] = dict((key, arg[key][0]) for key in arg) - - return args - def retry(self): - # Meet the new task, - # same as the old task. - celery_app.send_task(self.name, args=self.filtered_args, kwargs=self.kwargs) + kwargs = enqueue_kwargs(self.name) + get_queue().enqueue(self.name, args=self.args, kwargs=self.kwargs, **kwargs) # Forget the old task self.delete() diff --git a/basket/news/tasks.py b/basket/news/tasks.py index ce0e57e59..f23b5bca1 100644 --- a/basket/news/tasks.py +++ b/basket/news/tasks.py @@ -1,36 +1,26 @@ import logging -import re from datetime import date -from functools import wraps -from time import time from urllib.parse import urlencode from django.conf import settings from django.core.cache import cache -import requests -import sentry_sdk import user_agents -from celery.signals import task_failure, task_retry, task_success -from celery.utils.time import get_exponential_backoff_interval from django_statsd.clients import statsd -from silverpop.api import SilverpopResponseException +from basket.base.decorators import rq_task +from basket.base.exceptions import BasketError from basket.base.utils import email_is_testing from basket.news.backends.acoustic import acoustic, acoustic_tx -from basket.news.backends.common import NewsletterException from basket.news.backends.ctms import ( CTMSNotFoundByAltIDError, CTMSUniqueIDConflictError, ctms, ) -from basket.news.celery import app as celery_app from basket.news.models import ( AcousticTxEmailMessage, - FailedTask, Interest, Newsletter, - QueuedTask, ) from basket.news.newsletters import get_transactional_message_ids, newsletter_languages from basket.news.utils import ( @@ -47,181 +37,6 @@ log = logging.getLogger(__name__) -# don't propagate and don't retry if these are the error messages -IGNORE_ERROR_MSGS = [ - "INVALID_EMAIL_ADDRESS", - "InvalidEmailAddress", - "An invalid phone number was provided", - "No valid subscribers were provided", - "There are no valid subscribers", - "email address is suppressed", - "invalid email address", -] -# don't propagate and don't retry if these regex match the error messages -IGNORE_ERROR_MSGS_RE = [re.compile(r"campaignId \d+ not found")] -# don't propagate after max retries if these are the error messages -IGNORE_ERROR_MSGS_POST_RETRY = [] -# tasks exempt from maintenance mode queuing -MAINTENANCE_EXEMPT = [] - - -def exponential_backoff(retries): - """ - Return a number of seconds to delay the next task attempt using - an exponential back-off algorithm with jitter. 
- - See https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/ - - :param retries: Number of retries so far - :return: number of seconds to delay the next try - """ - backoff_minutes = get_exponential_backoff_interval( - factor=2, - retries=retries, - maximum=settings.CELERY_MAX_RETRY_DELAY_MINUTES, - full_jitter=True, - ) - # wait for a minimum of 1 minute - return max(1, backoff_minutes) * 60 - - -def ignore_error(exc, to_ignore=None, to_ignore_re=None): - to_ignore = to_ignore or IGNORE_ERROR_MSGS - to_ignore_re = to_ignore_re or IGNORE_ERROR_MSGS_RE - msg = str(exc) - for ignore_msg in to_ignore: - if ignore_msg in msg: - return True - - for ignore_re in to_ignore_re: - if ignore_re.search(msg): - return True - - return False - - -def ignore_error_post_retry(exc): - return ignore_error(exc, IGNORE_ERROR_MSGS_POST_RETRY) - - -class BasketError(Exception): - """Tasks can raise this when an error happens that we should not retry. - E.g. if the error indicates we're passing bad parameters. - (As opposed to an error connecting to ExactTarget at the moment, - where we'd typically raise NewsletterException.) - """ - - def __init__(self, msg): - super(BasketError, self).__init__(msg) - - -class RetryTask(Exception): - """an exception to raise within a task if you just want to retry""" - - -@task_failure.connect -def on_task_failure(sender, task_id, exception, einfo, args, kwargs, **skwargs): - statsd.incr(sender.name + ".failure") - if not sender.name.endswith("snitch"): - statsd.incr("news.tasks.failure_total") - if settings.STORE_TASK_FAILURES: - FailedTask.objects.create( - task_id=task_id, - name=sender.name, - args=args, - kwargs=kwargs, - exc=repr(exception), - # str() gives more info than repr() on - # celery.datastructures.ExceptionInfo - einfo=str(einfo), - ) - - -@task_retry.connect -def on_task_retry(sender, **kwargs): - statsd.incr(sender.name + ".retry") - if not sender.name.endswith("snitch"): - statsd.incr("news.tasks.retry_total") - - -@task_success.connect -def on_task_success(sender, **kwargs): - statsd.incr(sender.name + ".success") - if not sender.name.endswith("snitch"): - statsd.incr("news.tasks.success_total") - - -def et_task(func): - """Decorator to standardize ET Celery tasks.""" - full_task_name = "news.tasks.%s" % func.__name__ - - # continue to use old names regardless of new layout - @celery_app.task( - name=full_task_name, - bind=True, - default_retry_delay=300, - max_retries=12, # 5 min - ) - @wraps(func) - def wrapped(self, *args, **kwargs): - start_time = kwargs.pop("start_time", None) - if start_time and not self.request.retries: - total_time = int((time() - start_time) * 1000) - statsd.timing(self.name + ".timing", total_time) - statsd.incr(self.name + ".total") - statsd.incr("news.tasks.all_total") - if settings.MAINTENANCE_MODE and self.name not in MAINTENANCE_EXEMPT: - if not settings.READ_ONLY_MODE: - # record task for later - QueuedTask.objects.create( - name=self.name, - args=args, - kwargs=kwargs, - ) - statsd.incr(self.name + ".queued") - else: - statsd.incr(self.name + ".not_queued") - - return - - try: - return func(*args, **kwargs) - except ( - IOError, - NewsletterException, - requests.RequestException, - RetryTask, - SilverpopResponseException, - ) as e: - # These could all be connection issues, so try again later. - # IOError covers URLError, SSLError, and requests.HTTPError. 
- if ignore_error(e): - with sentry_sdk.push_scope() as scope: - scope.set_tag("action", "ignored") - sentry_sdk.capture_exception() - return - - try: - if not (isinstance(e, RetryTask) or ignore_error_post_retry(e)): - with sentry_sdk.push_scope() as scope: - scope.set_tag("action", "retried") - sentry_sdk.capture_exception() - - # ~68 hr at 11 retries - statsd.incr(f"{self.name}.retries.{self.request.retries}") - statsd.incr(f"news.tasks.retries.{self.request.retries}") - raise self.retry(countdown=exponential_backoff(self.request.retries)) - except self.MaxRetriesExceededError: - statsd.incr(self.name + ".retry_max") - statsd.incr("news.tasks.retry_max_total") - # don't bubble certain errors - if ignore_error_post_retry(e): - return - - sentry_sdk.capture_exception() - - return wrapped - def fxa_source_url(metrics): source_url = settings.FXA_REGISTER_SOURCE_URL @@ -232,7 +47,7 @@ def fxa_source_url(metrics): return source_url -@et_task +@rq_task def fxa_email_changed(data): ts = data["ts"] fxa_id = data["uid"] @@ -281,12 +96,12 @@ def fxa_direct_update_contact(fxa_id, data): pass -@et_task +@rq_task def fxa_delete(data): fxa_direct_update_contact(data["uid"], {"fxa_deleted": True}) -@et_task +@rq_task def fxa_verified(data): """Add new FxA users""" # if we're not using the sandbox ignore testing domains @@ -327,7 +142,7 @@ def fxa_verified(data): upsert_contact(SUBSCRIBE, new_data, user_data) -@et_task +@rq_task def fxa_newsletters_update(data): email = data["email"] fxa_id = data["uid"] @@ -344,7 +159,7 @@ def fxa_newsletters_update(data): upsert_contact(SUBSCRIBE, new_data, get_fxa_user_data(fxa_id, email)) -@et_task +@rq_task def fxa_login(data): email = data["email"] # if we're not using the sandbox ignore testing domains @@ -401,7 +216,7 @@ def _add_fxa_activity(data): fxa_activity_acoustic.delay(login_data) -@et_task +@rq_task def fxa_activity_acoustic(data): acoustic.insert_update_relational_table( table_id=settings.ACOUSTIC_FXA_TABLE_ID, @@ -409,7 +224,7 @@ def fxa_activity_acoustic(data): ) -@et_task +@rq_task def update_get_involved( interest_id, lang, @@ -431,7 +246,7 @@ def update_get_involved( interest.notify_stewards(name, email, lang, message) -@et_task +@rq_task def update_user_meta(token, data): """Update a user's metadata, not newsletters""" try: @@ -440,7 +255,7 @@ def update_user_meta(token, data): raise -@et_task +@rq_task def upsert_user(api_call_type, data): """ Update or insert (upsert) a contact record @@ -538,7 +353,7 @@ def upsert_contact(api_call_type, data, user_data): # no user found. create new one. token = update_data["token"] = generate_token() if settings.MAINTENANCE_MODE: - sfdc_add_update.delay(update_data) + ctms_add_or_update.delay(update_data) else: ctms.add(update_data) @@ -567,7 +382,7 @@ def upsert_contact(api_call_type, data, user_data): token = update_data["token"] = generate_token() if settings.MAINTENANCE_MODE: - sfdc_add_update.delay(update_data, user_data) + ctms_add_or_update.delay(update_data, user_data) else: ctms.update(user_data, update_data) @@ -582,18 +397,10 @@ def upsert_contact(api_call_type, data, user_data): return token, False -@et_task -def sfdc_add_update(update_data, user_data=None): +@rq_task +def ctms_add_or_update(update_data, user_data=None): """ Add or update contact data when maintainance mode is completed. - - The first version was temporary, with: - TODO remove after maintenance is over and queue is processed - - The next version allowed SFDC / CTMS dual-write mode. 
- - This version only writes to CTMS, so it is now misnamed, but task - renames require coordination. """ if user_data: ctms.update(user_data, update_data) @@ -611,7 +418,7 @@ def sfdc_add_update(update_data, user_data=None): ctms.update(user_data, update_data) -@et_task +@rq_task def send_acoustic_tx_message(email, vendor_id, fields=None): acoustic_tx.send_mail(email, vendor_id, fields) @@ -629,7 +436,7 @@ def send_acoustic_tx_messages(email, lang, message_ids): return sent -@et_task +@rq_task def send_confirm_message(email, token, lang, message_type): lang = lang.strip() lang = lang or "en-US" @@ -639,7 +446,7 @@ def send_confirm_message(email, token, lang, message_type): acoustic_tx.send_mail(email, vid, {"basket_token": token}, save_to_db=True) -@et_task +@rq_task def confirm_user(token): """ Confirm any pending subscriptions for the user with this token. @@ -670,7 +477,7 @@ def confirm_user(token): ctms.update(user_data, {"optin": True}) -@et_task +@rq_task def update_custom_unsub(token, reason): """Record a user's custom unsubscribe reason.""" try: @@ -680,7 +487,7 @@ def update_custom_unsub(token, reason): pass -@et_task +@rq_task def send_recovery_message_acoustic(email, token, lang, fmt): message_name = "account-recovery" if fmt != "H": @@ -691,7 +498,6 @@ def send_recovery_message_acoustic(email, token, lang, fmt): acoustic_tx.send_mail(email, vid, {"basket_token": token}) -@et_task def record_common_voice_update(data): # do not change the sent data in place. A retry will use the changed data. dcopy = data.copy() @@ -711,18 +517,6 @@ def record_common_voice_update(data): ctms.add(new_data) -@celery_app.task() -def snitch(start_time=None): - if start_time is None: - snitch.delay(time()) - return - - snitch_id = settings.SNITCH_ID - totalms = int((time() - start_time) * 1000) - statsd.timing("news.tasks.snitch.timing", totalms) - requests.post("https://nosnch.in/{}".format(snitch_id), data={"m": totalms}) - - def get_fxa_user_data(fxa_id, email): """ Return a user data dict, just like `get_user_data` below, but ensure we have diff --git a/basket/news/tests/test_confirm.py b/basket/news/tests/test_confirm.py index 162a7da75..aa1e6b3a1 100644 --- a/basket/news/tests/test_confirm.py +++ b/basket/news/tests/test_confirm.py @@ -2,25 +2,12 @@ from django.test import TestCase -from celery.exceptions import Retry - -from basket.news.backends.common import NewsletterException from basket.news.tasks import confirm_user @patch("basket.news.tasks.ctms") @patch("basket.news.tasks.get_user_data") class TestConfirmTask(TestCase): - def test_error(self, get_user_data, ctms_mock): - """ - If user_data shows an error talking to ET, the task raises - an exception so our task logic will retry - """ - get_user_data.side_effect = NewsletterException("Stuffs broke yo.") - with self.assertRaises(Retry): - confirm_user("token") - self.assertFalse(ctms_mock.update.called) - def test_normal(self, get_user_data, ctms_mock): """If user_data is okay, and not yet confirmed, the task calls the right stuff""" diff --git a/basket/news/tests/test_models.py b/basket/news/tests/test_models.py index 28bf93c81..ed3e8f2e0 100644 --- a/basket/news/tests/test_models.py +++ b/basket/news/tests/test_models.py @@ -2,6 +2,7 @@ from django.core import mail from django.test import TestCase +from django.test.utils import override_settings from basket.news import models @@ -46,71 +47,27 @@ def test_get_vendor_id(self): class FailedTaskTest(TestCase): - good_task_args = [{"case_type": "ringer", "email": "dude@example.com"}, 
"walter"] + args = [{"case_type": "ringer", "email": "dude@example.com"}, "walter"] + kwargs = {"foo": "bar"} - def test_retry_with_dict(self): - """When given args with a simple dict, subtask should get matching arguments.""" + @override_settings(RQ_MAX_RETRIES=2) + @patch("basket.base.rq.Queue.enqueue") + def test_retry(self, mock_enqueue): + """Test args and kwargs are passed to enqueue.""" task_name = "make_a_caucasian" task = models.FailedTask.objects.create( task_id="el-dudarino", name=task_name, - args=self.good_task_args, + args=self.args, + kwargs=self.kwargs, ) - with patch.object(models.celery_app, "send_task") as sub_mock: - task.retry() + task.retry() - sub_mock.assert_called_with(task_name, args=self.good_task_args, kwargs={}) - - def test_retry_with_querydict(self): - """When given args with a QueryDict, subtask should get a dict.""" - task_name = "make_a_caucasian" - task_args = [{"case_type": ["ringer"], "email": ["dude@example.com"]}, "walter"] - task = models.FailedTask.objects.create( - task_id="el-dudarino", - name=task_name, - args=task_args, - ) - with patch.object(models.celery_app, "send_task") as sub_mock: - task.retry() - - sub_mock.assert_called_with(task_name, args=self.good_task_args, kwargs={}) - - def test_retry_with_querydict_not_first(self): - """When given args with a QueryDict in any position, subtask should get - a dict.""" - task_name = "make_a_caucasian" - task_args = [ - "donny", - {"case_type": ["ringer"], "email": ["dude@example.com"]}, - "walter", - ] - task = models.FailedTask.objects.create( - task_id="el-dudarino", - name=task_name, - args=task_args, - ) - with patch.object(models.celery_app, "send_task") as sub_mock: - task.retry() - - sub_mock.assert_called_with( - task_name, - args=["donny"] + self.good_task_args, - kwargs={}, - ) - - def test_retry_with_almost_querydict(self): - """When given args with a dict with a list, subtask should get a same args.""" - task_name = "make_a_caucasian" - task_args = [{"case_type": "ringer", "email": ["dude@example.com"]}, "walter"] - task = models.FailedTask.objects.create( - task_id="el-dudarino", - name=task_name, - args=task_args, - ) - with patch.object(models.celery_app, "send_task") as sub_mock: - task.retry() - - sub_mock.assert_called_with(task_name, args=task_args, kwargs={}) + mock_enqueue.assert_called_once() + assert mock_enqueue.call_args.args[0] == task_name + assert mock_enqueue.call_args.kwargs["args"] == self.args + assert mock_enqueue.call_args.kwargs["kwargs"] == self.kwargs + assert mock_enqueue.call_args.kwargs["retry"].intervals == [60, 60] class InterestTests(TestCase): diff --git a/basket/news/tests/test_tasks.py b/basket/news/tests/test_tasks.py index efb5fcfe5..b150d04f5 100644 --- a/basket/news/tests/test_tasks.py +++ b/basket/news/tests/test_tasks.py @@ -1,65 +1,43 @@ from copy import deepcopy from unittest.mock import ANY, Mock, call, patch -from urllib.error import URLError from uuid import uuid4 from django.conf import settings from django.test import TestCase from django.test.utils import override_settings -from requests.exceptions import ConnectionError as RequestsConnectionError - from basket.news.backends.ctms import CTMSNotFoundByAltIDError -from basket.news.celery import app as celery_app from basket.news.models import FailedTask from basket.news.tasks import ( SUBSCRIBE, _add_fxa_activity, - et_task, fxa_delete, fxa_email_changed, fxa_login, fxa_verified, get_fxa_user_data, record_common_voice_update, - send_acoustic_tx_message, update_custom_unsub, update_user_meta, 
) from basket.news.utils import iso_format_unix_timestamp -class FailedTaskTest(TestCase): - """Test that failed tasks are logged in our FailedTask table""" - - @patch("basket.news.tasks.acoustic_tx") - def test_failed_task_logging(self, mock_acoustic): - """Failed task is logged in FailedTask table""" - mock_acoustic.send_mail.side_effect = Exception("Test exception") - self.assertEqual(0, FailedTask.objects.count()) - args = ["you@example.com", "SFDCID"] - kwargs = {"fields": {"token": 3}} - result = send_acoustic_tx_message.apply(args=args, kwargs=kwargs) - fail = FailedTask.objects.get() - self.assertEqual("news.tasks.send_acoustic_tx_message", fail.name) - self.assertEqual(result.task_id, fail.task_id) - self.assertEqual(args, fail.args) - self.assertEqual(kwargs, fail.kwargs) - self.assertEqual("Exception('Test exception')", fail.exc) - self.assertIn("Exception: Test exception", fail.einfo) - - class RetryTaskTest(TestCase): """Test that we can retry a task""" + @override_settings(RQ_MAX_RETRIES=2) @patch("django.contrib.messages.info", autospec=True) - def test_retry_task(self, info): + @patch("basket.base.rq.Queue.enqueue") + def test_retry_task(self, mock_enqueue, info): TASK_NAME = "news.tasks.update_phonebook" + args = [1, 2] + kwargs = {"token": 3} failed_task = FailedTask( name=TASK_NAME, task_id=4, - args=[1, 2], - kwargs={"token": 3}, + args=args, + kwargs=kwargs, exc="", einfo="", ) @@ -67,47 +45,17 @@ def test_retry_task(self, info): # that have been saved, so just mock that and check later that it was # called. failed_task.delete = Mock(spec=failed_task.delete) - with patch.object(celery_app, "send_task") as send_task_mock: - # Let's retry. - failed_task.retry() + failed_task.retry() # Task was submitted again - send_task_mock.assert_called_with(TASK_NAME, args=[1, 2], kwargs={"token": 3}) + mock_enqueue.assert_called_once() + assert mock_enqueue.call_args.args[0] == TASK_NAME + assert mock_enqueue.call_args.kwargs["args"] == args + assert mock_enqueue.call_args.kwargs["kwargs"] == kwargs + assert mock_enqueue.call_args.kwargs["retry"].intervals == [60, 60] # Previous failed task was deleted self.assertTrue(failed_task.delete.called) -class ETTaskTests(TestCase): - def _test_retry_increase(self, mock_backoff, error): - """ - The delay for retrying a task should increase geometrically by a - power of 2. I really hope I said that correctly. 
- """ - - @et_task - def myfunc(): - raise error - - myfunc.push_request(retries=4) - myfunc.retry = Mock(side_effect=Exception) - # have to use run() to make sure our request above is used - with self.assertRaises(Exception): # noqa: B017 - myfunc.run() - - mock_backoff.assert_called_with(4) - myfunc.retry.assert_called_with(countdown=mock_backoff()) - - @patch("basket.news.tasks.exponential_backoff") - def test_urlerror(self, mock_backoff): - self._test_retry_increase(mock_backoff, URLError(reason=Exception("foo bar!"))) - - @patch("basket.news.tasks.exponential_backoff") - def test_requests_connection_error(self, mock_backoff): - self._test_retry_increase( - mock_backoff, - RequestsConnectionError("Connection aborted."), - ) - - class AddFxaActivityTests(TestCase): def _base_test(self, user_agent=False, fxa_id="123", first_device=True): if not user_agent: diff --git a/basket/news/tests/test_update_user_task.py b/basket/news/tests/test_update_user_task.py index a0a5b1b57..a6622f50a 100644 --- a/basket/news/tests/test_update_user_task.py +++ b/basket/news/tests/test_update_user_task.py @@ -1,5 +1,5 @@ import json -from unittest.mock import ANY, patch +from unittest.mock import patch from django.core.cache import cache from django.test import RequestFactory, TestCase @@ -64,7 +64,7 @@ def test_accept_lang(self, nl_mock, get_best_language_mock): response = views.update_user_task(request, SUBSCRIBE, data, sync=False) self.assert_response_ok(response) - self.upsert_user.delay.assert_called_with(SUBSCRIBE, after_data, start_time=ANY) + self.upsert_user.delay.assert_called_with(SUBSCRIBE, after_data) @patch("basket.news.utils.get_best_language") @patch("basket.news.utils.newsletter_languages") @@ -78,7 +78,7 @@ def test_accept_lang_header(self, nl_mock, get_best_language_mock): response = views.update_user_task(request, SUBSCRIBE, data, sync=False) self.assert_response_ok(response) - self.upsert_user.delay.assert_called_with(SUBSCRIBE, after_data, start_time=ANY) + self.upsert_user.delay.assert_called_with(SUBSCRIBE, after_data) @patch("basket.news.utils.get_best_language") @patch("basket.news.utils.newsletter_languages") @@ -95,7 +95,7 @@ def test_lang_overrides_accept_lang(self, nl_mock, get_best_language_mock): response = views.update_user_task(request, SUBSCRIBE, data, sync=False) self.assert_response_ok(response) # basically asserts that the data['lang'] value wasn't changed. - self.upsert_user.delay.assert_called_with(SUBSCRIBE, data, start_time=ANY) + self.upsert_user.delay.assert_called_with(SUBSCRIBE, data) @patch("basket.news.utils.get_best_language") @patch("basket.news.utils.newsletter_languages") @@ -113,7 +113,7 @@ def test_lang_default_if_not_in_list(self, nl_mock, get_best_language_mock): response = views.update_user_task(request, SUBSCRIBE, data, sync=False) self.assert_response_ok(response) # basically asserts that the data['lang'] value wasn't changed. 
- self.upsert_user.delay.assert_called_with(SUBSCRIBE, after_data, start_time=ANY) + self.upsert_user.delay.assert_called_with(SUBSCRIBE, after_data) def test_missing_email(self): """ @@ -134,7 +134,7 @@ def test_success_no_sync(self): response = views.update_user_task(request, SUBSCRIBE, data, sync=False) self.assert_response_ok(response) - self.upsert_user.delay.assert_called_with(SUBSCRIBE, data, start_time=ANY) + self.upsert_user.delay.assert_called_with(SUBSCRIBE, data) self.assertFalse(self.upsert_contact.called) def test_success_with_valid_newsletters(self): @@ -171,7 +171,7 @@ def test_success_with_request_data(self): response = views.update_user_task(request, SUBSCRIBE, sync=False) self.assert_response_ok(response) - self.upsert_user.delay.assert_called_with(SUBSCRIBE, data, start_time=ANY) + self.upsert_user.delay.assert_called_with(SUBSCRIBE, data) @patch("basket.news.views.get_user_data") def test_success_with_sync(self, gud_mock): @@ -208,7 +208,7 @@ def test_success_with_unsubscribe_private_newsletter( response = views.update_user_task(request, UNSUBSCRIBE, data) self.assert_response_ok(response) - self.upsert_user.delay.assert_called_with(UNSUBSCRIBE, data, start_time=ANY) + self.upsert_user.delay.assert_called_with(UNSUBSCRIBE, data) mock_api_key.assert_not_called() @patch("basket.news.views.newsletter_and_group_slugs") diff --git a/basket/news/tests/test_views.py b/basket/news/tests/test_views.py index 59527b21f..fc26a2264 100644 --- a/basket/news/tests/test_views.py +++ b/basket/news/tests/test_views.py @@ -1,7 +1,5 @@ -# -*- coding: utf8 -*- - import json -from unittest.mock import ANY, Mock, patch +from unittest.mock import Mock, patch from django.conf import settings from django.core.cache import cache @@ -374,7 +372,6 @@ def test_subscribe_success(self): "country": "", "source_url": "", }, - start_time=ANY, ) def test_subscribe_success_non_ajax(self): @@ -397,7 +394,6 @@ def test_subscribe_success_non_ajax(self): "country": "", "source_url": "", }, - start_time=ANY, ) assert b"Thank you for subscribing" in response.content @@ -470,7 +466,6 @@ def test_lang_via_accept_language(self): "country": "", "source_url": "", }, - start_time=ANY, ) def test_lang_instead_of_accept_language(self): @@ -500,7 +495,6 @@ def test_lang_instead_of_accept_language(self): "country": "", "source_url": "", }, - start_time=ANY, ) def test_lang_default_if_unsupported(self): @@ -529,7 +523,6 @@ def test_lang_default_if_unsupported(self): "country": "", "source_url": "", }, - start_time=ANY, ) def test_source_url_from_referrer(self): @@ -552,7 +545,6 @@ def test_source_url_from_referrer(self): "country": "", "source_url": "https://example.com/bowling", }, - start_time=ANY, ) def test_source_url_from_invalid_referrer(self): @@ -575,7 +567,6 @@ def test_source_url_from_invalid_referrer(self): "country": "", "source_url": "", }, - start_time=ANY, ) def test_source_url_overrides_referrer(self): @@ -603,7 +594,6 @@ def test_source_url_overrides_referrer(self): "country": "", "source_url": "https://example.com/abiding", }, - start_time=ANY, ) def test_multiple_newsletters(self): @@ -631,7 +621,6 @@ def test_multiple_newsletters(self): "country": "", "source_url": "", }, - start_time=ANY, ) def test_blocked_email(self): diff --git a/basket/news/views.py b/basket/news/views.py index 75fb234ea..299d1e1e3 100644 --- a/basket/news/views.py +++ b/basket/news/views.py @@ -1,6 +1,5 @@ import os import re -from time import time from urllib.parse import urlencode from django.conf import settings @@ -229,7 
+228,7 @@ def confirm(request, token): increment=True, ): raise Ratelimited() - confirm_user.delay(token, start_time=time()) + confirm_user.delay(token) return HttpResponseJSON({"status": "ok"}) @@ -465,7 +464,7 @@ def subscribe_main(request): return respond_error(request, form, "Rate limit reached", 429) try: - upsert_user.delay(SUBSCRIBE, data, start_time=time()) + upsert_user.delay(SUBSCRIBE, data) except Exception: return respond_error(request, form, "Unknown error", 500) @@ -928,7 +927,7 @@ def update_user_task(request, api_call_type, data=None, optin=False, sync=False) statsd.incr("news.views.subscribe.sync") if settings.MAINTENANCE_MODE and not settings.MAINTENANCE_READ_ONLY: # save what we can - upsert_user.delay(api_call_type, data, start_time=time()) + upsert_user.delay(api_call_type, data) # have to error since we can't return a token return HttpResponseJSON( { @@ -959,5 +958,5 @@ def update_user_task(request, api_call_type, data=None, optin=False, sync=False) token, created = upsert_contact(api_call_type, data, user_data) return HttpResponseJSON({"status": "ok", "token": token, "created": created}) else: - upsert_user.delay(api_call_type, data, start_time=time()) + upsert_user.delay(api_call_type, data) return HttpResponseJSON({"status": "ok"}) diff --git a/basket/settings.py b/basket/settings.py index 61e8f67ac..df6ef0c5d 100644 --- a/basket/settings.py +++ b/basket/settings.py @@ -2,7 +2,6 @@ import platform import socket import struct -import sys from datetime import timedelta from pathlib import Path @@ -11,10 +10,10 @@ import sentry_sdk from decouple import Csv, UndefinedValueError, config from sentry_processor import DesensitizationProcessor -from sentry_sdk.integrations.celery import CeleryIntegration from sentry_sdk.integrations.django import DjangoIntegration from sentry_sdk.integrations.logging import ignore_logger from sentry_sdk.integrations.redis import RedisIntegration +from sentry_sdk.integrations.rq import RqIntegration # Application version. VERSION = (0, 1) @@ -29,6 +28,7 @@ def path(*args): DEBUG = config("DEBUG", default=False, cast=bool) +TESTING = config("TESTING", default=False, cast=bool) ADMINS = ( # ('Your Name', 'your_email@domain.com'), @@ -51,9 +51,9 @@ def path(*args): if REDIS_URL: REDIS_URL = REDIS_URL.rstrip("/0") HIREDIS_URL = REDIS_URL.replace("redis://", "hiredis://") - # use redis for celery and cache - os.environ["CELERY_BROKER_URL"] = REDIS_URL + "/" + config("REDIS_CELERY_DB", "0") + # use redis for cache and rq. os.environ["CACHE_URL"] = HIREDIS_URL + "/" + config("REDIS_CACHE_DB", "1") + os.environ["RQ_URL"] = REDIS_URL + "/" + config("REDIS_RQ_DB", "2") # Production uses MySQL, but Sqlite should be sufficient for local development. # Our CI server tests against MySQL. @@ -244,9 +244,6 @@ def path(*args): # view rate limiting RATELIMIT_VIEW = "basket.news.views.ratelimited" -KOMBU_FERNET_KEY = config("KOMBU_FERNET_KEY", None) -# for key rotation -KOMBU_FERNET_KEY_PREVIOUS = config("KOMBU_FERNET_KEY_PREVIOUS", None) CELERY_TASK_ALWAYS_EAGER = config("CELERY_TASK_ALWAYS_EAGER", DEBUG, cast=bool) CELERY_TASK_SERIALIZER = "json" CELERY_TASK_ACKS_LATE = config("CELERY_TASK_ACKS_LATE", True, cast=bool) @@ -277,6 +274,12 @@ def path(*args): cast=int, ) +# RQ configuration. +RQ_MAX_RETRY_DELAY = 34 * 60 * 60 # 34 hours in seconds. 
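+# RQ_MAX_RETRY_DELAY caps the per-retry delay computed by rq_exponential_backoff in basket/base/rq.py.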
diff --git a/basket/settings.py b/basket/settings.py
index 61e8f67ac..df6ef0c5d 100644
--- a/basket/settings.py
+++ b/basket/settings.py
@@ -2,7 +2,6 @@
 import platform
 import socket
 import struct
-import sys
 from datetime import timedelta
 from pathlib import Path

@@ -11,10 +10,10 @@
 import sentry_sdk
 from decouple import Csv, UndefinedValueError, config
 from sentry_processor import DesensitizationProcessor
-from sentry_sdk.integrations.celery import CeleryIntegration
 from sentry_sdk.integrations.django import DjangoIntegration
 from sentry_sdk.integrations.logging import ignore_logger
 from sentry_sdk.integrations.redis import RedisIntegration
+from sentry_sdk.integrations.rq import RqIntegration

 # Application version.
 VERSION = (0, 1)
@@ -29,6 +28,7 @@ def path(*args):

 DEBUG = config("DEBUG", default=False, cast=bool)
+TESTING = config("TESTING", default=False, cast=bool)

 ADMINS = (
     # ('Your Name', 'your_email@domain.com'),
@@ -51,9 +51,9 @@ def path(*args):
 if REDIS_URL:
     REDIS_URL = REDIS_URL.rstrip("/0")
     HIREDIS_URL = REDIS_URL.replace("redis://", "hiredis://")
-    # use redis for celery and cache
-    os.environ["CELERY_BROKER_URL"] = REDIS_URL + "/" + config("REDIS_CELERY_DB", "0")
+    # use redis for cache and rq.
     os.environ["CACHE_URL"] = HIREDIS_URL + "/" + config("REDIS_CACHE_DB", "1")
+    os.environ["RQ_URL"] = REDIS_URL + "/" + config("REDIS_RQ_DB", "2")

 # Production uses MySQL, but Sqlite should be sufficient for local development.
 # Our CI server tests against MySQL.
@@ -244,9 +244,6 @@
 # view rate limiting
 RATELIMIT_VIEW = "basket.news.views.ratelimited"

-KOMBU_FERNET_KEY = config("KOMBU_FERNET_KEY", None)
-# for key rotation
-KOMBU_FERNET_KEY_PREVIOUS = config("KOMBU_FERNET_KEY_PREVIOUS", None)
 CELERY_TASK_ALWAYS_EAGER = config("CELERY_TASK_ALWAYS_EAGER", DEBUG, cast=bool)
 CELERY_TASK_SERIALIZER = "json"
 CELERY_TASK_ACKS_LATE = config("CELERY_TASK_ACKS_LATE", True, cast=bool)
@@ -277,6 +274,12 @@
     cast=int,
 )

+# RQ configuration.
+RQ_MAX_RETRY_DELAY = 34 * 60 * 60  # 34 hours in seconds.
+RQ_MAX_RETRIES = 0 if TESTING else 2
+RQ_EXCEPTION_HANDLERS = ["basket.base.rq.store_task_exception_handler"]
+RQ_IS_ASYNC = False if TESTING else True
+
 SNITCH_ID = config("SNITCH_ID", None)

 CELERY_BEAT_SCHEDULE = {}
@@ -349,7 +352,7 @@ def before_send(event, hint):
         dsn=config("SENTRY_DSN", None),
         release=config("GIT_SHA", None),
         server_name=".".join(x for x in [K8S_NAMESPACE, CLUSTER_NAME, HOSTNAME] if x),
-        integrations=[CeleryIntegration(), DjangoIntegration(), RedisIntegration()],
+        integrations=[DjangoIntegration(signals_spans=False), RedisIntegration(), RqIntegration()],
         before_send=before_send,
     )
@@ -399,11 +402,14 @@ def before_send(event, hint):
 # regardless of their existence in the DB
 EXTRA_SUPPORTED_LANGS = config("EXTRA_SUPPORTED_LANGS", "", cast=Csv())

-TESTING_EMAIL_DOMAINS = config(
-    "TESTING_EMAIL_DOMAINS",
-    "restmail.net,restmail.lcip.org,example.com",
-    cast=Csv(),
-)
+if TESTING:
+    TESTING_EMAIL_DOMAINS = config(
+        "TESTING_EMAIL_DOMAINS",
+        "restmail.net,restmail.lcip.org,example.com",
+        cast=Csv(),
+    )
+else:
+    TESTING_EMAIL_DOMAINS = []

 MAINTENANCE_MODE = config("MAINTENANCE_MODE", False, cast=bool)
 QUEUE_BATCH_SIZE = config("QUEUE_BATCH_SIZE", 500, cast=int)
@@ -465,8 +471,3 @@ def before_send(event, hint):
     OIDC_CREATE_USER = config("OIDC_CREATE_USER", default=False, cast=bool)
     MIDDLEWARE += ("basket.news.middleware.OIDCSessionRefreshMiddleware",)
     LOGIN_REDIRECT_URL = "/admin/"
-
-if sys.argv[0].endswith("py.test") or sys.argv[0].endswith("pytest") or (len(sys.argv) > 1 and sys.argv[1] == "test"):
-    # stuff that's absolutely required for a test run
-    CELERY_TASK_ALWAYS_EAGER = True
-    TESTING_EMAIL_DOMAINS = []
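The `TESTING` flag replaces the old `sys.argv` sniffing at the bottom of `basket/settings.py`: test runs now declare themselves through the environment (see `pyproject.toml` and `docker/envfiles/test.env` below), and the queue behavior is derived from the flag. Roughly, assuming pytest-env exports the variable before settings are imported:

```python
# Sketch of the TESTING-driven settings above.
import os

os.environ["TESTING"] = "True"  # what pytest-env / test.env provide

from decouple import config

TESTING = config("TESTING", default=False, cast=bool)
RQ_MAX_RETRIES = 0 if TESTING else 2  # no retries under test
RQ_IS_ASYNC = False if TESTING else True  # jobs run inline under test
assert (TESTING, RQ_MAX_RETRIES, RQ_IS_ASYNC) == (True, 0, False)
```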
diff --git a/docker-compose.yml b/docker-compose.yml
index 7f98ebd1d..e50689a28 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -19,16 +19,9 @@ services:
     platform: linux/amd64
     volumes:
       - .:/app
-    environment:
-      - DATABASE_URL=mysql://root@db/basket
-      - DEBUG=True
-      - ALLOWED_HOSTS=localhost,127.0.0.1,
-      - REDIS_URL=redis://redis:6379
-      - CELERY_TASK_ALWAYS_EAGER=False
-      - CELERY_LOG_LEVEL=debug
-      - DJANGO_LOG_LEVEL=DEBUG
-      - URLWAIT_TIMEOUT=30
-    env_file: .env
+    env_file:
+      - docker/envfiles/local.env
+      - .env
     ports:
       - "8000:8000"
     depends_on:
@@ -42,16 +35,9 @@ services:
     platform: linux/amd64
     volumes:
       - .:/app
-    environment:
-      - DATABASE_URL=mysql://root@db/basket
-      - DEBUG=True
-      - ALLOWED_HOSTS=localhost,127.0.0.1,
-      - REDIS_URL=redis://redis:6379
-      - CELERY_TASK_ALWAYS_EAGER=False
-      - CELERY_LOG_LEVEL=debug
-      - DJANGO_LOG_LEVEL=DEBUG
-      - URLWAIT_TIMEOUT=30
-    env_file: .env
+    env_file:
+      - docker/envfiles/local.env
+      - .env
     depends_on:
       - db
       - redis
@@ -63,16 +49,9 @@ services:
     platform: linux/amd64
     volumes:
       - .:/app
-    environment:
-      - DATABASE_URL=mysql://root@db/basket
-      - DEBUG=True
-      - ALLOWED_HOSTS=localhost,127.0.0.1,
-      - CELERY_TASK_ALWAYS_EAGER=True
-      - CELERY_LOG_LEVEL=debug
-      - DJANGO_LOG_LEVEL=DEBUG
-      - URLWAIT_TIMEOUT=30
-      - REDIS_URL=redis://redis:6379
-    env_file: docker/envfiles/test.env
+    env_file:
+      - docker/envfiles/local.env
+      - docker/envfiles/test.env
     depends_on:
       - db
       - redis
@@ -82,16 +61,9 @@ services:
   test-image:
     image: mozmeao/basket:${GIT_COMMIT_SHORT:-latest}
     platform: linux/amd64
-    environment:
-      - DATABASE_URL=mysql://root@db/basket
-      - DEBUG=True
-      - ALLOWED_HOSTS=localhost,127.0.0.1,
-      - CELERY_TASK_ALWAYS_EAGER=True
-      - CELERY_LOG_LEVEL=debug
-      - DJANGO_LOG_LEVEL=DEBUG
-      - URLWAIT_TIMEOUT=30
-      - REDIS_URL=redis://redis:6379
-    env_file: docker/envfiles/test.env
+    env_file:
+      - docker/envfiles/local.env
+      - docker/envfiles/test.env
     depends_on:
       - db
       - redis
diff --git a/docker/envfiles/local.env b/docker/envfiles/local.env
new file mode 100644
index 000000000..6403ad2de
--- /dev/null
+++ b/docker/envfiles/local.env
@@ -0,0 +1,6 @@
+ALLOWED_HOSTS=localhost,127.0.0.1,
+DATABASE_URL=mysql://root@db/basket
+DEBUG=True
+DJANGO_LOG_LEVEL=DEBUG
+REDIS_URL=redis://redis:6379
+URLWAIT_TIMEOUT=30
diff --git a/docker/envfiles/test.env b/docker/envfiles/test.env
index 624c8c873..9ce7cf2f9 100644
--- a/docker/envfiles/test.env
+++ b/docker/envfiles/test.env
@@ -1,7 +1,7 @@
+ADMINS=["thedude@example.com"]
+ALLOWED_HOSTS=*
+DATABASE_URL=sqlite:///basket.db
 DEBUG=False
 DEV=False
-ALLOWED_HOSTS=*
 SECRET_KEY=ssssssssshhhhhhhhhh
-ADMINS=["thedude@example.com"]
-CELERY_TASK_ALWAYS_EAGER=True
-DATABASE_URL=sqlite:///basket.db
+TESTING=True
diff --git a/pyproject.toml b/pyproject.toml
index 082f98d9e..afdea8ea5 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -47,3 +47,6 @@ django = ["django"]
 [tool.pytest.ini_options]
 addopts = "-ra --ignore=vendor"
 DJANGO_SETTINGS_MODULE = "basket.settings"
+env = [
+    "TESTING=True"
+]
diff --git a/requirements/dev.in b/requirements/dev.in
index 6c47416a8..aca7eb487 100644
--- a/requirements/dev.in
+++ b/requirements/dev.in
@@ -4,5 +4,6 @@ black==23.3.0
 pytest-cov==4.0.0
 pytest-datadir==1.4.1
 pytest-django==4.5.2
+pytest-env==0.8.1
 ruff==0.0.269
 urlwait==1.0
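The compiled requirements that follow pull in `rq` and `rq-scheduler` (plus `crontab` and `freezegun` as transitive dependencies of the scheduler), which together replace Celery and Celery beat. For orientation, a hedged sketch of periodic scheduling with rq-scheduler; the Redis URL and task path are illustrative, not taken from this diff:

```python
from redis import Redis
from rq_scheduler import Scheduler

# Scheduler for the default queue; db 2 mirrors REDIS_RQ_DB above.
scheduler = Scheduler(connection=Redis.from_url("redis://redis:6379/2"))
scheduler.cron(
    "*/5 * * * *",  # every five minutes
    func="basket.news.tasks.example_task",  # hypothetical dotted path
    queue_name="default",
)
```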
diff --git a/requirements/dev.txt b/requirements/dev.txt
index 5a60f2894..2bf013dfa 100644
--- a/requirements/dev.txt
+++ b/requirements/dev.txt
@@ -266,7 +266,10 @@ charset-normalizer==3.0.1 \
 click==8.1.3 \
     --hash=sha256:7682dc8afb30297001674575ea00d1814d808d6a36af415a82bd481d37ba7b8e \
     --hash=sha256:bb4d8133cb15a609f44e8213d9b391b0809795062913b383c62be0ee95b1db48
-    # via black
+    # via
+    #   -r requirements/prod.txt
+    #   black
+    #   rq
 configparser==5.3.0 \
     --hash=sha256:8be267824b541c09b08db124917f48ab525a6c3e837011f3130781a224c57090 \
     --hash=sha256:b065779fd93c6bf4cee42202fa4351b4bb842e96a3fb469440e484517a49b9fa
@@ -328,6 +331,11 @@ coverage[toml]==7.1.0 \
     --hash=sha256:f5b4198d85a3755d27e64c52f8c95d6333119e49fd001ae5798dac872c95e0f8 \
     --hash=sha256:ffeeb38ee4a80a30a6877c5c4c359e5498eec095878f1581453202bfacc8fbc2
     # via pytest-cov
+crontab==1.0.1 \
+    --hash=sha256:89477e3f93c81365e738d5ee2659509e6373bb2846de13922663e79aa74c6b91
+    # via
+    #   -r requirements/prod.txt
+    #   rq-scheduler
 cryptography==40.0.2 \
     --hash=sha256:05dc219433b14046c476f6f09d7636b92a1c3e5808b9a6536adf4932b3b2c440 \
     --hash=sha256:0dcca15d3a19a66e63662dc8d30f8036b07be851a8680eda92d079868f106288
@@ -426,6 +434,12 @@ exceptiongroup==1.1.0 \
     --hash=sha256:327cbda3da756e2de031a3107b81ab7b3770a602c4d16ca618298c526f4bec1e \
     --hash=sha256:bcb67d800a4497e1b404c2dd44fca47d3b7a5e5433dbab67f96c1a685cdfdf23
     # via pytest
+freezegun==1.2.2 \
+    --hash=sha256:cd22d1ba06941384410cd967d8a99d5ae2442f57dfafeff2fda5de8dc5c05446 \
+    --hash=sha256:ea1b963b993cb9ea195adbd893a48d573fda951b0da64f60883d7e988b606c9f
+    # via
+    #   -r requirements/prod.txt
+    #   rq-scheduler
 greenlet==0.4.17 \
     --hash=sha256:1023d7b43ca11264ab7052cb09f5635d4afdb43df55e0854498fc63070a0b206 \
     --hash=sha256:124a3ae41215f71dc91d1a3d45cbf2f84e46b543e5d60b99ecc20e24b4c8f272 \
@@ -765,6 +779,7 @@ pytest==7.2.1 \
     #   pytest-cov
     #   pytest-datadir
     #   pytest-django
+    #   pytest-env
 pytest-cov==4.0.0 \
     --hash=sha256:2feb1b751d66a8bd934e5edfa2e961d11309dc37b73b0eabe73b5945fee20f6b \
     --hash=sha256:996b79efde6433cdbd0088872dbc5fb3ed7fe1578b68cdbba634f14bb8dd0470
@@ -777,12 +792,18 @@
 pytest-django==4.5.2 \
     --hash=sha256:c60834861933773109334fe5a53e83d1ef4828f2203a1d6a0fa9972f4f75ab3e \
     --hash=sha256:d9076f759bb7c36939dbdd5ae6633c18edfc2902d1a69fdbefd2426b970ce6c2
     # via -r requirements/dev.in
+pytest-env==0.8.1 \
+    --hash=sha256:8c0605ae09a5b7e41c20ebcc44f2c906eea9654095b4b0c342b3814bcc3a8492 \
+    --hash=sha256:d7b2f5273ec6d1e221757998bc2f50d2474ed7d0b9331b92556011fadc4e9abf
+    # via -r requirements/dev.in
 python-dateutil==2.8.2 \
     --hash=sha256:0123cacc1627ae19ddf3c27a5de5bd67ee4586fbdd6440d9748f8abb483d3e86 \
     --hash=sha256:961d03dc3453ebbc59dbdea9e4e11c5651520a876d0f4db161e8674aae935da9
     # via
     #   -r requirements/prod.txt
     #   botocore
+    #   freezegun
+    #   rq-scheduler
 python-decouple==3.8 \
     --hash=sha256:ba6e2657d4f376ecc46f77a3a615e058d93ba5e465c01bbe57289bfb7cce680f \
     --hash=sha256:d0d45340815b25f4de59c974b855bb38d03151d81b037d9e3f463b0c9f8cbd66
@@ -849,6 +870,7 @@ redis==4.5.5 \
     # via
     #   -r requirements/prod.txt
     #   django-redis
+    #   rq
 requests==2.28.2 \
     --hash=sha256:64299f4909223da747622c030b781c0d7811e359c37124b4bd368fb8c6518baa \
     --hash=sha256:98b1b2782e3c6c4904938b84c0eb932721069dfdb9134313beff7c83c2df24bf
@@ -866,6 +888,16 @@ requests-oauthlib==1.3.1 \
     # via
     #   -r requirements/prod.txt
     #   pysilverpop
+rq==1.14.1 \
+    --hash=sha256:37e003db1da205e08db6cc4653b7c6ccfd9292000954240308abfce2ebde43ba \
+    --hash=sha256:5fb86038922ddd76eb2d9aa0adeec6dcf64f159dbbe730b26358b1417120dd44
+    # via
+    #   -r requirements/prod.txt
+    #   rq-scheduler
+rq-scheduler==0.13.1 \
+    --hash=sha256:89d6a18f215536362b22c0548db7dbb8678bc520c18dc18a82fd0bb2b91695ce \
+    --hash=sha256:c2b19c3aedfc7de4d405183c98aa327506e423bf4cdc556af55aaab9bbe5d1a1
+    # via -r requirements/prod.txt
 ruff==0.0.269 \
     --hash=sha256:03ff42bc91ceca58e0f0f072cb3f9286a9208f609812753474e799a997cdad1a \
     --hash=sha256:11ddcfbab32cf5c420ea9dd5531170ace5a3e59c16d9251c7bd2581f7b16f602 \
diff --git a/requirements/prod.in b/requirements/prod.in
index 36bbde09f..8558e1b04 100644
--- a/requirements/prod.in
+++ b/requirements/prod.in
@@ -1,7 +1,6 @@
 APScheduler==3.10.1
 backports.zoneinfo==0.2.1
 boto3==1.26.137
-celery==4.4.7
 configparser==5.3.0
 contextlib2==21.6.0
 cryptography==40.0.2
@@ -33,6 +32,8 @@ pysilverpop==0.2.6
 python-decouple==3.8
 PyYAML==6.0
 redis==4.5.5
+rq==1.14.1
+rq-scheduler==0.13.1
 sentry-processor==0.0.1
 sentry-sdk==1.22.2
 user-agents==2.2.0
diff --git a/requirements/prod.txt b/requirements/prod.txt
index 758d515a5..d9e2300ea 100644
--- a/requirements/prod.txt
+++ b/requirements/prod.txt
@@ -218,6 +218,10 @@ charset-normalizer==3.0.1 \
     --hash=sha256:f9d0c5c045a3ca9bedfc35dca8526798eb91a07aa7a2c0fee134c6c6f321cbd7 \
     --hash=sha256:ff6f3db31555657f3163b15a6b7c6938d08df7adbfc9dd13d9d19edad678f1e8
     # via requests
+click==8.1.3 \
+    --hash=sha256:7682dc8afb30297001674575ea00d1814d808d6a36af415a82bd481d37ba7b8e \
+    --hash=sha256:bb4d8133cb15a609f44e8213d9b391b0809795062913b383c62be0ee95b1db48
+    # via rq
 configparser==5.3.0 \
     --hash=sha256:8be267824b541c09b08db124917f48ab525a6c3e837011f3130781a224c57090 \
     --hash=sha256:b065779fd93c6bf4cee42202fa4351b4bb842e96a3fb469440e484517a49b9fa
@@ -226,6 +230,9 @@ contextlib2==21.6.0 \
     --hash=sha256:3fbdb64466afd23abaf6c977627b75b6139a5a3e8ce38405c5b413aed7a0471f \
     --hash=sha256:ab1e2bfe1d01d968e1b7e8d9023bc51ef3509bba217bb730cee3827e1ee82869
     # via -r requirements/prod.in
+crontab==1.0.1 \
+    --hash=sha256:89477e3f93c81365e738d5ee2659509e6373bb2846de13922663e79aa74c6b91
+    # via rq-scheduler
 cryptography==40.0.2 \
     --hash=sha256:05dc219433b14046c476f6f09d7636b92a1c3e5808b9a6536adf4932b3b2c440 \
    --hash=sha256:0dcca15d3a19a66e63662dc8d30f8036b07be851a8680eda92d079868f106288 \
@@ -318,6 +325,10 @@ email-validator==1.3.1 \
     --hash=sha256:49a72f5fa6ed26be1c964f0567d931d10bf3fdeeacdf97bc26ef1cd2a44e0bda \
     --hash=sha256:d178c5c6fa6c6824e9b04f199cf23e79ac15756786573c190d2ad13089411ad2
     # via -r requirements/prod.in
+freezegun==1.2.2 \
+    --hash=sha256:cd22d1ba06941384410cd967d8a99d5ae2442f57dfafeff2fda5de8dc5c05446 \
+    --hash=sha256:ea1b963b993cb9ea195adbd893a48d573fda951b0da64f60883d7e988b606c9f
+    # via rq-scheduler
 greenlet==0.4.17 \
     --hash=sha256:1023d7b43ca11264ab7052cb09f5635d4afdb43df55e0854498fc63070a0b206 \
     --hash=sha256:124a3ae41215f71dc91d1a3d45cbf2f84e46b543e5d60b99ecc20e24b4c8f272 \
@@ -611,7 +622,10 @@ pysilverpop==0.2.6 \
 python-dateutil==2.8.2 \
     --hash=sha256:0123cacc1627ae19ddf3c27a5de5bd67ee4586fbdd6440d9748f8abb483d3e86 \
     --hash=sha256:961d03dc3453ebbc59dbdea9e4e11c5651520a876d0f4db161e8674aae935da9
-    # via botocore
+    # via
+    #   botocore
+    #   freezegun
+    #   rq-scheduler
 python-decouple==3.8 \
     --hash=sha256:ba6e2657d4f376ecc46f77a3a615e058d93ba5e465c01bbe57289bfb7cce680f \
     --hash=sha256:d0d45340815b25f4de59c974b855bb38d03151d81b037d9e3f463b0c9f8cbd66
@@ -675,6 +689,7 @@ redis==4.5.5 \
     # via
     #   -r requirements/prod.in
     #   django-redis
+    #   rq
 requests==2.28.2 \
     --hash=sha256:64299f4909223da747622c030b781c0d7811e359c37124b4bd368fb8c6518baa \
     --hash=sha256:98b1b2782e3c6c4904938b84c0eb932721069dfdb9134313beff7c83c2df24bf
@@ -689,6 +704,16 @@ requests-oauthlib==1.3.1 \
     --hash=sha256:2577c501a2fb8d05a304c09d090d6e47c306fef15809d102b327cf8364bddab5 \
     --hash=sha256:75beac4a47881eeb94d5ea5d6ad31ef88856affe2332b9aafb52c6452ccf0d7a
     # via pysilverpop
+rq==1.14.1 \
+    --hash=sha256:37e003db1da205e08db6cc4653b7c6ccfd9292000954240308abfce2ebde43ba \
+    --hash=sha256:5fb86038922ddd76eb2d9aa0adeec6dcf64f159dbbe730b26358b1417120dd44
+    # via
+    #   -r requirements/prod.in
+    #   rq-scheduler
+rq-scheduler==0.13.1 \
+    --hash=sha256:89d6a18f215536362b22c0548db7dbb8678bc520c18dc18a82fd0bb2b91695ce \
+    --hash=sha256:c2b19c3aedfc7de4d405183c98aa327506e423bf4cdc556af55aaab9bbe5d1a1
+    # via -r requirements/prod.in
 s3transfer==0.6.0 \
     --hash=sha256:06176b74f3a15f61f1b4f25a1fc29a4429040b7647133a463da8fa5bd28d5ecd \
     --hash=sha256:2ed07d3866f523cc561bf4a00fc5535827981b117dd7876f036b0c1aca42c947