From 0c75f20149ebc998e3fd248c380abe5735a96d83 Mon Sep 17 00:00:00 2001 From: Christopher Pappas Date: Tue, 11 Feb 2020 17:48:52 -0500 Subject: [PATCH] getting consumner to run via management command; --- .../core/management/commands/run_consumer.py | 18 ++++++ registrar/consumer.py | 62 ++++++++++--------- 2 files changed, 51 insertions(+), 29 deletions(-) create mode 100644 registrar/apps/core/management/commands/run_consumer.py diff --git a/registrar/apps/core/management/commands/run_consumer.py b/registrar/apps/core/management/commands/run_consumer.py new file mode 100644 index 00000000..7f750f8a --- /dev/null +++ b/registrar/apps/core/management/commands/run_consumer.py @@ -0,0 +1,18 @@ +""" Management command to run worker that will act on messages """ +import logging + +from django.contrib.auth.models import Group +from django.core.management.base import BaseCommand, CommandError + +from registrar.consumer import run_consumer_worker + +logger = logging.getLogger(__name__) + + +class Command(BaseCommand): + # pylint: disable=missing-docstring + + help = 'Runs a worker to act on messages received from queue.' + + def handle(self, *args, **options): + run_consumer_worker() diff --git a/registrar/consumer.py b/registrar/consumer.py index b7f8c178..ae5a12b2 100644 --- a/registrar/consumer.py +++ b/registrar/consumer.py @@ -1,39 +1,43 @@ - """ - Defines the Kombu consumer for the registrar project. - """ - from __future__ import absolute_import +""" +Defines the Kombu consumer for the registrar project. +""" +from __future__ import absolute_import - from kombu.mixins import ConsumerMixin - from kombu import Exchange, Queue +from kombu.mixins import ConsumerMixin +from kombu import Exchange, Queue - task_exchange = Exchange('course_discovery', type='direct') - queues = [ - Queue('task_queue', task_exchange, routing_key='task_queue'), - ] +from registrar.apps.core.models import Organization - class Worker(ConsumerMixin): +task_exchange = Exchange('course_discovery', type='direct') +queues = [ + Queue('task_queue', task_exchange, routing_key='task_queue'), +] - def __init__(self, connection): - self.connection = connection +class Worker(ConsumerMixin): - def get_consumers(self, Consumer, channel): - return [ - Consumer(queues, callbacks=[self.on_message], accept=['json']), - ] + def __init__(self, connection): + self.connection = connection - def on_message(self, body, message): - print('RECEIVED MESSAGE: {0!r}'.format(body)) - message.ack() + def get_consumers(self, Consumer, channel): + return [ + Consumer(queues, callbacks=[self.on_message], accept=['json']), + ] + def on_message(self, body, message): + print(Organization.objects.first()) + print("If this prints, then we can access Django models!") + print('RECEIVED MESSAGE: {0!r}'.format(body)) + message.ack() - if __name__ == '__main__': - from kombu import Connection - from kombu.utils.debug import setup_logging - setup_logging(loglevel='DEBUG') - with Connection('redis://:password@redis:6379/0') as conn: - try: - Worker(conn).run() - except KeyboardInterrupt: - print('bye bye') +def run_consumer_worker(): + from kombu import Connection + from kombu.utils.debug import setup_logging + setup_logging(loglevel='DEBUG') + + with Connection('redis://:password@redis:6379/0') as conn: + try: + Worker(conn).run() + except KeyboardInterrupt: + print('bye bye')