Skip to content
This repository has been archived by the owner on Sep 11, 2024. It is now read-only.

Cpappas/hackathonxxiii messaging #258

Merged
merged 1 commit into from
Feb 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions registrar/apps/core/management/commands/run_consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
""" Management command to run worker that will act on messages """
import logging

from django.contrib.auth.models import Group
from django.core.management.base import BaseCommand, CommandError

from registrar.consumer import run_consumer_worker

logger = logging.getLogger(__name__)


class Command(BaseCommand):
# pylint: disable=missing-docstring

help = 'Runs a worker to act on messages received from queue.'

def handle(self, *args, **options):
run_consumer_worker()
62 changes: 33 additions & 29 deletions registrar/consumer.py
Original file line number Diff line number Diff line change
@@ -1,39 +1,43 @@
"""
Defines the Kombu consumer for the registrar project.
"""
from __future__ import absolute_import
"""
Defines the Kombu consumer for the registrar project.
"""
from __future__ import absolute_import

from kombu.mixins import ConsumerMixin
from kombu import Exchange, Queue
from kombu.mixins import ConsumerMixin
from kombu import Exchange, Queue

task_exchange = Exchange('course_discovery', type='direct')
queues = [
Queue('task_queue', task_exchange, routing_key='task_queue'),
]
from registrar.apps.core.models import Organization

class Worker(ConsumerMixin):
task_exchange = Exchange('course_discovery', type='direct')
queues = [
Queue('task_queue', task_exchange, routing_key='task_queue'),
]

def __init__(self, connection):
self.connection = connection
class Worker(ConsumerMixin):

def get_consumers(self, Consumer, channel):
return [
Consumer(queues, callbacks=[self.on_message], accept=['json']),
]
def __init__(self, connection):
self.connection = connection

def on_message(self, body, message):
print('RECEIVED MESSAGE: {0!r}'.format(body))
message.ack()
def get_consumers(self, Consumer, channel):
return [
Consumer(queues, callbacks=[self.on_message], accept=['json']),
]

def on_message(self, body, message):
print(Organization.objects.first())
print("If this prints, then we can access Django models!")
print('RECEIVED MESSAGE: {0!r}'.format(body))
message.ack()

if __name__ == '__main__':
from kombu import Connection
from kombu.utils.debug import setup_logging
setup_logging(loglevel='DEBUG')

with Connection('redis://:password@redis:6379/0') as conn:
try:
Worker(conn).run()
except KeyboardInterrupt:
print('bye bye')
def run_consumer_worker():
from kombu import Connection
from kombu.utils.debug import setup_logging
setup_logging(loglevel='DEBUG')

with Connection('redis://:password@redis:6379/0') as conn:
try:
Worker(conn).run()
except KeyboardInterrupt:
print('bye bye')