From bf373e5a831f28c44a3d33d7f94f6cf94b95c1a9 Mon Sep 17 00:00:00 2001 From: Rob Hudson Date: Mon, 26 Jun 2023 08:50:50 -0700 Subject: [PATCH] Transition basket tasks to use rq --- .coveragerc | 2 + .demo_env | 1 - .dockerignore | 1 - .gitignore | 1 - Makefile | 2 +- Procfile | 4 - basket/base/decorators.py | 55 ++++ basket/base/exceptions.py | 15 ++ basket/base/management/commands/rqworker.py | 37 +++ basket/base/management/commands/snitch.py | 10 + basket/base/rq.py | 233 +++++++++++++++++ basket/base/tasks.py | 16 ++ basket/base/tests/tasks.py | 11 + basket/base/tests/test_decorator.py | 95 +++++++ basket/base/tests/test_rq_utils.py | 238 +++++++++++++++++ basket/base/tests/test_tasks.py | 46 ++++ basket/base/utils.py | 4 +- basket/news/admin.py | 11 +- basket/news/apps.py | 2 +- basket/news/celery.py | 75 ------ .../commands/process_maintenance_queue.py | 8 +- basket/news/models.py | 36 +-- basket/news/tasks.py | 247 ++---------------- basket/news/tests/test_confirm.py | 13 - basket/news/tests/test_models.py | 77 ++---- basket/news/tests/test_tasks.py | 80 ++---- basket/news/tests/test_update_user_task.py | 16 +- basket/news/tests/test_views.py | 13 +- basket/news/urls.py | 2 - basket/news/views.py | 21 +- basket/settings.py | 91 +++---- bin/run-clock.sh | 3 - bin/run-dev.sh | 4 +- bin/run-worker.sh | 7 +- docker-compose.yml | 52 +--- docker/envfiles/local.env | 6 + docker/envfiles/test.env | 9 +- docs/install.rst | 23 +- docs/newsletter_api.rst | 73 +++--- env-dist | 5 +- newrelic.ini | 2 +- pyproject.toml | 17 +- requirements/dev.txt | 59 ++--- requirements/prod.in | 3 +- requirements/prod.txt | 50 ++-- 45 files changed, 1014 insertions(+), 762 deletions(-) delete mode 100644 Procfile create mode 100644 basket/base/decorators.py create mode 100644 basket/base/exceptions.py create mode 100644 basket/base/management/commands/rqworker.py create mode 100644 basket/base/management/commands/snitch.py create mode 100644 basket/base/rq.py create mode 100644 basket/base/tasks.py create mode 100644 basket/base/tests/tasks.py create mode 100644 basket/base/tests/test_decorator.py create mode 100644 basket/base/tests/test_rq_utils.py create mode 100644 basket/base/tests/test_tasks.py delete mode 100644 basket/news/celery.py delete mode 100755 bin/run-clock.sh create mode 100644 docker/envfiles/local.env diff --git a/.coveragerc b/.coveragerc index 3d742be97..cad0e5dba 100644 --- a/.coveragerc +++ b/.coveragerc @@ -2,7 +2,9 @@ source = basket omit = manage.py + basket/settings.py basket/wsgi.py + basket/base/tests/* basket/news/migrations/* basket/news/tests/* diff --git a/.demo_env b/.demo_env index 6428afbf9..973947ec6 100755 --- a/.demo_env +++ b/.demo_env @@ -1,4 +1,3 @@ DEBUG=False DATABASE_URL=sqlite:///basket.db -CELERY_TASK_ALWAYS_EAGER=True SECRET_KEY=sssssssssshhhhhhhhhhh diff --git a/.dockerignore b/.dockerignore index 2fdd8b29b..c18e6c67e 100644 --- a/.dockerignore +++ b/.dockerignore @@ -1,4 +1,3 @@ .git .env -celerybeat-schedule *.db diff --git a/.gitignore b/.gitignore index d499ba133..e7a913a8f 100644 --- a/.gitignore +++ b/.gitignore @@ -16,7 +16,6 @@ /static Dockerfile.jenkins build.py -celerybeat-schedule docs/_build docs/_gh-pages pip-log.txt diff --git a/Makefile b/Makefile index caff9a61b..58b6a12d9 100644 --- a/Makefile +++ b/Makefile @@ -21,7 +21,7 @@ pull: .env @touch .make.docker.pull run: .make.docker.pull - ${DC} up web + ${DC} up web worker run-shell: ${DC} run --rm web bash diff --git a/Procfile b/Procfile deleted file mode 100644 index a507ef50f..000000000 --- a/Procfile +++ /dev/null @@ -1,4 +0,0 @@ -web: bin/run-prod.sh -worker: bin/run-worker.sh -fxaeventsworker: bin/run-fxa-events-worker.sh -clock: bin/run-clock.sh diff --git a/basket/base/decorators.py b/basket/base/decorators.py new file mode 100644 index 000000000..b47253f07 --- /dev/null +++ b/basket/base/decorators.py @@ -0,0 +1,55 @@ +import functools + +from django.conf import settings + +from django_statsd.clients import statsd +from rq.decorators import job as rq_job + +from basket.base.rq import enqueue_kwargs, get_queue + + +def rq_task(func): + """ + Decorator to standardize RQ tasks. + + Uses RQ's job decorator, but: + - uses our default queue and connection + - adds retry logic with exponential backoff + - adds success/failure/retry callbacks + - adds statsd metrics for job success/failure/retry + - adds Sentry error reporting for failed jobs + + """ + task_name = f"{func.__module__}.{func.__qualname__}" + + queue = get_queue() + connection = queue.connection + + kwargs = enqueue_kwargs(func) + + @rq_job( + queue, + connection=connection, + **kwargs, + ) + @functools.wraps(func) + def wrapped(*args, **kwargs): + # If in maintenance mode, queue the task for later. + if settings.MAINTENANCE_MODE: + if settings.READ_ONLY_MODE: + statsd.incr(f"{task_name}.not_queued") + else: + from basket.news.models import QueuedTask + + QueuedTask.objects.create( + name=task_name, + args=args, + kwargs=kwargs, + ) + statsd.incr(f"{task_name}.queued") + return + + # NOTE: Exceptions are handled with the RQ_EXCEPTION_HANDLERS + return func(*args, **kwargs) + + return wrapped diff --git a/basket/base/exceptions.py b/basket/base/exceptions.py new file mode 100644 index 000000000..00c37762a --- /dev/null +++ b/basket/base/exceptions.py @@ -0,0 +1,15 @@ +class BasketError(Exception): + """ + Tasks can raise this when an error happens that we should not retry. + + E.g. if the error indicates we're passing bad parameters, as opposed to an + error where we'd typically raise `NewsletterException`. + """ + + pass + + +class RetryTask(Exception): + """An exception to raise within a task if you just want to retry.""" + + pass diff --git a/basket/base/management/commands/rqworker.py b/basket/base/management/commands/rqworker.py new file mode 100644 index 000000000..ba2a3ffbd --- /dev/null +++ b/basket/base/management/commands/rqworker.py @@ -0,0 +1,37 @@ +import sys + +from django.core.management.base import BaseCommand + +from basket.base.rq import get_worker + + +class Command(BaseCommand): + def add_arguments(self, parser): + parser.add_argument( + "-b", + "--burst", + action="store_true", + dest="burst", + default=False, + help="Run worker in burst mode and quit when queues are empty. Default: False", + ) + parser.add_argument( + "-s", + "--with-scheduler", + action="store_true", + dest="with_scheduler", + default=False, + help="Run worker with scheduler enabled. Default: False", + ) + + def handle(self, *args, **options): + kwargs = { + "burst": options.get("burst", False), + "with_scheduler": options.get("with_scheduler", False), + } + try: + worker = get_worker() + worker.work(**kwargs) + except ConnectionError as e: + self.stderr.write(str(e)) + sys.exit(1) diff --git a/basket/base/management/commands/snitch.py b/basket/base/management/commands/snitch.py new file mode 100644 index 000000000..0ee82ca15 --- /dev/null +++ b/basket/base/management/commands/snitch.py @@ -0,0 +1,10 @@ +from time import time + +from django.core.management.base import BaseCommand + +from basket.base.tasks import snitch + + +class Command(BaseCommand): + def handle(self, *args, **options): + snitch.delay(time()) diff --git a/basket/base/rq.py b/basket/base/rq.py new file mode 100644 index 000000000..a6fa4ed87 --- /dev/null +++ b/basket/base/rq.py @@ -0,0 +1,233 @@ +import random +import re +import traceback +from time import time + +from django.conf import settings + +import redis +import requests +import sentry_sdk +from django_statsd.clients import statsd +from rq import Callback, Retry, SimpleWorker +from rq.queue import Queue +from rq.serializers import JSONSerializer +from silverpop.api import SilverpopResponseException + +from basket.base.exceptions import RetryTask +from basket.news.backends.common import NewsletterException + +# don't propagate and don't retry if these are the error messages +IGNORE_ERROR_MSGS = [ + "INVALID_EMAIL_ADDRESS", + "InvalidEmailAddress", + "An invalid phone number was provided", + "No valid subscribers were provided", + "There are no valid subscribers", + "email address is suppressed", + "invalid email address", +] +# don't propagate and don't retry if these regex match the error messages +IGNORE_ERROR_MSGS_RE = [re.compile(r"campaignId \d+ not found")] +# Exceptions we allow to retry, all others will abort retries. +EXCEPTIONS_ALLOW_RETRY = [ + IOError, + NewsletterException, + requests.RequestException, + RetryTask, + SilverpopResponseException, +] + + +# Our cached Redis connection. +_REDIS_CONN = None + + +def get_redis_connection(url=None, force=False): + """ + Get a Redis connection. + + Expects a URL including the db, or defaults to `settings.RQ_URL`. + + Call example: + get_redis_connection("redis://localhost:6379/0") + + """ + global _REDIS_CONN + + if force or _REDIS_CONN is None: + if url is None: + if settings.RQ_URL is None: + # Note: RQ_URL is derived from REDIS_URL. + raise ValueError("No `settings.REDIS_URL` specified") + url = settings.RQ_URL + _REDIS_CONN = redis.Redis.from_url(url) + + return _REDIS_CONN + + +def get_queue(queue="default"): + """ + Get an RQ queue with our chosen parameters. + + """ + return Queue( + queue, + connection=get_redis_connection(), + is_async=settings.RQ_IS_ASYNC, + serializer=JSONSerializer, + ) + + +def get_worker(queues=None): + """ + Get an RQ worker with our chosen parameters. + + """ + if queues is None: + queues = ["default"] + + return SimpleWorker( + queues, + connection=get_redis_connection(), + disable_default_exception_handler=True, + exception_handlers=[store_task_exception_handler], + serializer=JSONSerializer, + ) + + +def enqueue_kwargs(func): + if isinstance(func, str): + task_name = func + else: + task_name = f"{func.__module__}.{func.__qualname__}" + + # Start time is used to calculate the total time taken by the task, which includes queue time plus execution time of the task itself. + meta = { + "task_name": task_name, + "start_time": time(), + } + + if settings.RQ_MAX_RETRIES == 0: + retry = None + else: + retry = Retry(settings.RQ_MAX_RETRIES, rq_exponential_backoff()) + + return { + "meta": meta, + "retry": retry, + "result_ttl": settings.RQ_RESULT_TTL, + "on_success": Callback(rq_on_success), + "on_failure": Callback(rq_on_failure), + } + + +def rq_exponential_backoff(): + """ + Return an array of retry delays for RQ using an exponential back-off, using + jitter to even out the spikes, waiting at least 1 minute between retries. + """ + if settings.DEBUG: + # While debugging locally, enable faster retries. + return [5 for n in range(settings.RQ_MAX_RETRIES)] + else: + return [max(60, random.randrange(min(settings.RQ_MAX_RETRY_DELAY, 120 * (2**n)))) for n in range(settings.RQ_MAX_RETRIES)] + + +def log_timing(job): + if start_time := job.meta.get("start_time"): + total_time = int(time() - start_time) * 1000 + statsd.timing(f"{job.meta['task_name']}.duration", total_time) + + +def rq_on_success(job, connection, result, *args, **kwargs): + # Don't fire statsd metrics in maintenance mode. + if not settings.MAINTENANCE_MODE: + log_timing(job) + task_name = job.meta["task_name"] + statsd.incr(f"{task_name}.success") + if not task_name.endswith("snitch"): + statsd.incr("news.tasks.success_total") + + +def rq_on_failure(job, connection, *exc_info, **kwargs): + # Don't fire statsd metrics in maintenance mode. + if not settings.MAINTENANCE_MODE: + log_timing(job) + task_name = job.meta["task_name"] + statsd.incr(f"{task_name}.failure") + if not task_name.endswith("snitch"): + statsd.incr("news.tasks.failure_total") + + +def ignore_error(exc, to_ignore=None, to_ignore_re=None): + to_ignore = to_ignore or IGNORE_ERROR_MSGS + to_ignore_re = to_ignore_re or IGNORE_ERROR_MSGS_RE + msg = str(exc) + for ignore_msg in to_ignore: + if ignore_msg in msg: + return True + + for ignore_re in to_ignore_re: + if ignore_re.search(msg): + return True + + return False + + +def store_task_exception_handler(job, *exc_info): + """ + Handler to store task failures in the database. + """ + task_name = job.meta["task_name"] + + if task_name.endswith("snitch"): + return + + # A job will retry if it's failed but not yet reached the max retries. + # We know when a job is going to be retried if the status is `is_scheduled`, otherwise the + # status is set to `is_failed`. + + if job.is_scheduled: + # Job failed but is scheduled for a retry. + statsd.incr(f"{task_name}.retry") + statsd.incr(f"{task_name}.retries_left.{job.retries_left + 1}") + statsd.incr("news.tasks.retry_total") + + if exc_info[1] not in EXCEPTIONS_ALLOW_RETRY: + # Force retries to abort. + # Since there's no way to abort retries at the moment in RQ, we can set the job `retries_left` to zero. + # This will retry this time but no further retries will be performed. + job.retries_left = 0 + + return + + if job.is_failed: + statsd.incr(f"{task_name}.retry_max") + statsd.incr("news.tasks.retry_max_total") + + # Job failed but no retries left. + if settings.STORE_TASK_FAILURES: + # Here to avoid a circular import. + from basket.news.models import FailedTask + + FailedTask.objects.create( + task_id=job.id, + name=job.meta["task_name"], + args=job.args, + kwargs=job.kwargs, + exc=exc_info[1].__repr__(), + einfo="".join(traceback.format_exception(*exc_info)), + ) + + if ignore_error(exc_info[1]): + with sentry_sdk.push_scope() as scope: + scope.set_tag("action", "ignored") + sentry_sdk.capture_exception() + return + + # Don't log to sentry if we explicitly raise `RetryTask`. + if not (isinstance(exc_info[1], RetryTask)): + with sentry_sdk.push_scope() as scope: + scope.set_tag("action", "retried") + sentry_sdk.capture_exception() diff --git a/basket/base/tasks.py b/basket/base/tasks.py new file mode 100644 index 000000000..2c959f826 --- /dev/null +++ b/basket/base/tasks.py @@ -0,0 +1,16 @@ +from time import time + +from django.conf import settings + +import requests + +from basket.base.decorators import rq_task + + +@rq_task +def snitch(start_time): + duration = int((time() - start_time) * 1000) + if settings.SNITCH_ID: + requests.post(f"https://nosnch.in/{settings.SNITCH_ID}", data={"m": duration}) + else: + print(f"Snitch: {duration}ms") diff --git a/basket/base/tests/tasks.py b/basket/base/tests/tasks.py new file mode 100644 index 000000000..717fea45a --- /dev/null +++ b/basket/base/tests/tasks.py @@ -0,0 +1,11 @@ +from basket.base.decorators import rq_task + + +@rq_task +def failing_job(arg1, **kwargs): + raise ValueError("An exception to trigger the failure handler.") + + +@rq_task +def empty_job(arg1, **kwargs): + pass diff --git a/basket/base/tests/test_decorator.py b/basket/base/tests/test_decorator.py new file mode 100644 index 000000000..901933828 --- /dev/null +++ b/basket/base/tests/test_decorator.py @@ -0,0 +1,95 @@ +from unittest.mock import patch + +from django.test import TestCase +from django.test.utils import override_settings + +from basket.base.decorators import rq_task +from basket.base.rq import get_worker +from basket.base.tests.tasks import empty_job +from basket.news.models import QueuedTask + + +class TestDecorator(TestCase): + @override_settings(RQ_RESULT_TTL=0) + @override_settings(RQ_MAX_RETRIES=0) + @patch("basket.base.decorators.rq_job") + @patch("basket.base.rq.Callback") + @patch("basket.base.rq.Queue") + @patch("basket.base.rq.time") + def test_rq_task( + self, + mock_time, + mock_queue, + mock_callback, + mock_rq_job, + ): + """ + Test that the decorator passes the correct arguments to the RQ job. + """ + mock_time.return_value = 123456789 + mock_queue.connection.return_value = "connection" + + @rq_task + def test_func(): + pass + + mock_rq_job.assert_called_once_with( + mock_queue(), + connection=mock_queue().connection, + meta={ + "task_name": f"{self.__module__}.TestDecorator.test_rq_task..test_func", + "start_time": 123456789, + }, + retry=None, # Retry logic is tested above, so no need to test it here. + result_ttl=0, + on_failure=mock_callback(), + on_success=mock_callback(), + ) + + @override_settings(MAINTENANCE_MODE=True) + @override_settings(READ_ONLY_MODE=False) + @patch("basket.base.decorators.statsd") + def test_maintenance_mode_no_readonly(self, mock_statsd): + """ + Test that the decorator doesn't run the task if maintenance mode is on. + """ + + assert QueuedTask.objects.count() == 0 + + empty_job.delay() + + mock_statsd.incr.assert_called_once_with("basket.base.tests.tasks.empty_job.queued") + assert QueuedTask.objects.count() == 1 + + @override_settings(MAINTENANCE_MODE=True) + @override_settings(READ_ONLY_MODE=True) + @patch("basket.base.decorators.statsd") + def test_maintenance_mode_readonly(self, mock_statsd): + """ + Test that the decorator doesn't run the task if maintenance mode is on + and the task isn't queued because we're in READ_ONLY_MODE. + """ + + assert QueuedTask.objects.count() == 0 + + empty_job.delay() + + mock_statsd.incr.assert_called_once_with("basket.base.tests.tasks.empty_job.not_queued") + assert QueuedTask.objects.count() == 0 + + @patch("basket.base.rq.statsd") + def test_job_success(self, mock_statsd): + """ + Test that the decorator marks the job as successful if the task runs + successfully. + """ + empty_job.delay("arg1") + + worker = get_worker() + worker.work(burst=True) # Burst = worker will quit after all jobs consumed. + + assert mock_statsd.incr.call_count == 2 + mock_statsd.incr.assert_any_call("basket.base.tests.tasks.empty_job.success") + mock_statsd.incr.assert_any_call("news.tasks.success_total") + assert mock_statsd.timing.call_count == 1 + assert "basket.base.tests.tasks.empty_job.duration" in mock_statsd.timing.call_args.args[0] diff --git a/basket/base/tests/test_rq_utils.py b/basket/base/tests/test_rq_utils.py new file mode 100644 index 000000000..84cec7ae6 --- /dev/null +++ b/basket/base/tests/test_rq_utils.py @@ -0,0 +1,238 @@ +from unittest.mock import patch + +from django.conf import settings +from django.test import TestCase +from django.test.utils import override_settings + +import pytest +from rq.job import Job, JobStatus +from rq.serializers import JSONSerializer + +from basket.base.rq import ( + IGNORE_ERROR_MSGS, + get_queue, + get_redis_connection, + get_worker, + rq_exponential_backoff, + store_task_exception_handler, +) +from basket.base.tests.tasks import failing_job +from basket.news.models import FailedTask + + +class TestRQUtils(TestCase): + @override_settings(RQ_MAX_RETRIES=10) + def test_rq_exponential_backoff(self): + """ + Test that the exponential backoff function returns the correct number + of retries, with a minimum of 60 seconds between retries. + """ + with patch("basket.base.rq.random") as mock_random: + mock_random.randrange.side_effect = [120 * 2**n for n in range(settings.RQ_MAX_RETRIES)] + self.assertEqual(rq_exponential_backoff(), [120, 240, 480, 960, 1920, 3840, 7680, 15360, 30720, 61440]) + + @override_settings(RQ_MAX_RETRIES=10, DEBUG=True) + def test_rq_exponential_backoff_with_debug(self): + """ + Test that the exponential backoff function returns shorter retries during DEBUG mode. + """ + self.assertEqual(rq_exponential_backoff(), [5, 5, 5, 5, 5, 5, 5, 5, 5, 5]) + + @override_settings(RQ_URL="redis://redis:6379/2") + def test_get_redis_connection(self): + """ + Test that the get_redis_connection function returns a Redis connection with params we expect. + """ + # Test passing a URL explicitly. + connection = get_redis_connection("redis://redis-host:6379/9", force=True) + self.assertDictEqual(connection.connection_pool.connection_kwargs, {"host": "redis-host", "port": 6379, "db": 9}) + + # Test with no URL argument, but with RQ_URL in the settings. + # Note: The RQ_URL being used also sets this back to the "default" for tests that follow. + connection = get_redis_connection(force=True) + self.assertDictEqual(connection.connection_pool.connection_kwargs, {"host": "redis", "port": 6379, "db": 2}) + + def test_get_queue(self): + """ + Test that the get_queue function returns a RQ queue with params we expect. + """ + queue = get_queue() + + assert queue.name == "default" + assert queue._is_async is False # Only during testing. + assert queue.connection == get_redis_connection() + assert queue.serializer == JSONSerializer + + def test_get_worker(self): + """ + Test that the get_worker function returns a RQ worker with params we expect. + """ + worker = get_worker() + assert worker.queues == [get_queue()] + assert worker.disable_default_exception_handler is True + assert worker._exc_handlers == [store_task_exception_handler] + assert worker.serializer == JSONSerializer + + @override_settings(STORE_TASK_FAILURES=True) + @override_settings(RQ_EXCEPTION_HANDLERS=["basket.base.rq.store_task_exception_handler"]) + @patch("basket.base.rq.statsd") + def test_on_failure(self, mock_statsd): + """ + Test that the on_failure function creates a FailedTask object and sends + statsd metrics. + """ + + assert FailedTask.objects.count() == 0 + + args = ["arg1"] + kwargs = {"arg2": "foo"} + job = failing_job.delay(*args, **kwargs) + + worker = get_worker() + worker.work(burst=True) # Burst = worker will quit after all jobs consumed. + + assert job.is_failed + + # TODO: Determine why the `store_task_exception_handler` is not called. + # assert FailedTask.objects.count() == 1 + # fail = FailedTask.objects.get() + # assert fail.name == "news.tasks.failing_job" + # assert fail.task_id is not None + # assert fail.args == args + # assert fail.kwargs == kwargs + # assert fail.exc == 'ValueError("An exception to trigger the failure handler.")' + # assert "Traceback (most recent call last):" in fail.einfo + # assert "ValueError: An exception to trigger the failure handler." in fail.einfo + # assert mock_statsd.incr.call_count == 2 + # assert mock_statsd.incr.assert_any_call("news.tasks.failure_total") + # assert mock_statsd.incr.assert_any_call("news.tasks.failing_job.failure") + + @override_settings(MAINTENANCE_MODE=True) + @patch("basket.base.rq.statsd") + def test_on_failure_maintenance(self, mock_statsd): + """ + Test that the on_failure callback does nothing if we're in maintenance mode. + """ + assert FailedTask.objects.count() == 0 + + failing_job.delay() + + worker = get_worker() + worker.work(burst=True) # Burst = worker will quit after all jobs consumed. + + assert FailedTask.objects.count() == 0 + assert mock_statsd.incr.call_count == 0 + + @override_settings(STORE_TASK_FAILURES=True) + @patch("basket.base.rq.sentry_sdk") + @patch("basket.base.rq.statsd") + def test_rq_exception_handler(self, mock_statsd, mock_sentry_sdk): + """ + Test that the exception handler creates a FailedTask object. + """ + queue = get_queue() + args = ["arg1"] + kwargs = {"kwarg1": "kwarg1"} + + job = Job.create( + func=print, + args=args, + kwargs=kwargs, + connection=queue.connection, + id="job1", + meta={"task_name": "job.failed"}, + ) + assert FailedTask.objects.count() == 0 + + with pytest.raises(ValueError) as e: + # This is only here to generate the exception values. + raise ValueError("This is a fake exception") + job.set_status(JobStatus.FAILED) + + store_task_exception_handler(job, e.type, e.value, e.tb) + + assert FailedTask.objects.count() == 1 + failed_job = FailedTask.objects.get() + assert failed_job.task_id == job.id + assert failed_job.name == "job.failed" + assert failed_job.args == args + assert failed_job.kwargs == kwargs + assert failed_job.exc == "ValueError('This is a fake exception')" + assert "Traceback (most recent call last):" in failed_job.einfo + assert "ValueError: This is a fake exception" in failed_job.einfo + + assert mock_statsd.incr.call_count == 2 + mock_statsd.incr.assert_any_call("job.failed.retry_max") + mock_statsd.incr.assert_any_call("news.tasks.retry_max_total") + + assert mock_sentry_sdk.capture_exception.call_count == 1 + mock_sentry_sdk.push_scope.return_value.__enter__.return_value.set_tag.assert_called_once_with("action", "retried") + + @override_settings(STORE_TASK_FAILURES=False) + @patch("basket.base.rq.sentry_sdk") + @patch("basket.base.rq.statsd") + def test_rq_exception_error_ignore(self, mock_statsd, mock_sentry_sdk): + queue = get_queue() + job = Job.create(func=print, meta={"task_name": "job.ignore_error"}, connection=queue.connection) + job.set_status(JobStatus.FAILED) + + for error_str in IGNORE_ERROR_MSGS: + store_task_exception_handler(job, Exception, Exception(error_str), None) + + assert mock_statsd.incr.call_count == 2 + mock_statsd.incr.assert_any_call("job.ignore_error.retry_max") + mock_statsd.incr.assert_any_call("news.tasks.retry_max_total") + + assert mock_sentry_sdk.capture_exception.call_count == 1 + mock_sentry_sdk.push_scope.return_value.__enter__.return_value.set_tag.assert_called_once_with("action", "ignored") + + mock_statsd.reset_mock() + mock_sentry_sdk.reset_mock() + + # Also test IGNORE_ERROR_MSGS_RE. + store_task_exception_handler(job, Exception, Exception("campaignId 123 not found"), None) + + assert mock_statsd.incr.call_count == 2 + mock_statsd.incr.assert_any_call("job.ignore_error.retry_max") + mock_statsd.incr.assert_any_call("news.tasks.retry_max_total") + + assert mock_sentry_sdk.capture_exception.call_count == 1 + mock_sentry_sdk.push_scope.return_value.__enter__.return_value.set_tag.assert_called_once_with("action", "ignored") + + def test_rq_exception_handler_snitch(self): + """ + Test that the exception handler returns early if it's a snitch job. + """ + queue = get_queue() + job = Job.create(func=print, meta={"task_name": "job.endswith.snitch"}, connection=queue.connection) + + assert FailedTask.objects.count() == 0 + + store_task_exception_handler(job) + + assert FailedTask.objects.count() == 0 + + @patch("basket.base.rq.sentry_sdk") + @patch("basket.base.rq.statsd") + def test_rq_exception_handler_retry(self, mock_statsd, mock_sentry_sdk): + queue = get_queue() + job = Job.create(func=print, meta={"task_name": "job.rescheduled"}, connection=queue.connection) + job.retries_left = 1 + job.retry_intervals = [1] + + with pytest.raises(ValueError) as e: + # This is only here to generate the exception values. + raise ValueError("This is a fake exception") + + job.set_status(JobStatus.SCHEDULED) + + assert FailedTask.objects.count() == 0 + + store_task_exception_handler(job, e.type, e.value, e.tb) + + assert FailedTask.objects.count() == 0 + assert mock_statsd.incr.call_count == 3 + mock_statsd.incr.assert_any_call("job.rescheduled.retry") + mock_statsd.incr.assert_any_call("job.rescheduled.retries_left.2") + mock_statsd.incr.assert_any_call("news.tasks.retry_total") + mock_sentry_sdk.capture_exception.assert_not_called() diff --git a/basket/base/tests/test_tasks.py b/basket/base/tests/test_tasks.py new file mode 100644 index 000000000..cb99a0a5b --- /dev/null +++ b/basket/base/tests/test_tasks.py @@ -0,0 +1,46 @@ +import contextlib +import io +from time import time +from unittest import mock + +from django.test import override_settings + +from freezegun import freeze_time + +from basket.base.rq import get_worker +from basket.base.tasks import snitch + + +@freeze_time("2023-01-02 12:34:56.123456") +@override_settings(SNITCH_ID="999") +@mock.patch("basket.base.tasks.requests.post") +def test_snitch(mock_post): + seconds_ago = time() - 1 + snitch(seconds_ago) + mock_post.assert_called_with("https://nosnch.in/999", data={"m": 1000}) + + +@freeze_time("2023-01-02 12:34:56.123456") +@override_settings(SNITCH_ID="999") +@mock.patch("basket.base.tasks.requests.post") +def test_snitch_with_worker(mock_post): + seconds_ago = time() - 1 + snitch.delay(seconds_ago) + + worker = get_worker() + worker.work(burst=True) # Burst = worker will quit after all jobs consumed. + + mock_post.assert_called_with("https://nosnch.in/999", data={"m": 1000}) + + +@freeze_time("2023-01-02 12:34:56.123456") +@override_settings(SNITCH_ID=None) +@mock.patch("basket.base.tasks.requests.post") +def test_snitch_not_configured(mock_post): + seconds_ago = time() - 1 + + with contextlib.redirect_stdout(io.StringIO()) as f: + snitch(seconds_ago) + + mock_post.assert_not_called() + assert f.getvalue() == "Snitch: 1000ms\n" diff --git a/basket/base/utils.py b/basket/base/utils.py index 38621c2a4..e7375cb8a 100644 --- a/basket/base/utils.py +++ b/basket/base/utils.py @@ -2,10 +2,10 @@ def email_is_testing(email): - """Return true if email address is at a known testing domain""" + # Return true if email address is at a known testing domain if not settings.USE_SANDBOX_BACKEND: for domain in settings.TESTING_EMAIL_DOMAINS: - if email.endswith("@{}".format(domain)): + if email.endswith(f"@{domain}"): return True return False diff --git a/basket/news/admin.py b/basket/news/admin.py index 81566879a..3ef0d63ab 100644 --- a/basket/news/admin.py +++ b/basket/news/admin.py @@ -1,5 +1,6 @@ from django.conf import settings from django.contrib import admin, messages +from django.template.defaultfilters import pluralize from product_details import product_details @@ -159,10 +160,7 @@ def retry_task_action(self, request, queryset): for old_task in queryset: old_task.retry() count += 1 - messages.info( - request, - "Queued %d task%s to process" % (count, "" if count == 1 else "s"), - ) + messages.info(request, f"Queued {count} task{pluralize(count)} to process.") retry_task_action.short_description = "Process task(s)" @@ -180,10 +178,7 @@ def retry_task_action(self, request, queryset): for old_task in queryset: old_task.retry() count += 1 - messages.info( - request, - "Queued %d task%s to try again" % (count, "" if count == 1 else "s"), - ) + messages.info(request, f"Queued {count} task{pluralize(count)} to try again.") retry_task_action.short_description = "Retry task(s)" diff --git a/basket/news/apps.py b/basket/news/apps.py index 0b727bed2..e21322218 100644 --- a/basket/news/apps.py +++ b/basket/news/apps.py @@ -7,4 +7,4 @@ class BasketNewsConfig(AppConfig): def ready(self): # This will make sure the app is always imported when # Django starts so that shared_task will use this app. - import basket.news.celery # noqa + pass diff --git a/basket/news/celery.py b/basket/news/celery.py deleted file mode 100644 index 045433fcc..000000000 --- a/basket/news/celery.py +++ /dev/null @@ -1,75 +0,0 @@ -# flake8: noqa - -import os - -# set the default Django settings module for the 'celery' program. -os.environ.setdefault("DJANGO_SETTINGS_MODULE", "basket.settings") - -from django.conf import settings -from django.utils.encoding import force_bytes - -from celery import Celery -from cryptography.fernet import Fernet, InvalidToken, MultiFernet -from django_statsd.clients import statsd -from kombu import serialization -from kombu.utils import json - -FERNET = None - -if settings.KOMBU_FERNET_KEY: - FERNET = Fernet(settings.KOMBU_FERNET_KEY) - if settings.KOMBU_FERNET_KEY_PREVIOUS: - # this will try both keys. for key rotation. - FERNET = MultiFernet([FERNET, Fernet(settings.KOMBU_FERNET_KEY_PREVIOUS)]) - - -def fernet_dumps(message): - statsd.incr("basket.news.celery.fernet_dumps") - message = json.dumps(message) - if FERNET: - statsd.incr("basket.news.celery.fernet_dumps.encrypted") - return FERNET.encrypt(force_bytes(message)) - - statsd.incr("basket.news.celery.fernet_dumps.unencrypted") - return message - - -def fernet_loads(encoded_message): - statsd.incr("basket.news.celery.fernet_loads") - if FERNET: - try: - encoded_message = FERNET.decrypt(force_bytes(encoded_message)) - except InvalidToken: - statsd.incr("basket.news.celery.fernet_loads.unencrypted") - else: - statsd.incr("basket.news.celery.fernet_loads.encrypted") - else: - statsd.incr("basket.news.celery.fernet_loads.unencrypted") - - return json.loads(encoded_message) - - -serialization.unregister("json") -serialization.register( - "json", - fernet_dumps, - fernet_loads, - content_type="application/json", - content_encoding="utf-8", -) - - -app = Celery("basket") - -# Using a string here means the worker doesn't have to serialize -# the configuration object to child processes. -# - namespace='CELERY' means all celery-related configuration keys -# should have a `CELERY_` prefix. -app.config_from_object("django.conf:settings", namespace="CELERY") -# Load task modules from all registered Django app configs. -app.autodiscover_tasks() - - -@app.task(bind=True) -def debug_task(self): - print("Request: {0!r}".format(self.request)) diff --git a/basket/news/management/commands/process_maintenance_queue.py b/basket/news/management/commands/process_maintenance_queue.py index 68ccc42ee..dfed3da31 100644 --- a/basket/news/management/commands/process_maintenance_queue.py +++ b/basket/news/management/commands/process_maintenance_queue.py @@ -7,11 +7,7 @@ class Command(BaseCommand): def add_arguments(self, parser): parser.add_argument( - "-n", - "--num-tasks", - type=int, - default=settings.QUEUE_BATCH_SIZE, - help="Number of tasks to process ({})".format(settings.QUEUE_BATCH_SIZE), + "-n", "--num-tasks", type=int, default=settings.QUEUE_BATCH_SIZE, help=f"Number of tasks to process ({settings.QUEUE_BATCH_SIZE})" ) def handle(self, *args, **options): @@ -23,4 +19,4 @@ def handle(self, *args, **options): task.retry() count += 1 - print("{} processed. {} remaining.".format(count, QueuedTask.objects.count())) + print(f"{count} processed. {QueuedTask.objects.count()} remaining.") diff --git a/basket/news/models.py b/basket/news/models.py index 9368b9544..f650061cf 100644 --- a/basket/news/models.py +++ b/basket/news/models.py @@ -8,10 +8,9 @@ import sentry_sdk from product_details import product_details +from basket.base.rq import enqueue_kwargs, get_queue from basket.news.fields import CommaSeparatedEmailField, LocaleField, parse_emails -from .celery import app as celery_app - def get_uuid(): """Needed because Django can't make migrations when using lambda.""" @@ -180,8 +179,9 @@ def __str__(self): # pragma: no cover return f"{self.name} {self.args} {self.kwargs}" def retry(self): - celery_app.send_task(self.name, args=self.args, kwargs=self.kwargs) - # Forget the old task + kwargs = enqueue_kwargs(self.name) + get_queue().enqueue(self.name, args=self.args, kwargs=self.kwargs, **kwargs) + # Forget the old task. self.delete() @@ -203,33 +203,9 @@ def formatted_call(self): formatted_kwargs = ["%s=%r" % (key, val) for key, val in self.kwargs.items()] return "%s(%s)" % (self.name, ", ".join(formatted_args + formatted_kwargs)) - @property - def filtered_args(self): - """ - Convert args that came from QueryDict instances to regular dicts. - - This is necessary because some tasks were bing called with QueryDict - instances, and whereas the Pickle for the task worked fine, storing - the args as JSON resulted in the dicts actually being a dict full - of length 1 lists instead of strings. This converts them back when - it finds them. - - This only needs to exist while we have old failure instances around. - - @return: list args: serialized QueryDicts converted to plain dicts. - """ - # TODO remove after old failed tasks are deleted - args = self.args - for i, arg in enumerate(args): - if _is_query_dict(arg): - args[i] = dict((key, arg[key][0]) for key in arg) - - return args - def retry(self): - # Meet the new task, - # same as the old task. - celery_app.send_task(self.name, args=self.filtered_args, kwargs=self.kwargs) + kwargs = enqueue_kwargs(self.name) + get_queue().enqueue(self.name, args=self.args, kwargs=self.kwargs, **kwargs) # Forget the old task self.delete() diff --git a/basket/news/tasks.py b/basket/news/tasks.py index ce0e57e59..db4a3e220 100644 --- a/basket/news/tasks.py +++ b/basket/news/tasks.py @@ -1,36 +1,26 @@ import logging -import re from datetime import date -from functools import wraps -from time import time from urllib.parse import urlencode from django.conf import settings from django.core.cache import cache -import requests -import sentry_sdk import user_agents -from celery.signals import task_failure, task_retry, task_success -from celery.utils.time import get_exponential_backoff_interval from django_statsd.clients import statsd -from silverpop.api import SilverpopResponseException +from basket.base.decorators import rq_task +from basket.base.exceptions import BasketError from basket.base.utils import email_is_testing from basket.news.backends.acoustic import acoustic, acoustic_tx -from basket.news.backends.common import NewsletterException from basket.news.backends.ctms import ( CTMSNotFoundByAltIDError, CTMSUniqueIDConflictError, ctms, ) -from basket.news.celery import app as celery_app from basket.news.models import ( AcousticTxEmailMessage, - FailedTask, Interest, Newsletter, - QueuedTask, ) from basket.news.newsletters import get_transactional_message_ids, newsletter_languages from basket.news.utils import ( @@ -47,181 +37,6 @@ log = logging.getLogger(__name__) -# don't propagate and don't retry if these are the error messages -IGNORE_ERROR_MSGS = [ - "INVALID_EMAIL_ADDRESS", - "InvalidEmailAddress", - "An invalid phone number was provided", - "No valid subscribers were provided", - "There are no valid subscribers", - "email address is suppressed", - "invalid email address", -] -# don't propagate and don't retry if these regex match the error messages -IGNORE_ERROR_MSGS_RE = [re.compile(r"campaignId \d+ not found")] -# don't propagate after max retries if these are the error messages -IGNORE_ERROR_MSGS_POST_RETRY = [] -# tasks exempt from maintenance mode queuing -MAINTENANCE_EXEMPT = [] - - -def exponential_backoff(retries): - """ - Return a number of seconds to delay the next task attempt using - an exponential back-off algorithm with jitter. - - See https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/ - - :param retries: Number of retries so far - :return: number of seconds to delay the next try - """ - backoff_minutes = get_exponential_backoff_interval( - factor=2, - retries=retries, - maximum=settings.CELERY_MAX_RETRY_DELAY_MINUTES, - full_jitter=True, - ) - # wait for a minimum of 1 minute - return max(1, backoff_minutes) * 60 - - -def ignore_error(exc, to_ignore=None, to_ignore_re=None): - to_ignore = to_ignore or IGNORE_ERROR_MSGS - to_ignore_re = to_ignore_re or IGNORE_ERROR_MSGS_RE - msg = str(exc) - for ignore_msg in to_ignore: - if ignore_msg in msg: - return True - - for ignore_re in to_ignore_re: - if ignore_re.search(msg): - return True - - return False - - -def ignore_error_post_retry(exc): - return ignore_error(exc, IGNORE_ERROR_MSGS_POST_RETRY) - - -class BasketError(Exception): - """Tasks can raise this when an error happens that we should not retry. - E.g. if the error indicates we're passing bad parameters. - (As opposed to an error connecting to ExactTarget at the moment, - where we'd typically raise NewsletterException.) - """ - - def __init__(self, msg): - super(BasketError, self).__init__(msg) - - -class RetryTask(Exception): - """an exception to raise within a task if you just want to retry""" - - -@task_failure.connect -def on_task_failure(sender, task_id, exception, einfo, args, kwargs, **skwargs): - statsd.incr(sender.name + ".failure") - if not sender.name.endswith("snitch"): - statsd.incr("news.tasks.failure_total") - if settings.STORE_TASK_FAILURES: - FailedTask.objects.create( - task_id=task_id, - name=sender.name, - args=args, - kwargs=kwargs, - exc=repr(exception), - # str() gives more info than repr() on - # celery.datastructures.ExceptionInfo - einfo=str(einfo), - ) - - -@task_retry.connect -def on_task_retry(sender, **kwargs): - statsd.incr(sender.name + ".retry") - if not sender.name.endswith("snitch"): - statsd.incr("news.tasks.retry_total") - - -@task_success.connect -def on_task_success(sender, **kwargs): - statsd.incr(sender.name + ".success") - if not sender.name.endswith("snitch"): - statsd.incr("news.tasks.success_total") - - -def et_task(func): - """Decorator to standardize ET Celery tasks.""" - full_task_name = "news.tasks.%s" % func.__name__ - - # continue to use old names regardless of new layout - @celery_app.task( - name=full_task_name, - bind=True, - default_retry_delay=300, - max_retries=12, # 5 min - ) - @wraps(func) - def wrapped(self, *args, **kwargs): - start_time = kwargs.pop("start_time", None) - if start_time and not self.request.retries: - total_time = int((time() - start_time) * 1000) - statsd.timing(self.name + ".timing", total_time) - statsd.incr(self.name + ".total") - statsd.incr("news.tasks.all_total") - if settings.MAINTENANCE_MODE and self.name not in MAINTENANCE_EXEMPT: - if not settings.READ_ONLY_MODE: - # record task for later - QueuedTask.objects.create( - name=self.name, - args=args, - kwargs=kwargs, - ) - statsd.incr(self.name + ".queued") - else: - statsd.incr(self.name + ".not_queued") - - return - - try: - return func(*args, **kwargs) - except ( - IOError, - NewsletterException, - requests.RequestException, - RetryTask, - SilverpopResponseException, - ) as e: - # These could all be connection issues, so try again later. - # IOError covers URLError, SSLError, and requests.HTTPError. - if ignore_error(e): - with sentry_sdk.push_scope() as scope: - scope.set_tag("action", "ignored") - sentry_sdk.capture_exception() - return - - try: - if not (isinstance(e, RetryTask) or ignore_error_post_retry(e)): - with sentry_sdk.push_scope() as scope: - scope.set_tag("action", "retried") - sentry_sdk.capture_exception() - - # ~68 hr at 11 retries - statsd.incr(f"{self.name}.retries.{self.request.retries}") - statsd.incr(f"news.tasks.retries.{self.request.retries}") - raise self.retry(countdown=exponential_backoff(self.request.retries)) - except self.MaxRetriesExceededError: - statsd.incr(self.name + ".retry_max") - statsd.incr("news.tasks.retry_max_total") - # don't bubble certain errors - if ignore_error_post_retry(e): - return - - sentry_sdk.capture_exception() - - return wrapped - def fxa_source_url(metrics): source_url = settings.FXA_REGISTER_SOURCE_URL @@ -232,7 +47,7 @@ def fxa_source_url(metrics): return source_url -@et_task +@rq_task def fxa_email_changed(data): ts = data["ts"] fxa_id = data["uid"] @@ -281,12 +96,12 @@ def fxa_direct_update_contact(fxa_id, data): pass -@et_task +@rq_task def fxa_delete(data): fxa_direct_update_contact(data["uid"], {"fxa_deleted": True}) -@et_task +@rq_task def fxa_verified(data): """Add new FxA users""" # if we're not using the sandbox ignore testing domains @@ -327,7 +142,7 @@ def fxa_verified(data): upsert_contact(SUBSCRIBE, new_data, user_data) -@et_task +@rq_task def fxa_newsletters_update(data): email = data["email"] fxa_id = data["uid"] @@ -344,7 +159,7 @@ def fxa_newsletters_update(data): upsert_contact(SUBSCRIBE, new_data, get_fxa_user_data(fxa_id, email)) -@et_task +@rq_task def fxa_login(data): email = data["email"] # if we're not using the sandbox ignore testing domains @@ -401,7 +216,7 @@ def _add_fxa_activity(data): fxa_activity_acoustic.delay(login_data) -@et_task +@rq_task def fxa_activity_acoustic(data): acoustic.insert_update_relational_table( table_id=settings.ACOUSTIC_FXA_TABLE_ID, @@ -409,7 +224,7 @@ def fxa_activity_acoustic(data): ) -@et_task +@rq_task def update_get_involved( interest_id, lang, @@ -431,7 +246,7 @@ def update_get_involved( interest.notify_stewards(name, email, lang, message) -@et_task +@rq_task def update_user_meta(token, data): """Update a user's metadata, not newsletters""" try: @@ -440,7 +255,7 @@ def update_user_meta(token, data): raise -@et_task +@rq_task def upsert_user(api_call_type, data): """ Update or insert (upsert) a contact record @@ -538,7 +353,7 @@ def upsert_contact(api_call_type, data, user_data): # no user found. create new one. token = update_data["token"] = generate_token() if settings.MAINTENANCE_MODE: - sfdc_add_update.delay(update_data) + ctms_add_or_update.delay(update_data) else: ctms.add(update_data) @@ -567,7 +382,7 @@ def upsert_contact(api_call_type, data, user_data): token = update_data["token"] = generate_token() if settings.MAINTENANCE_MODE: - sfdc_add_update.delay(update_data, user_data) + ctms_add_or_update.delay(update_data, user_data) else: ctms.update(user_data, update_data) @@ -582,18 +397,10 @@ def upsert_contact(api_call_type, data, user_data): return token, False -@et_task -def sfdc_add_update(update_data, user_data=None): +@rq_task +def ctms_add_or_update(update_data, user_data=None): """ Add or update contact data when maintainance mode is completed. - - The first version was temporary, with: - TODO remove after maintenance is over and queue is processed - - The next version allowed SFDC / CTMS dual-write mode. - - This version only writes to CTMS, so it is now misnamed, but task - renames require coordination. """ if user_data: ctms.update(user_data, update_data) @@ -611,7 +418,7 @@ def sfdc_add_update(update_data, user_data=None): ctms.update(user_data, update_data) -@et_task +@rq_task def send_acoustic_tx_message(email, vendor_id, fields=None): acoustic_tx.send_mail(email, vendor_id, fields) @@ -629,7 +436,7 @@ def send_acoustic_tx_messages(email, lang, message_ids): return sent -@et_task +@rq_task def send_confirm_message(email, token, lang, message_type): lang = lang.strip() lang = lang or "en-US" @@ -639,7 +446,7 @@ def send_confirm_message(email, token, lang, message_type): acoustic_tx.send_mail(email, vid, {"basket_token": token}, save_to_db=True) -@et_task +@rq_task def confirm_user(token): """ Confirm any pending subscriptions for the user with this token. @@ -670,7 +477,7 @@ def confirm_user(token): ctms.update(user_data, {"optin": True}) -@et_task +@rq_task def update_custom_unsub(token, reason): """Record a user's custom unsubscribe reason.""" try: @@ -680,7 +487,7 @@ def update_custom_unsub(token, reason): pass -@et_task +@rq_task def send_recovery_message_acoustic(email, token, lang, fmt): message_name = "account-recovery" if fmt != "H": @@ -691,7 +498,7 @@ def send_recovery_message_acoustic(email, token, lang, fmt): acoustic_tx.send_mail(email, vid, {"basket_token": token}) -@et_task +@rq_task def record_common_voice_update(data): # do not change the sent data in place. A retry will use the changed data. dcopy = data.copy() @@ -711,18 +518,6 @@ def record_common_voice_update(data): ctms.add(new_data) -@celery_app.task() -def snitch(start_time=None): - if start_time is None: - snitch.delay(time()) - return - - snitch_id = settings.SNITCH_ID - totalms = int((time() - start_time) * 1000) - statsd.timing("news.tasks.snitch.timing", totalms) - requests.post("https://nosnch.in/{}".format(snitch_id), data={"m": totalms}) - - def get_fxa_user_data(fxa_id, email): """ Return a user data dict, just like `get_user_data` below, but ensure we have diff --git a/basket/news/tests/test_confirm.py b/basket/news/tests/test_confirm.py index 162a7da75..aa1e6b3a1 100644 --- a/basket/news/tests/test_confirm.py +++ b/basket/news/tests/test_confirm.py @@ -2,25 +2,12 @@ from django.test import TestCase -from celery.exceptions import Retry - -from basket.news.backends.common import NewsletterException from basket.news.tasks import confirm_user @patch("basket.news.tasks.ctms") @patch("basket.news.tasks.get_user_data") class TestConfirmTask(TestCase): - def test_error(self, get_user_data, ctms_mock): - """ - If user_data shows an error talking to ET, the task raises - an exception so our task logic will retry - """ - get_user_data.side_effect = NewsletterException("Stuffs broke yo.") - with self.assertRaises(Retry): - confirm_user("token") - self.assertFalse(ctms_mock.update.called) - def test_normal(self, get_user_data, ctms_mock): """If user_data is okay, and not yet confirmed, the task calls the right stuff""" diff --git a/basket/news/tests/test_models.py b/basket/news/tests/test_models.py index 28bf93c81..d26608f4d 100644 --- a/basket/news/tests/test_models.py +++ b/basket/news/tests/test_models.py @@ -2,6 +2,7 @@ from django.core import mail from django.test import TestCase +from django.test.utils import override_settings from basket.news import models @@ -46,71 +47,29 @@ def test_get_vendor_id(self): class FailedTaskTest(TestCase): - good_task_args = [{"case_type": "ringer", "email": "dude@example.com"}, "walter"] - - def test_retry_with_dict(self): - """When given args with a simple dict, subtask should get matching arguments.""" - task_name = "make_a_caucasian" - task = models.FailedTask.objects.create( - task_id="el-dudarino", - name=task_name, - args=self.good_task_args, - ) - with patch.object(models.celery_app, "send_task") as sub_mock: - task.retry() - - sub_mock.assert_called_with(task_name, args=self.good_task_args, kwargs={}) - - def test_retry_with_querydict(self): - """When given args with a QueryDict, subtask should get a dict.""" - task_name = "make_a_caucasian" - task_args = [{"case_type": ["ringer"], "email": ["dude@example.com"]}, "walter"] - task = models.FailedTask.objects.create( - task_id="el-dudarino", - name=task_name, - args=task_args, - ) - with patch.object(models.celery_app, "send_task") as sub_mock: - task.retry() - - sub_mock.assert_called_with(task_name, args=self.good_task_args, kwargs={}) - - def test_retry_with_querydict_not_first(self): - """When given args with a QueryDict in any position, subtask should get - a dict.""" - task_name = "make_a_caucasian" - task_args = [ - "donny", - {"case_type": ["ringer"], "email": ["dude@example.com"]}, - "walter", - ] - task = models.FailedTask.objects.create( - task_id="el-dudarino", - name=task_name, - args=task_args, - ) - with patch.object(models.celery_app, "send_task") as sub_mock: - task.retry() - - sub_mock.assert_called_with( - task_name, - args=["donny"] + self.good_task_args, - kwargs={}, - ) - - def test_retry_with_almost_querydict(self): - """When given args with a dict with a list, subtask should get a same args.""" + args = [{"case_type": "ringer", "email": "dude@example.com"}, "walter"] + kwargs = {"foo": "bar"} + + @override_settings(RQ_MAX_RETRIES=2) + @patch("basket.base.rq.random") + @patch("basket.base.rq.Queue.enqueue") + def test_retry(self, mock_enqueue, mock_random): + """Test args and kwargs are passed to enqueue.""" + mock_random.randrange.side_effect = [60, 90] task_name = "make_a_caucasian" - task_args = [{"case_type": "ringer", "email": ["dude@example.com"]}, "walter"] task = models.FailedTask.objects.create( task_id="el-dudarino", name=task_name, - args=task_args, + args=self.args, + kwargs=self.kwargs, ) - with patch.object(models.celery_app, "send_task") as sub_mock: - task.retry() + task.retry() - sub_mock.assert_called_with(task_name, args=task_args, kwargs={}) + mock_enqueue.assert_called_once() + assert mock_enqueue.call_args.args[0] == task_name + assert mock_enqueue.call_args.kwargs["args"] == self.args + assert mock_enqueue.call_args.kwargs["kwargs"] == self.kwargs + assert mock_enqueue.call_args.kwargs["retry"].intervals == [60, 90] class InterestTests(TestCase): diff --git a/basket/news/tests/test_tasks.py b/basket/news/tests/test_tasks.py index efb5fcfe5..791de6f82 100644 --- a/basket/news/tests/test_tasks.py +++ b/basket/news/tests/test_tasks.py @@ -1,65 +1,45 @@ from copy import deepcopy from unittest.mock import ANY, Mock, call, patch -from urllib.error import URLError from uuid import uuid4 from django.conf import settings from django.test import TestCase from django.test.utils import override_settings -from requests.exceptions import ConnectionError as RequestsConnectionError - from basket.news.backends.ctms import CTMSNotFoundByAltIDError -from basket.news.celery import app as celery_app from basket.news.models import FailedTask from basket.news.tasks import ( SUBSCRIBE, _add_fxa_activity, - et_task, fxa_delete, fxa_email_changed, fxa_login, fxa_verified, get_fxa_user_data, record_common_voice_update, - send_acoustic_tx_message, update_custom_unsub, update_user_meta, ) from basket.news.utils import iso_format_unix_timestamp -class FailedTaskTest(TestCase): - """Test that failed tasks are logged in our FailedTask table""" - - @patch("basket.news.tasks.acoustic_tx") - def test_failed_task_logging(self, mock_acoustic): - """Failed task is logged in FailedTask table""" - mock_acoustic.send_mail.side_effect = Exception("Test exception") - self.assertEqual(0, FailedTask.objects.count()) - args = ["you@example.com", "SFDCID"] - kwargs = {"fields": {"token": 3}} - result = send_acoustic_tx_message.apply(args=args, kwargs=kwargs) - fail = FailedTask.objects.get() - self.assertEqual("news.tasks.send_acoustic_tx_message", fail.name) - self.assertEqual(result.task_id, fail.task_id) - self.assertEqual(args, fail.args) - self.assertEqual(kwargs, fail.kwargs) - self.assertEqual("Exception('Test exception')", fail.exc) - self.assertIn("Exception: Test exception", fail.einfo) - - class RetryTaskTest(TestCase): """Test that we can retry a task""" + @override_settings(RQ_MAX_RETRIES=2) @patch("django.contrib.messages.info", autospec=True) - def test_retry_task(self, info): + @patch("basket.base.rq.random") + @patch("basket.base.rq.Queue.enqueue") + def test_retry_task(self, mock_enqueue, mock_random, info): + mock_random.randrange.side_effect = [60, 90] TASK_NAME = "news.tasks.update_phonebook" + args = [1, 2] + kwargs = {"token": 3} failed_task = FailedTask( name=TASK_NAME, task_id=4, - args=[1, 2], - kwargs={"token": 3}, + args=args, + kwargs=kwargs, exc="", einfo="", ) @@ -67,47 +47,17 @@ def test_retry_task(self, info): # that have been saved, so just mock that and check later that it was # called. failed_task.delete = Mock(spec=failed_task.delete) - with patch.object(celery_app, "send_task") as send_task_mock: - # Let's retry. - failed_task.retry() + failed_task.retry() # Task was submitted again - send_task_mock.assert_called_with(TASK_NAME, args=[1, 2], kwargs={"token": 3}) + mock_enqueue.assert_called_once() + assert mock_enqueue.call_args.args[0] == TASK_NAME + assert mock_enqueue.call_args.kwargs["args"] == args + assert mock_enqueue.call_args.kwargs["kwargs"] == kwargs + assert mock_enqueue.call_args.kwargs["retry"].intervals == [60, 90] # Previous failed task was deleted self.assertTrue(failed_task.delete.called) -class ETTaskTests(TestCase): - def _test_retry_increase(self, mock_backoff, error): - """ - The delay for retrying a task should increase geometrically by a - power of 2. I really hope I said that correctly. - """ - - @et_task - def myfunc(): - raise error - - myfunc.push_request(retries=4) - myfunc.retry = Mock(side_effect=Exception) - # have to use run() to make sure our request above is used - with self.assertRaises(Exception): # noqa: B017 - myfunc.run() - - mock_backoff.assert_called_with(4) - myfunc.retry.assert_called_with(countdown=mock_backoff()) - - @patch("basket.news.tasks.exponential_backoff") - def test_urlerror(self, mock_backoff): - self._test_retry_increase(mock_backoff, URLError(reason=Exception("foo bar!"))) - - @patch("basket.news.tasks.exponential_backoff") - def test_requests_connection_error(self, mock_backoff): - self._test_retry_increase( - mock_backoff, - RequestsConnectionError("Connection aborted."), - ) - - class AddFxaActivityTests(TestCase): def _base_test(self, user_agent=False, fxa_id="123", first_device=True): if not user_agent: diff --git a/basket/news/tests/test_update_user_task.py b/basket/news/tests/test_update_user_task.py index a0a5b1b57..a6622f50a 100644 --- a/basket/news/tests/test_update_user_task.py +++ b/basket/news/tests/test_update_user_task.py @@ -1,5 +1,5 @@ import json -from unittest.mock import ANY, patch +from unittest.mock import patch from django.core.cache import cache from django.test import RequestFactory, TestCase @@ -64,7 +64,7 @@ def test_accept_lang(self, nl_mock, get_best_language_mock): response = views.update_user_task(request, SUBSCRIBE, data, sync=False) self.assert_response_ok(response) - self.upsert_user.delay.assert_called_with(SUBSCRIBE, after_data, start_time=ANY) + self.upsert_user.delay.assert_called_with(SUBSCRIBE, after_data) @patch("basket.news.utils.get_best_language") @patch("basket.news.utils.newsletter_languages") @@ -78,7 +78,7 @@ def test_accept_lang_header(self, nl_mock, get_best_language_mock): response = views.update_user_task(request, SUBSCRIBE, data, sync=False) self.assert_response_ok(response) - self.upsert_user.delay.assert_called_with(SUBSCRIBE, after_data, start_time=ANY) + self.upsert_user.delay.assert_called_with(SUBSCRIBE, after_data) @patch("basket.news.utils.get_best_language") @patch("basket.news.utils.newsletter_languages") @@ -95,7 +95,7 @@ def test_lang_overrides_accept_lang(self, nl_mock, get_best_language_mock): response = views.update_user_task(request, SUBSCRIBE, data, sync=False) self.assert_response_ok(response) # basically asserts that the data['lang'] value wasn't changed. - self.upsert_user.delay.assert_called_with(SUBSCRIBE, data, start_time=ANY) + self.upsert_user.delay.assert_called_with(SUBSCRIBE, data) @patch("basket.news.utils.get_best_language") @patch("basket.news.utils.newsletter_languages") @@ -113,7 +113,7 @@ def test_lang_default_if_not_in_list(self, nl_mock, get_best_language_mock): response = views.update_user_task(request, SUBSCRIBE, data, sync=False) self.assert_response_ok(response) # basically asserts that the data['lang'] value wasn't changed. - self.upsert_user.delay.assert_called_with(SUBSCRIBE, after_data, start_time=ANY) + self.upsert_user.delay.assert_called_with(SUBSCRIBE, after_data) def test_missing_email(self): """ @@ -134,7 +134,7 @@ def test_success_no_sync(self): response = views.update_user_task(request, SUBSCRIBE, data, sync=False) self.assert_response_ok(response) - self.upsert_user.delay.assert_called_with(SUBSCRIBE, data, start_time=ANY) + self.upsert_user.delay.assert_called_with(SUBSCRIBE, data) self.assertFalse(self.upsert_contact.called) def test_success_with_valid_newsletters(self): @@ -171,7 +171,7 @@ def test_success_with_request_data(self): response = views.update_user_task(request, SUBSCRIBE, sync=False) self.assert_response_ok(response) - self.upsert_user.delay.assert_called_with(SUBSCRIBE, data, start_time=ANY) + self.upsert_user.delay.assert_called_with(SUBSCRIBE, data) @patch("basket.news.views.get_user_data") def test_success_with_sync(self, gud_mock): @@ -208,7 +208,7 @@ def test_success_with_unsubscribe_private_newsletter( response = views.update_user_task(request, UNSUBSCRIBE, data) self.assert_response_ok(response) - self.upsert_user.delay.assert_called_with(UNSUBSCRIBE, data, start_time=ANY) + self.upsert_user.delay.assert_called_with(UNSUBSCRIBE, data) mock_api_key.assert_not_called() @patch("basket.news.views.newsletter_and_group_slugs") diff --git a/basket/news/tests/test_views.py b/basket/news/tests/test_views.py index 59527b21f..fc26a2264 100644 --- a/basket/news/tests/test_views.py +++ b/basket/news/tests/test_views.py @@ -1,7 +1,5 @@ -# -*- coding: utf8 -*- - import json -from unittest.mock import ANY, Mock, patch +from unittest.mock import Mock, patch from django.conf import settings from django.core.cache import cache @@ -374,7 +372,6 @@ def test_subscribe_success(self): "country": "", "source_url": "", }, - start_time=ANY, ) def test_subscribe_success_non_ajax(self): @@ -397,7 +394,6 @@ def test_subscribe_success_non_ajax(self): "country": "", "source_url": "", }, - start_time=ANY, ) assert b"Thank you for subscribing" in response.content @@ -470,7 +466,6 @@ def test_lang_via_accept_language(self): "country": "", "source_url": "", }, - start_time=ANY, ) def test_lang_instead_of_accept_language(self): @@ -500,7 +495,6 @@ def test_lang_instead_of_accept_language(self): "country": "", "source_url": "", }, - start_time=ANY, ) def test_lang_default_if_unsupported(self): @@ -529,7 +523,6 @@ def test_lang_default_if_unsupported(self): "country": "", "source_url": "", }, - start_time=ANY, ) def test_source_url_from_referrer(self): @@ -552,7 +545,6 @@ def test_source_url_from_referrer(self): "country": "", "source_url": "https://example.com/bowling", }, - start_time=ANY, ) def test_source_url_from_invalid_referrer(self): @@ -575,7 +567,6 @@ def test_source_url_from_invalid_referrer(self): "country": "", "source_url": "", }, - start_time=ANY, ) def test_source_url_overrides_referrer(self): @@ -603,7 +594,6 @@ def test_source_url_overrides_referrer(self): "country": "", "source_url": "https://example.com/abiding", }, - start_time=ANY, ) def test_multiple_newsletters(self): @@ -631,7 +621,6 @@ def test_multiple_newsletters(self): "country": "", "source_url": "", }, - start_time=ANY, ) def test_blocked_email(self): diff --git a/basket/news/urls.py b/basket/news/urls.py index f7facb044..b5bf0386c 100644 --- a/basket/news/urls.py +++ b/basket/news/urls.py @@ -4,7 +4,6 @@ common_voice_goals, confirm, custom_unsub_reason, - debug_user, get_involved, list_newsletters, lookup_user, @@ -24,7 +23,6 @@ path("user//", user, name="user"), path("user-meta//", user_meta), path("confirm//", confirm), - path("debug-user/", debug_user), path("lookup-user/", lookup_user, name="lookup_user"), path("recover/", send_recovery_message, name="send_recovery_message"), path("custom_unsub_reason/", custom_unsub_reason), diff --git a/basket/news/views.py b/basket/news/views.py index 75fb234ea..5a582d051 100644 --- a/basket/news/views.py +++ b/basket/news/views.py @@ -1,6 +1,5 @@ import os import re -from time import time from urllib.parse import urlencode from django.conf import settings @@ -229,7 +228,7 @@ def confirm(request, token): increment=True, ): raise Ratelimited() - confirm_user.delay(token, start_time=time()) + confirm_user.delay(token) return HttpResponseJSON({"status": "ok"}) @@ -465,7 +464,7 @@ def subscribe_main(request): return respond_error(request, form, "Rate limit reached", 429) try: - upsert_user.delay(SUBSCRIBE, data, start_time=time()) + upsert_user.delay(SUBSCRIBE, data) except Exception: return respond_error(request, form, "Unknown error", 500) @@ -673,18 +672,6 @@ def send_recovery_message(request): return HttpResponseJSON({"status": "ok"}) -@never_cache -def debug_user(request): - return HttpResponseJSON( - { - "status": "error", - "desc": "method removed. use lookup-user and an API key.", - "code": errors.BASKET_USAGE_ERROR, - }, - 404, - ) - - # Custom update methods @@ -928,7 +915,7 @@ def update_user_task(request, api_call_type, data=None, optin=False, sync=False) statsd.incr("news.views.subscribe.sync") if settings.MAINTENANCE_MODE and not settings.MAINTENANCE_READ_ONLY: # save what we can - upsert_user.delay(api_call_type, data, start_time=time()) + upsert_user.delay(api_call_type, data) # have to error since we can't return a token return HttpResponseJSON( { @@ -959,5 +946,5 @@ def update_user_task(request, api_call_type, data=None, optin=False, sync=False) token, created = upsert_contact(api_call_type, data, user_data) return HttpResponseJSON({"status": "ok", "token": token, "created": created}) else: - upsert_user.delay(api_call_type, data, start_time=time()) + upsert_user.delay(api_call_type, data) return HttpResponseJSON({"status": "ok"}) diff --git a/basket/settings.py b/basket/settings.py index 61e8f67ac..0d481e99f 100644 --- a/basket/settings.py +++ b/basket/settings.py @@ -3,7 +3,6 @@ import socket import struct import sys -from datetime import timedelta from pathlib import Path import dj_database_url @@ -11,10 +10,10 @@ import sentry_sdk from decouple import Csv, UndefinedValueError, config from sentry_processor import DesensitizationProcessor -from sentry_sdk.integrations.celery import CeleryIntegration from sentry_sdk.integrations.django import DjangoIntegration from sentry_sdk.integrations.logging import ignore_logger from sentry_sdk.integrations.redis import RedisIntegration +from sentry_sdk.integrations.rq import RqIntegration # Application version. VERSION = (0, 1) @@ -29,6 +28,11 @@ def path(*args): DEBUG = config("DEBUG", default=False, cast=bool) +UNITTEST = config("UNITTEST", default=False, cast=bool) + +# If we forget to set a `UNITTEST` env var by are running `pytest`, set it. +if sys.argv[0].endswith(("py.test", "pytest")): + UNITTEST = True ADMINS = ( # ('Your Name', 'your_email@domain.com'), @@ -47,14 +51,6 @@ def path(*args): default="https://prod-oregon-b.basket.moz.works", ) -REDIS_URL = config("REDIS_URL", None) -if REDIS_URL: - REDIS_URL = REDIS_URL.rstrip("/0") - HIREDIS_URL = REDIS_URL.replace("redis://", "hiredis://") - # use redis for celery and cache - os.environ["CELERY_BROKER_URL"] = REDIS_URL + "/" + config("REDIS_CELERY_DB", "0") - os.environ["CACHE_URL"] = HIREDIS_URL + "/" + config("REDIS_CACHE_DB", "1") - # Production uses MySQL, but Sqlite should be sufficient for local development. # Our CI server tests against MySQL. DATABASES = { @@ -70,6 +66,15 @@ def path(*args): } DEFAULT_AUTO_FIELD = "django.db.models.AutoField" +# CACHE_URL and RQ_URL are derived from REDIS_URL. +REDIS_URL = config("REDIS_URL", None) +if REDIS_URL: + REDIS_URL = REDIS_URL.rstrip("/0") + HIREDIS_URL = REDIS_URL.replace("redis://", "hiredis://") + # use redis for cache and rq. + os.environ["CACHE_URL"] = HIREDIS_URL + "/" + config("REDIS_CACHE_DB", "1") + os.environ["RQ_URL"] = RQ_URL = REDIS_URL + "/" + config("REDIS_RQ_DB", "2") + CACHES = { "default": config("CACHE_URL", default="locmem://", cast=django_cache_url.parse), "bad_message_ids": { @@ -244,49 +249,15 @@ def path(*args): # view rate limiting RATELIMIT_VIEW = "basket.news.views.ratelimited" -KOMBU_FERNET_KEY = config("KOMBU_FERNET_KEY", None) -# for key rotation -KOMBU_FERNET_KEY_PREVIOUS = config("KOMBU_FERNET_KEY_PREVIOUS", None) -CELERY_TASK_ALWAYS_EAGER = config("CELERY_TASK_ALWAYS_EAGER", DEBUG, cast=bool) -CELERY_TASK_SERIALIZER = "json" -CELERY_TASK_ACKS_LATE = config("CELERY_TASK_ACKS_LATE", True, cast=bool) -CELERY_TASK_REJECT_ON_WORKER_LOST = False -CELERY_ACCEPT_CONTENT = ["json"] -CELERY_MAX_RETRY_DELAY_MINUTES = 2048 -CELERY_BROKER_TRANSPORT_OPTIONS = { - "visibility_timeout": CELERY_MAX_RETRY_DELAY_MINUTES * 60, -} -CELERY_BROKER_URL = config("CELERY_BROKER_URL", None) -CELERY_REDIS_MAX_CONNECTIONS = config("CELERY_REDIS_MAX_CONNECTIONS", 2, cast=int) -CELERY_WORKER_DISABLE_RATE_LIMITS = True -CELERY_TASK_IGNORE_RESULT = True -CELERY_WORKER_PREFETCH_MULTIPLIER = config( - "CELERY_WORKER_PREFETCH_MULTIPLIER", - 1, - cast=int, -) -CELERY_TASK_COMPRESSION = "gzip" -CELERY_TASK_ROUTES = { - "basket.news.tasks.snitch": {"queue": "snitch"}, -} - -# size in kb -CELERY_WORKER_MAX_MEMORY_PER_CHILD = config( - "CELERY_WORKER_MAX_MEMORY_PER_CHILD", - 200000, - cast=int, -) +# RQ configuration. +RQ_RESULT_TTL = config("RQ_RESULT_TTL", default=0, cast=int) # Ignore results. +RQ_MAX_RETRY_DELAY = config("RQ_MAX_RETRY_DELAY", default=34 * 60 * 60, cast=int) # 34 hours in seconds. +RQ_MAX_RETRIES = 0 if UNITTEST else config("RQ_MAX_RETRIES", default=12, cast=int) +RQ_EXCEPTION_HANDLERS = ["basket.base.rq.store_task_exception_handler"] +RQ_IS_ASYNC = False if UNITTEST else config("RQ_IS_ASYNC", default=True, cast=bool) SNITCH_ID = config("SNITCH_ID", None) -CELERY_BEAT_SCHEDULE = {} - -if SNITCH_ID: - CELERY_BEAT_SCHEDULE["snitch"] = { - "task": "basket.news.tasks.snitch", - "schedule": timedelta(minutes=5), - } - # via http://stackoverflow.com/a/6556951/107114 def get_default_gateway_linux(): @@ -349,7 +320,7 @@ def before_send(event, hint): dsn=config("SENTRY_DSN", None), release=config("GIT_SHA", None), server_name=".".join(x for x in [K8S_NAMESPACE, CLUSTER_NAME, HOSTNAME] if x), - integrations=[CeleryIntegration(), DjangoIntegration(), RedisIntegration()], + integrations=[DjangoIntegration(signals_spans=False), RedisIntegration(), RqIntegration()], before_send=before_send, ) @@ -399,11 +370,14 @@ def before_send(event, hint): # regardless of their existence in the DB EXTRA_SUPPORTED_LANGS = config("EXTRA_SUPPORTED_LANGS", "", cast=Csv()) -TESTING_EMAIL_DOMAINS = config( - "TESTING_EMAIL_DOMAINS", - "restmail.net,restmail.lcip.org,example.com", - cast=Csv(), -) +if UNITTEST: + TESTING_EMAIL_DOMAINS = [] +else: + TESTING_EMAIL_DOMAINS = config( + "TESTING_EMAIL_DOMAINS", + "restmail.net,restmail.lcip.org,example.com", + cast=Csv(), + ) MAINTENANCE_MODE = config("MAINTENANCE_MODE", False, cast=bool) QUEUE_BATCH_SIZE = config("QUEUE_BATCH_SIZE", 500, cast=int) @@ -465,8 +439,3 @@ def before_send(event, hint): OIDC_CREATE_USER = config("OIDC_CREATE_USER", default=False, cast=bool) MIDDLEWARE += ("basket.news.middleware.OIDCSessionRefreshMiddleware",) LOGIN_REDIRECT_URL = "/admin/" - -if sys.argv[0].endswith("py.test") or sys.argv[0].endswith("pytest") or (len(sys.argv) > 1 and sys.argv[1] == "test"): - # stuff that's absolutely required for a test run - CELERY_TASK_ALWAYS_EAGER = True - TESTING_EMAIL_DOMAINS = [] diff --git a/bin/run-clock.sh b/bin/run-clock.sh deleted file mode 100755 index aa2328bc3..000000000 --- a/bin/run-clock.sh +++ /dev/null @@ -1,3 +0,0 @@ -#!/bin/bash -ex - -exec celery -A basket.news beat -l "${CELERY_LOG_LEVEL:-warning}" diff --git a/bin/run-dev.sh b/bin/run-dev.sh index ba5150e1d..1aa16a1b9 100755 --- a/bin/run-dev.sh +++ b/bin/run-dev.sh @@ -1,6 +1,4 @@ -#!/bin/bash - -set -ex +#!/bin/bash -ex urlwait bin/post-deploy.sh diff --git a/bin/run-worker.sh b/bin/run-worker.sh index bfe6204c1..1726b6421 100755 --- a/bin/run-worker.sh +++ b/bin/run-worker.sh @@ -1,7 +1,4 @@ #!/bin/bash -ex -exec newrelic-admin run-program celery -A basket.news worker \ - -P "${CELERY_POOL:-prefork}" \ - -l "${CELERY_LOG_LEVEL:-warning}" \ - -c "${CELERY_NUM_WORKERS:-4}" \ - -Q celery,snitch +NEW_RELIC_CONFIG_FILE=newrelic.ini newrelic-admin run-program \ +python manage.py rqworker --with-scheduler diff --git a/docker-compose.yml b/docker-compose.yml index 7f98ebd1d..e50689a28 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -19,16 +19,9 @@ services: platform: linux/amd64 volumes: - .:/app - environment: - - DATABASE_URL=mysql://root@db/basket - - DEBUG=True - - ALLOWED_HOSTS=localhost,127.0.0.1, - - REDIS_URL=redis://redis:6379 - - CELERY_TASK_ALWAYS_EAGER=False - - CELERY_LOG_LEVEL=debug - - DJANGO_LOG_LEVEL=DEBUG - - URLWAIT_TIMEOUT=30 - env_file: .env + env_file: + - docker/envfiles/local.env + - .env ports: - "8000:8000" depends_on: @@ -42,16 +35,9 @@ services: platform: linux/amd64 volumes: - .:/app - environment: - - DATABASE_URL=mysql://root@db/basket - - DEBUG=True - - ALLOWED_HOSTS=localhost,127.0.0.1, - - REDIS_URL=redis://redis:6379 - - CELERY_TASK_ALWAYS_EAGER=False - - CELERY_LOG_LEVEL=debug - - DJANGO_LOG_LEVEL=DEBUG - - URLWAIT_TIMEOUT=30 - env_file: .env + env_file: + - docker/envfiles/local.env + - .env depends_on: - db - redis @@ -63,16 +49,9 @@ services: platform: linux/amd64 volumes: - .:/app - environment: - - DATABASE_URL=mysql://root@db/basket - - DEBUG=True - - ALLOWED_HOSTS=localhost,127.0.0.1, - - CELERY_TASK_ALWAYS_EAGER=True - - CELERY_LOG_LEVEL=debug - - DJANGO_LOG_LEVEL=DEBUG - - URLWAIT_TIMEOUT=30 - - REDIS_URL=redis://redis:6379 - env_file: docker/envfiles/test.env + env_file: + - docker/envfiles/local.env + - docker/envfiles/test.env depends_on: - db - redis @@ -82,16 +61,9 @@ services: test-image: image: mozmeao/basket:${GIT_COMMIT_SHORT:-latest} platform: linux/amd64 - environment: - - DATABASE_URL=mysql://root@db/basket - - DEBUG=True - - ALLOWED_HOSTS=localhost,127.0.0.1, - - CELERY_TASK_ALWAYS_EAGER=True - - CELERY_LOG_LEVEL=debug - - DJANGO_LOG_LEVEL=DEBUG - - URLWAIT_TIMEOUT=30 - - REDIS_URL=redis://redis:6379 - env_file: docker/envfiles/test.env + env_file: + - docker/envfiles/local.env + - docker/envfiles/test.env depends_on: - db - redis diff --git a/docker/envfiles/local.env b/docker/envfiles/local.env new file mode 100644 index 000000000..6403ad2de --- /dev/null +++ b/docker/envfiles/local.env @@ -0,0 +1,6 @@ +ALLOWED_HOSTS=localhost,127.0.0.1, +DATABASE_URL=mysql://root@db/basket +DEBUG=True +DJANGO_LOG_LEVEL=DEBUG +REDIS_URL=redis://redis:6379 +URLWAIT_TIMEOUT=30 diff --git a/docker/envfiles/test.env b/docker/envfiles/test.env index 624c8c873..5d4255645 100644 --- a/docker/envfiles/test.env +++ b/docker/envfiles/test.env @@ -1,7 +1,8 @@ +ADMINS=["thedude@example.com"] +ALLOWED_HOSTS=* +DATABASE_URL=sqlite:///basket.db DEBUG=False DEV=False -ALLOWED_HOSTS=* +DJANGO_SETTINGS_MODULE=basket.settings SECRET_KEY=ssssssssshhhhhhhhhh -ADMINS=["thedude@example.com"] -CELERY_TASK_ALWAYS_EAGER=True -DATABASE_URL=sqlite:///basket.db +UNITTEST=True diff --git a/docs/install.rst b/docs/install.rst index e6cc8a042..e7623ceb9 100644 --- a/docs/install.rst +++ b/docs/install.rst @@ -42,42 +42,41 @@ code according to our style and check for errors for every commit. Use Docker ---------- +Basket requires a database (either MySQL or SQLite locally, depending on the ``DATABASE_URL`` setting) and Redis. We use Docker to run these services. + The steps to get up and running are these: .. code-block:: bash - $ # this pulls our latest builds from the docker hub. - $ # it's optional but will speed up your builds considerably. - $ docker-compose pull - $ # this starts the server and dependencies - $ docker-compose up web + $ make build + $ make run # runs both the web app and the worker. If you've made changes to the `Dockerfile` or `requirements/*.txt` you'll need to rebuild the image to run the app and tests: .. code-block:: bash - $ docker-compose build web + $ make build -Then to run the app you run the `docker-compose up web` command again, or for running tests against your local changes you run: +Then to run the app you run the `make run` command again, or for running tests against your local changes you run: .. code-block:: bash - $ docker-compose run --rm test + $ make test We use pytest for running tests. So if you'd like to craft your own pytest command to run individual test files or something you can do so by passing in a command to the above: .. code-block:: bash - $ docker-compose run --rm test py.test basket/news/tests/test_views.py + $ make run-shell + $ pytest basket/news/tests/test_views.py And if you need to debug a running container, you can open another terminal to your basket code and run the following: .. code-block:: bash - $ docker-compose exec web bash - $ # or - $ docker-compose exec web python manage.py shell + $ make shell + $ python manage.py shell Maintaining Python requirements diff --git a/docs/newsletter_api.rst b/docs/newsletter_api.rst index 8af343544..d38c944df 100644 --- a/docs/newsletter_api.rst +++ b/docs/newsletter_api.rst @@ -8,45 +8,47 @@ Newsletter API ============================ -This "news" app provides a service for managing Mozilla newsletters. +The "news" app provides a service for managing Mozilla newsletters. `fixtures/newsletters.json` is a fixture that can be used to load some initial data, but is probably out of date by the time you read this. -Currently available newsletters can be found in JSON format via the +You can access the currently available newsletters in JSON format through the `/news/newsletters/ API endpoint `_. -If 'token-required' is specified, a token must be suffixed onto the API -URL, such as:: +If 'token-required' is specified, you must append a token to the API URL. For +example:: /news/user// -This is a user-specific token given away by the email backend or -basket in some manner (i.e. emailed to the user from basket). This -token allows clients to do more powerful things with the user. +The user-specific token is provided by the email backend or basket via email. +This token grants clients additional capabilities to perform actions on on user +accounts. -A client might also have an ``API key`` that it can use with some APIs to -do privileged things, like looking up a user from their email. +Clients may also possess an "API key" that enables privileged operations, such +as email-based user lookup, when used with specific APIs. -If 'SSL required', the call will fail if not called over a secure -(SSL) connection. +If 'SSL required' is specified, the API call must be made over a secure (SSL) +connection. Otherwise, the call will fail. -Whenever possible (even when the HTTP status is not 200), the response body -will be a JSON-encoded dictionary with several guaranteed fields, along with -any data being returned by the particular call: +In most cases, the response body will be a JSON-encoded dictionary containing +several predefined fields, even if the HTTP status code is not 200. +Additionally, the response may include data specific to the requested operation. + +The following fields are guaranteed to be present in the response: 'status': 'ok' if the call succeeded, 'error' if there was an error -If there was an error, these fields will also be included: +If an error occurs, the following fields will also be included: - 'code': an integer error code taken from ``basket.errors`` + 'code': An integer error code taken from ``basket.errors`` in `basket-client `_. - 'desc': brief English description of the error. + 'desc': A brief description in English explaining the encountered error. The following URLs are available (assuming "/news" is app url): -/news/subscribe ---------------- +/news/subscribe/ +---------------- This method subscribes the user to the newsletters defined in the "newsletters" field, which should be a comma-delimited list of @@ -92,8 +94,8 @@ The following URLs are available (assuming "/news" is app url): If the email address is invalid (due to format, or unrecognized domain), the error code will be ``BASKET_INVALID_EMAIL`` from the basket client. -/news/unsubscribe ------------------ +/news/unsubscribe/ +------------------ This method unsubscribes the user from the newsletters defined in the "newsletters" field, which should be a comma-delimited list of @@ -107,8 +109,8 @@ The following URLs are available (assuming "/news" is app url): { status: error, desc: } on error token-required -/news/user ----------- +/news/user/ +----------- Returns information about the user including all the newsletters he/she is subscribed to:: @@ -142,8 +144,20 @@ The following URLs are available (assuming "/news" is app url): { status: error, desc: } on error token-required -/news/newsletters ------------------ +/news/user-meta/ +---------------- + + Used to update user metadata only, not newsletters. + + method: POST + fields: first_name, last_name, country, lang, source_url + returns: { status: ok } on success + { status: error, desc: } on error + token-required + + +/news/newsletters/ +------------------ Returns information about all of the available newsletters:: @@ -170,13 +184,8 @@ The following URLs are available (assuming "/news" is app url): } } -/news/debug-user ----------------- - - REMOVED. Will return a 404. Use the newer and better ``lookup-user`` method. - -/news/lookup-user ------------------ +/news/lookup-user/ +------------------ This allows retrieving user information given either their token or their email (but not both). To retrieve by email, an API key is diff --git a/env-dist b/env-dist index 18934b904..966df874d 100644 --- a/env-dist +++ b/env-dist @@ -1,9 +1,6 @@ -DEBUG=1 +DEBUG=True # change this to something secret when deployed SECRET_KEY=sssssssshhhhhhhhhh -# uncomment this if you aren't using the full docker environment -#CELERY_TASK_ALWAYS_EAGER=1 - ALLOWED_HOSTS='*' diff --git a/newrelic.ini b/newrelic.ini index 340f08424..70c002ae3 100644 --- a/newrelic.ini +++ b/newrelic.ini @@ -169,7 +169,7 @@ error_collector.enabled = true # To stop specific errors from reporting to the UI, set this to # a space separated list of the Python exception type names to # ignore. The exception name should be of the form 'module:class'. -error_collector.ignore_errors = celery.exceptions:Retry ratelimit.exceptions:Ratelimited +error_collector.ignore_errors = ratelimit.exceptions:Ratelimited # Browser monitoring is the Real User Monitoring feature of the UI. # For those Python web frameworks that are supported, this diff --git a/pyproject.toml b/pyproject.toml index 082f98d9e..783031ea2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,14 +21,14 @@ exclude = ''' # Set what ruff should check for. # See https://beta.ruff.rs/docs/rules/ for a list of rules. select = [ - "A", # flake8-builtin errors. - "B", # bugbear errors - "DJ", # flake8-django errors. - "E", # pycodestyle errors - "F", # pyflakes errors - "I", # import sorting - "Q", # flake8-quotes errors - "W", # pycodestyle warnings + "A", # flake8-builtin errors. + "B", # bugbear errors + "DJ", # flake8-django errors. + "E", # pycodestyle errors + "F", # pyflakes errors + "I", # import sorting + "Q", # flake8-quotes errors + "W", # pycodestyle warnings ] line-length = 150 # To match black. target-version = 'py39' @@ -46,4 +46,3 @@ django = ["django"] [tool.pytest.ini_options] addopts = "-ra --ignore=vendor" -DJANGO_SETTINGS_MODULE = "basket.settings" diff --git a/requirements/dev.txt b/requirements/dev.txt index a92801ea1..00225550e 100644 --- a/requirements/dev.txt +++ b/requirements/dev.txt @@ -4,12 +4,6 @@ # # make compile-requirements # -amqp==2.6.1 \ - --hash=sha256:70cdb10628468ff14e57ec2f751c7aa9e48e7e3651cfd62d431213c0c4e58f21 \ - --hash=sha256:aa7f313fb887c91f15474c1229907a04dac0b8135822d6603437803424c0aa59 - # via - # -r requirements/prod.txt - # kombu apscheduler==3.10.1 \ --hash=sha256:0293937d8f6051a0f493359440c1a1b93e882c57daf0197afeff0e727777b96e \ --hash=sha256:e813ad5ada7aff36fb08cdda746b520531eaac7757832abc204868ba78e0c8f6 @@ -48,12 +42,6 @@ backports-zoneinfo==0.2.1 \ --hash=sha256:f04e857b59d9d1ccc39ce2da1021d196e47234873820cbeaad210724b1ee28ac \ --hash=sha256:fadbfe37f74051d024037f223b8e001611eac868b5c5b06144ef4d8b799862f2 # via -r requirements/prod.txt -billiard==3.6.4.0 \ - --hash=sha256:299de5a8da28a783d51b197d496bef4f1595dd023a93a4f59dde1886ae905547 \ - --hash=sha256:87103ea78fa6ab4d5c751c4909bcff74617d985de7fa8b672cf8618afd5a875b - # via - # -r requirements/prod.txt - # celery black==23.3.0 \ --hash=sha256:064101748afa12ad2291c2b91c960be28b817c0c7eaa35bec09cc63aa56493c5 \ --hash=sha256:0945e13506be58bf7db93ee5853243eb368ace1c08a24c65ce108986eac65915 \ @@ -92,10 +80,6 @@ botocore==1.29.152 \ # -r requirements/prod.txt # boto3 # s3transfer -celery==4.4.7 \ - --hash=sha256:a92e1d56e650781fb747032a3997d16236d037c8199eacd5217d1a72893bca45 \ - --hash=sha256:d220b13a8ed57c78149acf82c006785356071844afe0b27012a4991d44026f9f - # via -r requirements/prod.txt certifi==2022.12.7 \ --hash=sha256:35824b4c3a97115964b408844d64aa14db1cc518f6562e8d7261699d1350a9e3 \ --hash=sha256:4ad3232f5e926d6718ec31cfc1fcadfde020920e278684144551c91769c7bc18 @@ -266,7 +250,10 @@ charset-normalizer==3.0.1 \ click==8.1.3 \ --hash=sha256:7682dc8afb30297001674575ea00d1814d808d6a36af415a82bd481d37ba7b8e \ --hash=sha256:bb4d8133cb15a609f44e8213d9b391b0809795062913b383c62be0ee95b1db48 - # via black + # via + # -r requirements/prod.txt + # black + # rq configparser==5.3.0 \ --hash=sha256:8be267824b541c09b08db124917f48ab525a6c3e837011f3130781a224c57090 \ --hash=sha256:b065779fd93c6bf4cee42202fa4351b4bb842e96a3fb469440e484517a49b9fa @@ -328,6 +315,11 @@ coverage[toml]==7.1.0 \ --hash=sha256:f5b4198d85a3755d27e64c52f8c95d6333119e49fd001ae5798dac872c95e0f8 \ --hash=sha256:ffeeb38ee4a80a30a6877c5c4c359e5498eec095878f1581453202bfacc8fbc2 # via pytest-cov +crontab==1.0.1 \ + --hash=sha256:89477e3f93c81365e738d5ee2659509e6373bb2846de13922663e79aa74c6b91 + # via + # -r requirements/prod.txt + # rq-scheduler cryptography==40.0.2 \ --hash=sha256:05dc219433b14046c476f6f09d7636b92a1c3e5808b9a6536adf4932b3b2c440 \ --hash=sha256:0dcca15d3a19a66e63662dc8d30f8036b07be851a8680eda92d079868f106288 \ @@ -426,6 +418,12 @@ exceptiongroup==1.1.0 \ --hash=sha256:327cbda3da756e2de031a3107b81ab7b3770a602c4d16ca618298c526f4bec1e \ --hash=sha256:bcb67d800a4497e1b404c2dd44fca47d3b7a5e5433dbab67f96c1a685cdfdf23 # via pytest +freezegun==1.2.2 \ + --hash=sha256:cd22d1ba06941384410cd967d8a99d5ae2442f57dfafeff2fda5de8dc5c05446 \ + --hash=sha256:ea1b963b993cb9ea195adbd893a48d573fda951b0da64f60883d7e988b606c9f + # via + # -r requirements/prod.txt + # rq-scheduler greenlet==0.4.17 \ --hash=sha256:1023d7b43ca11264ab7052cb09f5635d4afdb43df55e0854498fc63070a0b206 \ --hash=sha256:124a3ae41215f71dc91d1a3d45cbf2f84e46b543e5d60b99ecc20e24b4c8f272 \ @@ -577,12 +575,6 @@ josepy==1.13.0 \ # via # -r requirements/prod.txt # mozilla-django-oidc -kombu==4.6.11 \ - --hash=sha256:be48cdffb54a2194d93ad6533d73f69408486483d189fe9f5990ee24255b0e0a \ - --hash=sha256:ca1b45faac8c0b18493d02a8571792f3c40291cf2bcf1f55afed3d8f3aa7ba74 - # via - # -r requirements/prod.txt - # celery lxml==4.9.2 \ --hash=sha256:01d36c05f4afb8f7c20fd9ed5badca32a2029b93b1750f571ccc0b142531caf7 \ --hash=sha256:04876580c050a8c5341d706dd464ff04fd597095cc8c023252566a8826505726 \ @@ -783,6 +775,8 @@ python-dateutil==2.8.2 \ # via # -r requirements/prod.txt # botocore + # freezegun + # rq-scheduler python-decouple==3.8 \ --hash=sha256:ba6e2657d4f376ecc46f77a3a615e058d93ba5e465c01bbe57289bfb7cce680f \ --hash=sha256:d0d45340815b25f4de59c974b855bb38d03151d81b037d9e3f463b0c9f8cbd66 @@ -793,7 +787,6 @@ pytz==2022.7.1 \ # via # -r requirements/prod.txt # apscheduler - # celery # django pytz-deprecation-shim==0.1.0.post0 \ --hash=sha256:8314c9692a636c8eb3bda879b9f119e350e93223ae83e70e80c31675a0fdc1a6 \ @@ -849,6 +842,7 @@ redis==4.5.5 \ # via # -r requirements/prod.txt # django-redis + # rq requests==2.28.2 \ --hash=sha256:64299f4909223da747622c030b781c0d7811e359c37124b4bd368fb8c6518baa \ --hash=sha256:98b1b2782e3c6c4904938b84c0eb932721069dfdb9134313beff7c83c2df24bf @@ -866,6 +860,16 @@ requests-oauthlib==1.3.1 \ # via # -r requirements/prod.txt # pysilverpop +rq==1.15.0 \ + --hash=sha256:6bdc8885bf839a246c4b4e02f2ee31d8f840061fced200f13df9a58a582ec04a \ + --hash=sha256:9e89beb034bd253163ac4aa2a0c7d7c7c536aa488df8bbbbc9d354c96e81bb44 + # via + # -r requirements/prod.txt + # rq-scheduler +rq-scheduler==0.13.1 \ + --hash=sha256:89d6a18f215536362b22c0548db7dbb8678bc520c18dc18a82fd0bb2b91695ce \ + --hash=sha256:c2b19c3aedfc7de4d405183c98aa327506e423bf4cdc556af55aaab9bbe5d1a1 + # via -r requirements/prod.txt ruff==0.0.272 \ --hash=sha256:06b8ee4eb8711ab119db51028dd9f5384b44728c23586424fd6e241a5b9c4a3b \ --hash=sha256:1609b864a8d7ee75a8c07578bdea0a7db75a144404e75ef3162e0042bfdc100d \ @@ -967,13 +971,6 @@ user-agents==2.2.0 \ --hash=sha256:a98c4dc72ecbc64812c4534108806fb0a0b3a11ec3fd1eafe807cee5b0a942e7 \ --hash=sha256:d36d25178db65308d1458c5fa4ab39c9b2619377010130329f3955e7626ead26 # via -r requirements/prod.txt -vine==1.3.0 \ - --hash=sha256:133ee6d7a9016f177ddeaf191c1f58421a1dcc6ee9a42c58b34bed40e1d2cd87 \ - --hash=sha256:ea4947cc56d1fd6f2095c8d543ee25dad966f78692528e68b4fada11ba3f98af - # via - # -r requirements/prod.txt - # amqp - # celery webob==1.8.7 \ --hash=sha256:73aae30359291c14fa3b956f8b5ca31960e420c28c1bec002547fb04928cf89b \ --hash=sha256:b64ef5141be559cfade448f044fa45c2260351edcb6a8ef6b7e00c7dcef0c323 diff --git a/requirements/prod.in b/requirements/prod.in index 958e6ef14..28dedb052 100644 --- a/requirements/prod.in +++ b/requirements/prod.in @@ -1,7 +1,6 @@ APScheduler==3.10.1 backports.zoneinfo==0.2.1 boto3==1.26.152 -celery==4.4.7 configparser==5.3.0 contextlib2==21.6.0 cryptography==40.0.2 @@ -33,6 +32,8 @@ pysilverpop==0.2.6 python-decouple==3.8 PyYAML==6.0 redis==4.5.5 +rq==1.15.0 +rq-scheduler==0.13.1 sentry-processor==0.0.1 sentry-sdk==1.25.1 user-agents==2.2.0 diff --git a/requirements/prod.txt b/requirements/prod.txt index cde504fb5..1d719151e 100644 --- a/requirements/prod.txt +++ b/requirements/prod.txt @@ -4,10 +4,6 @@ # # make compile-requirements # -amqp==2.6.1 \ - --hash=sha256:70cdb10628468ff14e57ec2f751c7aa9e48e7e3651cfd62d431213c0c4e58f21 \ - --hash=sha256:aa7f313fb887c91f15474c1229907a04dac0b8135822d6603437803424c0aa59 - # via kombu apscheduler==3.10.1 \ --hash=sha256:0293937d8f6051a0f493359440c1a1b93e882c57daf0197afeff0e727777b96e \ --hash=sha256:e813ad5ada7aff36fb08cdda746b520531eaac7757832abc204868ba78e0c8f6 @@ -38,10 +34,6 @@ backports-zoneinfo==0.2.1 \ --hash=sha256:f04e857b59d9d1ccc39ce2da1021d196e47234873820cbeaad210724b1ee28ac \ --hash=sha256:fadbfe37f74051d024037f223b8e001611eac868b5c5b06144ef4d8b799862f2 # via -r requirements/prod.in -billiard==3.6.4.0 \ - --hash=sha256:299de5a8da28a783d51b197d496bef4f1595dd023a93a4f59dde1886ae905547 \ - --hash=sha256:87103ea78fa6ab4d5c751c4909bcff74617d985de7fa8b672cf8618afd5a875b - # via celery boto3==1.26.152 \ --hash=sha256:a2778c5729d3dc0b3688c9f0d103543d7ec5ff44a4fd0e84d0d542e2dff05e62 \ --hash=sha256:ee0b8f8d238d4e1cf50fa6a185e4e066955b6105e9838a80b1b6582cd327dfdf @@ -52,10 +44,6 @@ botocore==1.29.152 \ # via # boto3 # s3transfer -celery==4.4.7 \ - --hash=sha256:a92e1d56e650781fb747032a3997d16236d037c8199eacd5217d1a72893bca45 \ - --hash=sha256:d220b13a8ed57c78149acf82c006785356071844afe0b27012a4991d44026f9f - # via -r requirements/prod.in certifi==2022.12.7 \ --hash=sha256:35824b4c3a97115964b408844d64aa14db1cc518f6562e8d7261699d1350a9e3 \ --hash=sha256:4ad3232f5e926d6718ec31cfc1fcadfde020920e278684144551c91769c7bc18 @@ -218,6 +206,10 @@ charset-normalizer==3.0.1 \ --hash=sha256:f9d0c5c045a3ca9bedfc35dca8526798eb91a07aa7a2c0fee134c6c6f321cbd7 \ --hash=sha256:ff6f3db31555657f3163b15a6b7c6938d08df7adbfc9dd13d9d19edad678f1e8 # via requests +click==8.1.3 \ + --hash=sha256:7682dc8afb30297001674575ea00d1814d808d6a36af415a82bd481d37ba7b8e \ + --hash=sha256:bb4d8133cb15a609f44e8213d9b391b0809795062913b383c62be0ee95b1db48 + # via rq configparser==5.3.0 \ --hash=sha256:8be267824b541c09b08db124917f48ab525a6c3e837011f3130781a224c57090 \ --hash=sha256:b065779fd93c6bf4cee42202fa4351b4bb842e96a3fb469440e484517a49b9fa @@ -226,6 +218,9 @@ contextlib2==21.6.0 \ --hash=sha256:3fbdb64466afd23abaf6c977627b75b6139a5a3e8ce38405c5b413aed7a0471f \ --hash=sha256:ab1e2bfe1d01d968e1b7e8d9023bc51ef3509bba217bb730cee3827e1ee82869 # via -r requirements/prod.in +crontab==1.0.1 \ + --hash=sha256:89477e3f93c81365e738d5ee2659509e6373bb2846de13922663e79aa74c6b91 + # via rq-scheduler cryptography==40.0.2 \ --hash=sha256:05dc219433b14046c476f6f09d7636b92a1c3e5808b9a6536adf4932b3b2c440 \ --hash=sha256:0dcca15d3a19a66e63662dc8d30f8036b07be851a8680eda92d079868f106288 \ @@ -318,6 +313,10 @@ email-validator==1.3.1 \ --hash=sha256:49a72f5fa6ed26be1c964f0567d931d10bf3fdeeacdf97bc26ef1cd2a44e0bda \ --hash=sha256:d178c5c6fa6c6824e9b04f199cf23e79ac15756786573c190d2ad13089411ad2 # via -r requirements/prod.in +freezegun==1.2.2 \ + --hash=sha256:cd22d1ba06941384410cd967d8a99d5ae2442f57dfafeff2fda5de8dc5c05446 \ + --hash=sha256:ea1b963b993cb9ea195adbd893a48d573fda951b0da64f60883d7e988b606c9f + # via rq-scheduler greenlet==0.4.17 \ --hash=sha256:1023d7b43ca11264ab7052cb09f5635d4afdb43df55e0854498fc63070a0b206 \ --hash=sha256:124a3ae41215f71dc91d1a3d45cbf2f84e46b543e5d60b99ecc20e24b4c8f272 \ @@ -457,10 +456,6 @@ josepy==1.13.0 \ --hash=sha256:6f64eb35186aaa1776b7a1768651b1c616cab7f9685f9660bffc6491074a5390 \ --hash=sha256:8931daf38f8a4c85274a0e8b7cb25addfd8d1f28f9fb8fbed053dd51aec75dc9 # via mozilla-django-oidc -kombu==4.6.11 \ - --hash=sha256:be48cdffb54a2194d93ad6533d73f69408486483d189fe9f5990ee24255b0e0a \ - --hash=sha256:ca1b45faac8c0b18493d02a8571792f3c40291cf2bcf1f55afed3d8f3aa7ba74 - # via celery lxml==4.9.2 \ --hash=sha256:01d36c05f4afb8f7c20fd9ed5badca32a2029b93b1750f571ccc0b142531caf7 \ --hash=sha256:04876580c050a8c5341d706dd464ff04fd597095cc8c023252566a8826505726 \ @@ -611,7 +606,10 @@ pysilverpop==0.2.6 \ python-dateutil==2.8.2 \ --hash=sha256:0123cacc1627ae19ddf3c27a5de5bd67ee4586fbdd6440d9748f8abb483d3e86 \ --hash=sha256:961d03dc3453ebbc59dbdea9e4e11c5651520a876d0f4db161e8674aae935da9 - # via botocore + # via + # botocore + # freezegun + # rq-scheduler python-decouple==3.8 \ --hash=sha256:ba6e2657d4f376ecc46f77a3a615e058d93ba5e465c01bbe57289bfb7cce680f \ --hash=sha256:d0d45340815b25f4de59c974b855bb38d03151d81b037d9e3f463b0c9f8cbd66 @@ -621,7 +619,6 @@ pytz==2022.7.1 \ --hash=sha256:78f4f37d8198e0627c5f1143240bb0206b8691d8d7ac6d78fee88b78733f8c4a # via # apscheduler - # celery # django pytz-deprecation-shim==0.1.0.post0 \ --hash=sha256:8314c9692a636c8eb3bda879b9f119e350e93223ae83e70e80c31675a0fdc1a6 \ @@ -675,6 +672,7 @@ redis==4.5.5 \ # via # -r requirements/prod.in # django-redis + # rq requests==2.28.2 \ --hash=sha256:64299f4909223da747622c030b781c0d7811e359c37124b4bd368fb8c6518baa \ --hash=sha256:98b1b2782e3c6c4904938b84c0eb932721069dfdb9134313beff7c83c2df24bf @@ -689,6 +687,16 @@ requests-oauthlib==1.3.1 \ --hash=sha256:2577c501a2fb8d05a304c09d090d6e47c306fef15809d102b327cf8364bddab5 \ --hash=sha256:75beac4a47881eeb94d5ea5d6ad31ef88856affe2332b9aafb52c6452ccf0d7a # via pysilverpop +rq==1.15.0 \ + --hash=sha256:6bdc8885bf839a246c4b4e02f2ee31d8f840061fced200f13df9a58a582ec04a \ + --hash=sha256:9e89beb034bd253163ac4aa2a0c7d7c7c536aa488df8bbbbc9d354c96e81bb44 + # via + # -r requirements/prod.in + # rq-scheduler +rq-scheduler==0.13.1 \ + --hash=sha256:89d6a18f215536362b22c0548db7dbb8678bc520c18dc18a82fd0bb2b91695ce \ + --hash=sha256:c2b19c3aedfc7de4d405183c98aa327506e423bf4cdc556af55aaab9bbe5d1a1 + # via -r requirements/prod.in s3transfer==0.6.0 \ --hash=sha256:06176b74f3a15f61f1b4f25a1fc29a4429040b7647133a463da8fa5bd28d5ecd \ --hash=sha256:2ed07d3866f523cc561bf4a00fc5535827981b117dd7876f036b0c1aca42c947 @@ -743,12 +751,6 @@ user-agents==2.2.0 \ --hash=sha256:a98c4dc72ecbc64812c4534108806fb0a0b3a11ec3fd1eafe807cee5b0a942e7 \ --hash=sha256:d36d25178db65308d1458c5fa4ab39c9b2619377010130329f3955e7626ead26 # via -r requirements/prod.in -vine==1.3.0 \ - --hash=sha256:133ee6d7a9016f177ddeaf191c1f58421a1dcc6ee9a42c58b34bed40e1d2cd87 \ - --hash=sha256:ea4947cc56d1fd6f2095c8d543ee25dad966f78692528e68b4fada11ba3f98af - # via - # amqp - # celery webob==1.8.7 \ --hash=sha256:73aae30359291c14fa3b956f8b5ca31960e420c28c1bec002547fb04928cf89b \ --hash=sha256:b64ef5141be559cfade448f044fa45c2260351edcb6a8ef6b7e00c7dcef0c323