Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

initial factory commit #43

Merged
merged 18 commits into from
Sep 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitlab-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ test-zambeze:
tags:
- cades
script:
- python3.9 -m venv /opt/zambeze/zambeze-py39-env
- source /opt/zambeze/zambeze-py39-env/bin/activate
- python3.9 -m pip install -r requirements.txt
- chmod 600 $ZAMBEZE_CI_TEST_RSYNC_SSH_KEY
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ pyyaml>=5.4.1
pyzmq>=23.2.0
setuptools>=49.3.1
typer>=0.4.2
pytest-asyncio>=0.19.0
31 changes: 31 additions & 0 deletions tests/test_queue_factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Local imports
from zambeze.orchestration.queue.queue_factory import QueueFactory
from zambeze.orchestration.zambeze_types import ChannelType, QueueType

# Standard imports
import asyncio
import pytest
import os
import random


async def factory_nats(queue):
await queue.connect()
await queue.subscribe(ChannelType.TEST)
original_number = random.randint(0, 100000000000)
await queue.send(ChannelType.TEST, {"value": original_number})
returned_msg = await queue.nextMsg(ChannelType.TEST)
assert returned_msg["value"] == original_number
await queue.close()


@pytest.mark.gitlab_runner
def test_factory_nats():

factory = QueueFactory()
config = {}
config["ip"] = os.getenv("ZAMBEZE_CI_TEST_NATS_IP")
config["port"] = os.getenv("ZAMBEZE_CI_TEST_NATS_PORT")

queue = factory.create(QueueType.NATS, config)
asyncio.run(factory_nats(queue))
105 changes: 105 additions & 0 deletions tests/test_queue_nats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
# Local imports
from zambeze.orchestration.queue.queue_nats import QueueNATS
from zambeze.orchestration.zambeze_types import ChannelType, QueueType

# Standard imports
import asyncio
import os
import pytest
import random


@pytest.mark.unit
def test_queue_nats_type():
queue = QueueNATS({})
assert queue.type == QueueType.NATS


@pytest.mark.unit
def test_queue_nats_uri():

queue = QueueNATS({})
assert queue.uri == "nats://127.0.0.1:4222"

config = {}
config["ip"] = "127.0.0.1"
config["port"] = "4222"
queue = QueueNATS(config)
assert queue.uri == f"nats://{config['ip']}:{config['port']}"


@pytest.mark.unit
def test_queue_nats_connected():

queue = QueueNATS({})
assert queue.uri == "nats://127.0.0.1:4222"

config = {}
config["ip"] = "127.0.0.1"
config["port"] = "4222"
queue = QueueNATS(config)
assert queue.connected is False


async def queue_nats_connect_close(config):
queue = QueueNATS(config)
assert queue.connected is False
result = await queue.connect()
print(result)
assert queue.connected
await queue.close()
assert queue.connected is False


@pytest.mark.gitlab_runner
def test_queue_nats_connect_close():

config = {}
config["ip"] = os.getenv("ZAMBEZE_CI_TEST_NATS_IP")
config["port"] = os.getenv("ZAMBEZE_CI_TEST_NATS_PORT")

asyncio.run(queue_nats_connect_close(config))


async def queue_nats_subscribe(config):
queue = QueueNATS(config)
assert len(queue.subscriptions) == 0
await queue.connect()
await queue.subscribe(ChannelType.TEST)
assert len(queue.subscriptions) == 1
assert queue.subscriptions[0] == ChannelType.TEST
await queue.subscribe(ChannelType.ACTIVITY)
assert len(queue.subscriptions) == 2
await queue.unsubscribe(ChannelType.TEST)
assert len(queue.subscriptions) == 1
assert queue.subscriptions[0] == ChannelType.ACTIVITY


@pytest.mark.gitlab_runner
def test_queue_nats_subscribe():

config = {}
config["ip"] = os.getenv("ZAMBEZE_CI_TEST_NATS_IP")
config["port"] = os.getenv("ZAMBEZE_CI_TEST_NATS_PORT")
asyncio.run(queue_nats_subscribe(config))


async def queue_nats_send_subscribe_nextMsg(config, original_number):
queue = QueueNATS(config)
await queue.connect()
await queue.subscribe(ChannelType.TEST)
await queue.send(ChannelType.TEST, {"value": original_number})
returned_msg = await queue.nextMsg(ChannelType.TEST)
assert returned_msg["value"] == original_number
await queue.close()


@pytest.mark.gitlab_runner
def test_queue_nats_send_subscribe_nextMsg():

config = {}
config["ip"] = os.getenv("ZAMBEZE_CI_TEST_NATS_IP")
config["port"] = os.getenv("ZAMBEZE_CI_TEST_NATS_PORT")
original_number = random.randint(0, 100000000000)

asyncio.run(queue_nats_send_subscribe_nextMsg(config, original_number))
Empty file.
128 changes: 128 additions & 0 deletions zambeze/orchestration/queue/abstract_queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
from abc import ABC, abstractmethod
from ..zambeze_types import QueueType, ChannelType


class AbstractQueue(ABC):
@property
@abstractmethod
def type(self) -> QueueType:
"""Returns the Queue Client type. i.e. RabbitMQ"""
raise NotImplementedError(
"type - method does not exist for:" f"{self._queue_type.value}"
)

@property
@abstractmethod
def uri(self) -> str:
"""Returns the uri to the queue i.e. http://127.0.0.1:1451"""
raise NotImplementedError(
"uri - method does not exist for:" f"{self._queue_type.value}"
)

@property
@abstractmethod
def connected(self) -> bool:
"""If the Queue Client was able to connect to the Queue returns True
else it will return False"""
raise NotImplementedError(
"connected - method does not exist for:" f"{self._queue_type.value}"
)

@abstractmethod
async def connect(self) -> (bool, str):
"""This method will attempt to connect the client to the Queue.

:return: if able to connect will return True with a string saying as
much, if unable to connect will return False with an error message.
:rtype: tuple(bool, str)

Example:

>>> val = await queue.connect()
"""
raise NotImplementedError(
"connect - method does not exist for:" f"{self._queue_type.value}"
)

@property
@abstractmethod
def subscribed(self, channel: ChannelType = None) -> bool:
"""Checks to see if the client is subscribed to a particular channel.

:param channel: This is the channel that we are checking to see if
the client is subscribed too. If no channel is provided will set equal
to None
:type channel: ChannelType
:return: Will return true if subscribed to at least one channel or if
a channel is provided, will return True if subscribed to that
particular channel.
:rtype: bool

Example:
This example assumes the following code is called from within an async
function.

>>> queue = factory.create(QueueType.NATS, config)
>>> assert queue.subscribed is False
>>> await queue.connect()
>>> assert queue.subscribed is False
>>> await queue.subscribe(ChannelType.STATUS)
>>> assert queue.subscribed is True
>>> assert queue.subscribed(ChannelType.STATUS) is True
>>> await queue.unsubscribe(ChannelType.STATUS)
>>> assert queue.subscribed(ChannelType.STATUS) is False
"""
raise NotImplementedError(
"subscribed - method does not exist for:" f"{self._queue_type.value}"
)

@abstractmethod
async def subscribe(self, channel: ChannelType):
"""Subscribe to a channel.

There are no limits to the number of channels that can be subscribed
too.

:param channel: the channel to subscribe to
:type channel: ChannelType
"""
raise NotImplementedError(
"subscribe - method does not exist for:" f"{self._queue_type.value}"
)

@property
@abstractmethod
def subscriptions(self) -> list[ChannelType]:
"""Returns a list containing all of the channels the client is
subscribed too."""
raise NotImplementedError(
"subscriptions - method does not exist for:" f"{self._queue_type.value}"
)

@abstractmethod
async def nextMsg(self, channel: ChannelType) -> dict:
raise NotImplementedError(
"nextMsg - method does not exist for:" f"{self._queue_type.value}"
)

async def ackMsg(self):
raise NotImplementedError(
"ackMsg - method does not exist for:" f"{self._queue_type.value}"
)

async def nackMsg(self):
raise NotImplementedError(
"nackMsg - method does not exist for:" f"{self._queue_type.value}"
)

@abstractmethod
async def send(self, channel: ChannelType, body: dict):
raise NotImplementedError(
"send - method does not exist for:" f"{self._queue_type.value}"
)

@abstractmethod
async def close(self):
raise NotImplementedError(
"close - method does not exist for:" f"{self._queue_type.value}"
)
59 changes: 59 additions & 0 deletions zambeze/orchestration/queue/queue_factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import logging
from typing import Optional

from .queue_nats import QueueNATS
from .abstract_queue import AbstractQueue
from ..zambeze_types import QueueType


class QueueFactory:
def __init__(self, logger: Optional[logging.Logger] = None):
self._logger = logger

def create(self, queue_type: QueueType, args: dict) -> AbstractQueue:
"""Is responsible for creating a Queue Client.

The advantages of a factory method is to isolate the implementation
details of a concrete type to a single file and to provide a standard
interface for their creation and access.

:param queue_type: the queue type to be created.
:type queue_type: the type to created.
:param args: the arguments specific to the client that are needed for
its construction.
:type args: dict
:return: Should return an abstract Queue based on the provided enum.
:rtype: AbstractQueue

Example:

The following example assumes the code block appears in an async
function.

>>> RabbitMQ = QueueType.RabbitMQ
>>> NATS = QueueType.NATS
>>> factory = QueueFactory(logger)
>>> # Create first client
>>> args = { "ip": 127.0.0.1, "port": 4222 }
>>> queue_clients[NATS] = QueueFactory.create(NATS, args)
>>> # Create second client
>>> args = { "ip": 127.0.0.1, "port": 5672 }
>>> queue_clients[RabbitMQ] = QueueFactory.create(RabbitMQ, args)
>>> # Loop through clients and print uri
>>> for client in queue_clients:
>>> print(client.uri)
>>> # Have each client connect to their own queue
>>> for client in queue_clients:
>>> await client.connect()
"""
if queue_type == QueueType.NATS:
return QueueNATS(args, self._logger)
elif queue_type == QueueType.RABBITMQ:
raise Exception(
"RabbitMQ queue client has not yet been implemented: "
f"{queue_type.value}"
)
else:
raise Exception(
"Unrecognized queue type cannot instantiate: " f"{queue_type.value}"
)
Loading