Skip to content

Commit

Permalink
Merge pull request #43 from ORNL/JoshuaSBrown-factory-queue
Browse files Browse the repository at this point in the history
initial factory commit
  • Loading branch information
JoshuaSBrown authored Sep 30, 2022
2 parents 3898ce4 + 8982af8 commit 3fba1cc
Show file tree
Hide file tree
Showing 9 changed files with 488 additions and 0 deletions.
1 change: 1 addition & 0 deletions .gitlab-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ test-zambeze:
tags:
- cades
script:
- python3.9 -m venv /opt/zambeze/zambeze-py39-env
- source /opt/zambeze/zambeze-py39-env/bin/activate
- python3.9 -m pip install -r requirements.txt
- chmod 600 $ZAMBEZE_CI_TEST_RSYNC_SSH_KEY
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ pyyaml>=5.4.1
pyzmq>=23.2.0
setuptools>=49.3.1
typer>=0.4.2
pytest-asyncio>=0.19.0
31 changes: 31 additions & 0 deletions tests/test_queue_factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Local imports
from zambeze.orchestration.queue.queue_factory import QueueFactory
from zambeze.orchestration.zambeze_types import ChannelType, QueueType

# Standard imports
import asyncio
import pytest
import os
import random


async def factory_nats(queue):
await queue.connect()
await queue.subscribe(ChannelType.TEST)
original_number = random.randint(0, 100000000000)
await queue.send(ChannelType.TEST, {"value": original_number})
returned_msg = await queue.nextMsg(ChannelType.TEST)
assert returned_msg["value"] == original_number
await queue.close()


@pytest.mark.gitlab_runner
def test_factory_nats():

factory = QueueFactory()
config = {}
config["ip"] = os.getenv("ZAMBEZE_CI_TEST_NATS_IP")
config["port"] = os.getenv("ZAMBEZE_CI_TEST_NATS_PORT")

queue = factory.create(QueueType.NATS, config)
asyncio.run(factory_nats(queue))
105 changes: 105 additions & 0 deletions tests/test_queue_nats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
# Local imports
from zambeze.orchestration.queue.queue_nats import QueueNATS
from zambeze.orchestration.zambeze_types import ChannelType, QueueType

# Standard imports
import asyncio
import os
import pytest
import random


@pytest.mark.unit
def test_queue_nats_type():
queue = QueueNATS({})
assert queue.type == QueueType.NATS


@pytest.mark.unit
def test_queue_nats_uri():

queue = QueueNATS({})
assert queue.uri == "nats://127.0.0.1:4222"

config = {}
config["ip"] = "127.0.0.1"
config["port"] = "4222"
queue = QueueNATS(config)
assert queue.uri == f"nats://{config['ip']}:{config['port']}"


@pytest.mark.unit
def test_queue_nats_connected():

queue = QueueNATS({})
assert queue.uri == "nats://127.0.0.1:4222"

config = {}
config["ip"] = "127.0.0.1"
config["port"] = "4222"
queue = QueueNATS(config)
assert queue.connected is False


async def queue_nats_connect_close(config):
queue = QueueNATS(config)
assert queue.connected is False
result = await queue.connect()
print(result)
assert queue.connected
await queue.close()
assert queue.connected is False


@pytest.mark.gitlab_runner
def test_queue_nats_connect_close():

config = {}
config["ip"] = os.getenv("ZAMBEZE_CI_TEST_NATS_IP")
config["port"] = os.getenv("ZAMBEZE_CI_TEST_NATS_PORT")

asyncio.run(queue_nats_connect_close(config))


async def queue_nats_subscribe(config):
queue = QueueNATS(config)
assert len(queue.subscriptions) == 0
await queue.connect()
await queue.subscribe(ChannelType.TEST)
assert len(queue.subscriptions) == 1
assert queue.subscriptions[0] == ChannelType.TEST
await queue.subscribe(ChannelType.ACTIVITY)
assert len(queue.subscriptions) == 2
await queue.unsubscribe(ChannelType.TEST)
assert len(queue.subscriptions) == 1
assert queue.subscriptions[0] == ChannelType.ACTIVITY


@pytest.mark.gitlab_runner
def test_queue_nats_subscribe():

config = {}
config["ip"] = os.getenv("ZAMBEZE_CI_TEST_NATS_IP")
config["port"] = os.getenv("ZAMBEZE_CI_TEST_NATS_PORT")
asyncio.run(queue_nats_subscribe(config))


async def queue_nats_send_subscribe_nextMsg(config, original_number):
queue = QueueNATS(config)
await queue.connect()
await queue.subscribe(ChannelType.TEST)
await queue.send(ChannelType.TEST, {"value": original_number})
returned_msg = await queue.nextMsg(ChannelType.TEST)
assert returned_msg["value"] == original_number
await queue.close()


@pytest.mark.gitlab_runner
def test_queue_nats_send_subscribe_nextMsg():

config = {}
config["ip"] = os.getenv("ZAMBEZE_CI_TEST_NATS_IP")
config["port"] = os.getenv("ZAMBEZE_CI_TEST_NATS_PORT")
original_number = random.randint(0, 100000000000)

asyncio.run(queue_nats_send_subscribe_nextMsg(config, original_number))
Empty file.
128 changes: 128 additions & 0 deletions zambeze/orchestration/queue/abstract_queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
from abc import ABC, abstractmethod
from ..zambeze_types import QueueType, ChannelType


class AbstractQueue(ABC):
@property
@abstractmethod
def type(self) -> QueueType:
"""Returns the Queue Client type. i.e. RabbitMQ"""
raise NotImplementedError(
"type - method does not exist for:" f"{self._queue_type.value}"
)

@property
@abstractmethod
def uri(self) -> str:
"""Returns the uri to the queue i.e. http://127.0.0.1:1451"""
raise NotImplementedError(
"uri - method does not exist for:" f"{self._queue_type.value}"
)

@property
@abstractmethod
def connected(self) -> bool:
"""If the Queue Client was able to connect to the Queue returns True
else it will return False"""
raise NotImplementedError(
"connected - method does not exist for:" f"{self._queue_type.value}"
)

@abstractmethod
async def connect(self) -> (bool, str):
"""This method will attempt to connect the client to the Queue.
:return: if able to connect will return True with a string saying as
much, if unable to connect will return False with an error message.
:rtype: tuple(bool, str)
Example:
>>> val = await queue.connect()
"""
raise NotImplementedError(
"connect - method does not exist for:" f"{self._queue_type.value}"
)

@property
@abstractmethod
def subscribed(self, channel: ChannelType = None) -> bool:
"""Checks to see if the client is subscribed to a particular channel.
:param channel: This is the channel that we are checking to see if
the client is subscribed too. If no channel is provided will set equal
to None
:type channel: ChannelType
:return: Will return true if subscribed to at least one channel or if
a channel is provided, will return True if subscribed to that
particular channel.
:rtype: bool
Example:
This example assumes the following code is called from within an async
function.
>>> queue = factory.create(QueueType.NATS, config)
>>> assert queue.subscribed is False
>>> await queue.connect()
>>> assert queue.subscribed is False
>>> await queue.subscribe(ChannelType.STATUS)
>>> assert queue.subscribed is True
>>> assert queue.subscribed(ChannelType.STATUS) is True
>>> await queue.unsubscribe(ChannelType.STATUS)
>>> assert queue.subscribed(ChannelType.STATUS) is False
"""
raise NotImplementedError(
"subscribed - method does not exist for:" f"{self._queue_type.value}"
)

@abstractmethod
async def subscribe(self, channel: ChannelType):
"""Subscribe to a channel.
There are no limits to the number of channels that can be subscribed
too.
:param channel: the channel to subscribe to
:type channel: ChannelType
"""
raise NotImplementedError(
"subscribe - method does not exist for:" f"{self._queue_type.value}"
)

@property
@abstractmethod
def subscriptions(self) -> list[ChannelType]:
"""Returns a list containing all of the channels the client is
subscribed too."""
raise NotImplementedError(
"subscriptions - method does not exist for:" f"{self._queue_type.value}"
)

@abstractmethod
async def nextMsg(self, channel: ChannelType) -> dict:
raise NotImplementedError(
"nextMsg - method does not exist for:" f"{self._queue_type.value}"
)

async def ackMsg(self):
raise NotImplementedError(
"ackMsg - method does not exist for:" f"{self._queue_type.value}"
)

async def nackMsg(self):
raise NotImplementedError(
"nackMsg - method does not exist for:" f"{self._queue_type.value}"
)

@abstractmethod
async def send(self, channel: ChannelType, body: dict):
raise NotImplementedError(
"send - method does not exist for:" f"{self._queue_type.value}"
)

@abstractmethod
async def close(self):
raise NotImplementedError(
"close - method does not exist for:" f"{self._queue_type.value}"
)
59 changes: 59 additions & 0 deletions zambeze/orchestration/queue/queue_factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import logging
from typing import Optional

from .queue_nats import QueueNATS
from .abstract_queue import AbstractQueue
from ..zambeze_types import QueueType


class QueueFactory:
def __init__(self, logger: Optional[logging.Logger] = None):
self._logger = logger

def create(self, queue_type: QueueType, args: dict) -> AbstractQueue:
"""Is responsible for creating a Queue Client.
The advantages of a factory method is to isolate the implementation
details of a concrete type to a single file and to provide a standard
interface for their creation and access.
:param queue_type: the queue type to be created.
:type queue_type: the type to created.
:param args: the arguments specific to the client that are needed for
its construction.
:type args: dict
:return: Should return an abstract Queue based on the provided enum.
:rtype: AbstractQueue
Example:
The following example assumes the code block appears in an async
function.
>>> RabbitMQ = QueueType.RabbitMQ
>>> NATS = QueueType.NATS
>>> factory = QueueFactory(logger)
>>> # Create first client
>>> args = { "ip": 127.0.0.1, "port": 4222 }
>>> queue_clients[NATS] = QueueFactory.create(NATS, args)
>>> # Create second client
>>> args = { "ip": 127.0.0.1, "port": 5672 }
>>> queue_clients[RabbitMQ] = QueueFactory.create(RabbitMQ, args)
>>> # Loop through clients and print uri
>>> for client in queue_clients:
>>> print(client.uri)
>>> # Have each client connect to their own queue
>>> for client in queue_clients:
>>> await client.connect()
"""
if queue_type == QueueType.NATS:
return QueueNATS(args, self._logger)
elif queue_type == QueueType.RABBITMQ:
raise Exception(
"RabbitMQ queue client has not yet been implemented: "
f"{queue_type.value}"
)
else:
raise Exception(
"Unrecognized queue type cannot instantiate: " f"{queue_type.value}"
)
Loading

0 comments on commit 3fba1cc

Please sign in to comment.