AAP-27923: implement EDA HA Cluster Redis support. (#1000)
Co-authored-by: Madhu Kanoor <mkanoor@redhat.com>
Co-authored-by: Doston <31990136+Dostonbek1@users.noreply.github.com>
3 people authored Aug 13, 2024
1 parent ca215ca commit 8d843a0
Showing 11 changed files with 259 additions and 48 deletions.
4 changes: 2 additions & 2 deletions poetry.lock

(Generated file; diff not rendered by default.)

196 changes: 181 additions & 15 deletions src/aap_eda/core/tasking/__init__.py
@@ -4,11 +4,19 @@
import logging
from datetime import datetime, timedelta
from types import MethodType
from typing import Any, Callable, Iterable, Optional, Protocol, Type, Union
from typing import (
Any,
Callable,
Iterable,
List,
Optional,
Protocol,
Type,
Union,
)

import redis
import rq
import rq_scheduler
from ansible_base.lib.redis.client import (
DABRedis,
DABRedisCluster,
@@ -24,7 +32,9 @@
DEFAULT_WORKER_TTL,
)
from rq.job import Job as _Job, JobStatus
from rq.registry import StartedJobRegistry
from rq.serializers import JSONSerializer
from rq_scheduler import Scheduler as _Scheduler

from aap_eda.settings import default

@@ -73,9 +83,47 @@ def get_redis_client(**kwargs):
DAB will return an appropriate client for HA based on the passed
parameters.
"""
# HA cluster does not support an alternate redis db and will generate an
# exception if we pass a value (even the default). If we're in that
# situation, we drop the db and, if the db is anything other than the
# default, log an informational message.
db = kwargs.get("db", None)
if (db is not None) and (kwargs.get("mode", "") == "cluster"):
del kwargs["db"]
if db != default.DEFAULT_REDIS_DB:
logger.info(
f"clustered redis supports only the default db"
f"; db specified: {db}"
)

return _get_redis_client(_create_url_from_parameters(**kwargs), **kwargs)
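
A quick usage sketch (editorial, not part of the commit; host names and db values are hypothetical, and the kwargs mirror those built by rq_redis_client_instantiation_parameters() in src/aap_eda/settings/default.py):

# Illustrative only. In cluster mode the db kwarg is dropped (with an
# informational log if non-default); in standalone mode it is passed through.
cluster_client = get_redis_client(
    mode="cluster",
    redis_hosts="redis-0:6379,redis-1:6379,redis-2:6379",
    db=2,  # dropped for cluster mode
)
standalone_client = get_redis_client(host="localhost", port=6379, db=2)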


class Scheduler(_Scheduler):
"""Custom scheduler class."""

def __init__(
self,
queue_name="default",
queue=None,
interval=60,
connection=None,
job_class=None,
queue_class=None,
name=None,
):
connection = _get_necessary_client_connection(connection)
super().__init__(
queue_name=queue_name,
queue=queue,
interval=interval,
connection=connection,
job_class=job_class,
queue_class=queue_class,
name=name,
)
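
A minimal usage sketch for the custom Scheduler (editorial; assumes rq-scheduler's standard enqueue_in API and a hypothetical job function, and that passing connection=None lets _get_necessary_client_connection supply a DAB client):

from datetime import timedelta

def ping():  # hypothetical job
    return "pong"

scheduler = Scheduler(queue_name="default", interval=60)
scheduler.enqueue_in(timedelta(minutes=5), ping)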


def enable_redis_prefix():
redis_prefix = settings.RQ_REDIS_PREFIX

@@ -102,16 +150,12 @@ def enable_redis_prefix():
f"{redis_prefix}:canceled:{0}"
)

rq_scheduler.Scheduler.redis_scheduler_namespace_prefix = (
Scheduler.redis_scheduler_namespace_prefix = (
f"{redis_prefix}:scheduler_instance:"
)
rq_scheduler.Scheduler.scheduler_key = f"{redis_prefix}:scheduler"
rq_scheduler.Scheduler.scheduler_lock_key = (
f"{redis_prefix}:scheduler_lock"
)
rq_scheduler.Scheduler.scheduled_jobs_key = (
f"{redis_prefix}:scheduler:scheduled_jobs"
)
Scheduler.scheduler_key = f"{redis_prefix}:scheduler"
Scheduler.scheduler_lock_key = f"{redis_prefix}:scheduler_lock"
Scheduler.scheduled_jobs_key = f"{redis_prefix}:scheduler:scheduled_jobs"

def eda_get_key(job_id):
return f"{redis_prefix}:results:{job_id}"
@@ -168,7 +212,7 @@ def __init__(
super().__init__(
name=name,
default_timeout=default_timeout,
connection=connection,
connection=_get_necessary_client_connection(connection),
is_async=is_async,
job_class=job_class,
serializer=serializer,
@@ -190,6 +234,7 @@ def __init__(
):
if serializer is None:
serializer = JSONSerializer
connection = _get_necessary_client_connection(connection)

super().__init__(id, connection, serializer)

@@ -207,7 +252,130 @@ def _get_necessary_client_connection(connection: Connection) -> Connection:
return connection


class DefaultWorker(_Worker):
class Worker(_Worker):
"""Custom worker class.
Provides establishment of DAB Redis client and work arounds for various
DABRedisCluster issues.
"""

def __init__(
self,
queues: Iterable[Union[Queue, str]],
name: Optional[str] = None,
default_result_ttl: int = DEFAULT_RESULT_TTL,
connection: Optional[Connection] = None,
exc_handler: Any = None,
exception_handlers: _ErrorHandlersArgType = None,
default_worker_ttl: int = DEFAULT_WORKER_TTL,
job_class: Type[_Job] = None,
queue_class: Type[_Queue] = None,
log_job_description: bool = True,
job_monitoring_interval: int = DEFAULT_JOB_MONITORING_INTERVAL,
disable_default_exception_handler: bool = False,
prepare_for_work: bool = True,
serializer: Optional[SerializerProtocol] = None,
):
connection = _get_necessary_client_connection(connection)
super().__init__(
queues=queues,
name=name,
default_result_ttl=default_result_ttl,
connection=connection,
exc_handler=exc_handler,
exception_handlers=exception_handlers,
default_worker_ttl=default_worker_ttl,
job_class=job_class,
queue_class=queue_class,
log_job_description=log_job_description,
job_monitoring_interval=job_monitoring_interval,
disable_default_exception_handler=disable_default_exception_handler, # noqa: E501
prepare_for_work=prepare_for_work,
serializer=JSONSerializer,
)

def _set_connection(
self,
connection: Union[DABRedis, DABRedisCluster],
) -> Union[DABRedis, DABRedisCluster]:
# A DABRedis connection doesn't need intervention.
if isinstance(connection, DABRedis):
return super()._set_connection(connection)

try:
connection_pool = connection.connection_pool
current_socket_timeout = connection_pool.connection_kwargs.get(
"socket_timeout"
)
if current_socket_timeout is None:
timeout_config = {"socket_timeout": self.connection_timeout}
connection_pool.connection_kwargs.update(timeout_config)
except AttributeError:
nodes = connection.get_nodes()
for node in nodes:
connection_pool = node.redis_connection.connection_pool
current_socket_timeout = connection_pool.connection_kwargs.get(
"socket_timeout"
)
if current_socket_timeout is None:
timeout_config = {
"socket_timeout": self.connection_timeout
}
connection_pool.connection_kwargs.update(timeout_config)
return connection

@classmethod
def all(
cls,
connection: Optional[Union[DABRedis, DABRedisCluster]] = None,
job_class: Optional[Type[Job]] = None,
queue_class: Optional[Type[Queue]] = None,
queue: Optional[Queue] = None,
serializer=None,
) -> List[Worker]:
# If we don't have a queue (whose connection would be used) make
# certain that we have an appropriate connection and pass it
# to the superclass.
if queue is None:
connection = _get_necessary_client_connection(connection)
return super().all(
connection,
job_class,
queue_class,
queue,
serializer,
)

def handle_job_success(
self, job: Job, queue: Queue, started_job_registry: StartedJobRegistry
):
# A DABRedis connection doesn't need intervention.
if isinstance(self.connection, DABRedis):
return super().handle_job_success(job, queue, started_job_registry)

# For DABRedisCluster perform success handling.
# DABRedisCluster doesn't provide the watch, multi, etc. methods
# necessary for the superclass implementation, but we don't need
# them as there's no dependencies in how we use the jobs.
with self.connection.pipeline() as pipeline:
self.set_current_job_id(None, pipeline=pipeline)
self.increment_successful_job_count(pipeline=pipeline)
self.increment_total_working_time(
job.ended_at - job.started_at,
pipeline,
)

result_ttl = job.get_result_ttl(self.default_result_ttl)
if result_ttl != 0:
job._handle_success(result_ttl, pipeline=pipeline)

job.cleanup(result_ttl, pipeline=pipeline, remove_from_queue=False)
started_job_registry.remove(job, pipeline=pipeline)

pipeline.execute()
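
A hedged sketch of running the custom Worker (editorial; the queue and worker names are hypothetical, work() is the standard rq loop, and connection=None is left to _get_necessary_client_connection to resolve):

worker = Worker(["default"], name="eda-worker-1")
worker.work()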


class DefaultWorker(Worker):
"""Custom default worker class used for non-activation tasks.
Uses JSONSerializer as a default one.
@@ -234,7 +402,6 @@ def __init__(
job_class = Job
if queue_class is None:
queue_class = Queue
connection = _get_necessary_client_connection(connection)

super().__init__(
queues=queues,
@@ -254,7 +421,7 @@ def __init__(
)


class ActivationWorker(_Worker):
class ActivationWorker(Worker):
"""Custom worker class used for activation related tasks.
Uses JSONSerializer as a default one.
@@ -281,7 +448,6 @@ def __init__(
job_class = Job
if queue_class is None:
queue_class = Queue
connection = _get_necessary_client_connection(connection)
queue_name = settings.RULEBOOK_QUEUE_NAME

super().__init__(
28 changes: 25 additions & 3 deletions src/aap_eda/settings/default.py
@@ -358,8 +358,10 @@ def _get_databases_settings() -> dict:
# TASKING SETTINGS
# ---------------------------------------------------------
RQ = {
"QUEUE_CLASS": "aap_eda.core.tasking.Queue",
"JOB_CLASS": "aap_eda.core.tasking.Job",
"QUEUE_CLASS": "aap_eda.core.tasking.Queue",
"SCHEDULER_CLASS": "aap_eda.core.tasking.Scheduler",
"WORKER_CLASS": "aap_eda.core.tasking.Worker",
}

REDIS_UNIX_SOCKET_PATH = settings.get("MQ_UNIX_SOCKET_PATH", None)
@@ -370,9 +372,18 @@ def _get_databases_settings() -> dict:
REDIS_CLIENT_CACERT_PATH = settings.get("MQ_CLIENT_CACERT_PATH", None)
REDIS_CLIENT_CERT_PATH = settings.get("MQ_CLIENT_CERT_PATH", None)
REDIS_CLIENT_KEY_PATH = settings.get("MQ_CLIENT_KEY_PATH", None)
REDIS_DB = settings.get("MQ_DB", 0)
DEFAULT_REDIS_DB = 0
REDIS_DB = settings.get("MQ_DB", DEFAULT_REDIS_DB)
RQ_REDIS_PREFIX = settings.get("RQ_REDIS_PREFIX", "eda-rq")

# The HA cluster hosts value is a string of <host>:<port>[,<host>:<port>]+
# and is exhaustive; i.e., it is not in addition to REDIS_HOST:REDIS_PORT.
# EDA does not validate the content, but relies on DAB to do so.
#
# In establishing an HA Cluster Redis client connection, DAB ignores
# the host and port kwargs.
REDIS_HA_CLUSTER_HOSTS = settings.get("MQ_REDIS_HA_CLUSTER_HOSTS", "").strip()
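
A hypothetical configuration example (editorial; assumes the project's usual EDA_-prefixed dynaconf environment variables, with illustrative host names):

# In the deployment environment (not part of this commit):
#   EDA_MQ_REDIS_HA_CLUSTER_HOSTS="redis-0:6379,redis-1:6379,redis-2:6379"
# which yields:
#   REDIS_HA_CLUSTER_HOSTS == "redis-0:6379,redis-1:6379,redis-2:6379"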


def _rq_common_parameters():
params = {
@@ -403,14 +414,25 @@ def _rq_redis_client_additional_parameters():
return params


def rq_redis_client_instantiation_parameters():
def rq_standalone_redis_client_instantiation_parameters():
params = _rq_common_parameters() | _rq_redis_client_additional_parameters()

# Convert to lowercase for use in instantiating a redis client.
params = {k.lower(): v for (k, v) in params.items()}
return params


def rq_redis_client_instantiation_parameters():
params = rq_standalone_redis_client_instantiation_parameters()

# Include the HA cluster parameters.
if REDIS_HA_CLUSTER_HOSTS:
params["mode"] = "cluster"
params["redis_hosts"] = REDIS_HA_CLUSTER_HOSTS

return params
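
Illustratively (editorial; the actual keys depend on the MQ_* settings in effect):

# rq_standalone_redis_client_instantiation_parameters()
#   -> {"host": ..., "port": ..., "db": 0, ...}
# rq_redis_client_instantiation_parameters() with REDIS_HA_CLUSTER_HOSTS set
#   -> the same dict plus:
#      {"mode": "cluster",
#       "redis_hosts": "redis-0:6379,redis-1:6379,redis-2:6379"}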


# A list of queues to be used in multinode mode
# If the list is empty, use the default singlenode queue name
RULEBOOK_WORKER_QUEUES = settings.get("RULEBOOK_WORKER_QUEUES", [])
3 changes: 1 addition & 2 deletions src/aap_eda/tasks/orchestrator.py
@@ -21,7 +21,6 @@
from django.conf import settings
from django.core.exceptions import ObjectDoesNotExist
from django_rq import get_queue
from rq import Worker

import aap_eda.tasks.activation_request_queue as requests_queue
from aap_eda.core import models
@@ -31,7 +30,7 @@
ProcessParentType,
)
from aap_eda.core.models import Activation, ActivationRequestQueue, EventStream
from aap_eda.core.tasking import unique_enqueue
from aap_eda.core.tasking import Worker, unique_enqueue
from aap_eda.services.activation import exceptions
from aap_eda.services.activation.activation_manager import (
ActivationManager,
(The remaining 7 changed files are not shown.)
