Skip to content

Commit

Permalink
Merge pull request #55 from OvalMoney/add_ssl_option
Browse files Browse the repository at this point in the history
Add SSL connection options
  • Loading branch information
Fabio Todaro authored Apr 20, 2021
2 parents fccaa27 + d9d2aaf commit 0d1d376
Show file tree
Hide file tree
Showing 7 changed files with 166 additions and 14 deletions.
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@ classifier = [
"Development Status :: 4 - Beta",
"Environment :: Console",
"License :: OSI Approved :: MIT License",
"Programming Language :: Python",
"Programming Language :: Python :: 3.6",
"Programming Language :: Python :: 3 :: Only",
"Operating System :: OS Independent"
]
requires-dist = ["click>=7", "celery>=4,<5", "prometheus_client>=0.0.20"]
requires-dist = ["click>=7", "celery>=4,<5", "prometheus_client>=0.0.20", "redis>=3.2.0; extra == 'redis'"]
provides-extra = ["redis"]

[package.metadata.maturin.scripts]
celery-exporter = "celery_exporter.__main__:main"
1 change: 0 additions & 1 deletion celery_exporter/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +0,0 @@

60 changes: 54 additions & 6 deletions celery_exporter/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
import time

import click
from .core import CeleryExporter

__VERSION__ = (2, 0, 0)
from .core import CeleryExporter
from .utils import generate_broker_use_ssl, get_transport_scheme

LOG_FORMAT = "[%(asctime)s] %(name)s:%(levelname)s: %(message)s"

Expand Down Expand Up @@ -62,20 +62,58 @@
allow_from_autoenv=False,
help="Periodically enable Celery events.",
)
@click.option(
"--use-ssl",
is_flag=True,
help="Enable SSL usage on broker connection if redis or amqp.",
)
@click.option(
"--ssl-verify",
type=click.Choice(
["CERT_NONE", "CERT_OPTIONAL", "CERT_REQUIRED"], case_sensitive=True
),
default="CERT_REQUIRED",
help="SSL verify mode.",
)
@click.option(
"--ssl-ca-certs",
type=click.Path(
exists=True, file_okay=True, dir_okay=False, writable=False, readable=True
),
help="SSL path to the CA certificate.",
)
@click.option(
"--ssl-certfile",
type=click.Path(
exists=True, file_okay=True, dir_okay=False, writable=False, readable=True
),
help="SSL path to the Client Certificate.",
)
@click.option(
"--ssl-keyfile",
type=click.Path(
exists=True, file_okay=True, dir_okay=False, writable=False, readable=True
),
help="SSL path to the Client Key.",
)
@click.option(
"--tz", type=str, allow_from_autoenv=False, help="Timezone used by the celery app."
)
@click.option(
"--verbose", is_flag=True, allow_from_autoenv=False, help="Enable verbose logging."
)
@click.version_option(version=".".join([str(x) for x in __VERSION__]))
def main(
broker_url,
listen_address,
max_tasks,
namespace,
transport_options,
enable_events,
use_ssl,
ssl_verify,
ssl_ca_certs,
ssl_certfile,
ssl_keyfile,
tz,
verbose,
): # pragma: no cover
Expand All @@ -93,22 +131,32 @@ def main(
try:
transport_options = json.loads(transport_options)
except ValueError:
print(
logging.error(
"Error parsing broker transport options from JSON '{}'".format(
transport_options
),
file=sys.stderr,
)
)
sys.exit(1)

broker_use_ssl = generate_broker_use_ssl(
use_ssl,
get_transport_scheme(broker_url),
ssl_verify,
ssl_ca_certs,
ssl_certfile,
ssl_keyfile,
)

celery_exporter = CeleryExporter(
broker_url,
listen_address,
max_tasks,
namespace,
transport_options,
enable_events,
broker_use_ssl,
)

celery_exporter.start()

def shutdown(signum, frame): # pragma: no cover
Expand Down
9 changes: 6 additions & 3 deletions celery_exporter/core.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import celery
import logging

import celery
import prometheus_client

from .monitor import (
EnableEventsThread,
TaskThread,
WorkerMonitoringThread,
EnableEventsThread,
setup_metrics,
)

Expand All @@ -20,13 +22,14 @@ def __init__(
namespace="celery",
transport_options=None,
enable_events=False,
broker_use_ssl=None,
):
self._listen_address = listen_address
self._max_tasks = max_tasks
self._namespace = namespace
self._enable_events = enable_events

self._app = celery.Celery(broker=broker_url)
self._app = celery.Celery(broker=broker_url, broker_use_ssl=broker_use_ssl)
self._app.conf.broker_transport_options = transport_options or {}

def start(self):
Expand Down
6 changes: 3 additions & 3 deletions celery_exporter/monitor.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import collections
from itertools import chain
import logging
import time
import threading
import time
from itertools import chain

import celery
import celery.states

from .metrics import TASKS, TASKS_RUNTIME, LATENCY, WORKERS
from .celery_exporter import CeleryState
from .metrics import LATENCY, TASKS, TASKS_RUNTIME, WORKERS
from .utils import get_config


Expand Down
36 changes: 36 additions & 0 deletions celery_exporter/utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import ssl
from itertools import chain
from urllib.parse import urlparse

CELERY_DEFAULT_QUEUE = "celery"
CELERY_MISSING_DATA = "undefined"
Expand Down Expand Up @@ -40,3 +42,37 @@ def get_config(app):
else:
res[task_name] = default
return res


def get_transport_scheme(broker_url):
return urlparse(broker_url)[0]


def generate_broker_use_ssl(
use_ssl, broker_scheme, ssl_verify, ssl_ca_certs, ssl_certfile, ssl_keyfile
):
scheme_map = {"redis": "ssl_", "amqp": ""}

verify_map = {
"CERT_NONE": ssl.CERT_NONE,
"CERT_OPTIONAL": ssl.CERT_OPTIONAL,
"CERT_REQUIRED": ssl.CERT_REQUIRED,
}

if not use_ssl:
return None

if broker_scheme not in list(scheme_map.keys()):
raise ValueError(f"Unsupported transport for SSL: {broker_scheme}")

if ssl_verify not in list(verify_map.keys()):
raise ValueError(f"Unsupported ssl_verify argument: {ssl_verify}")

prefix = scheme_map.get(broker_scheme)

return {
f"{prefix}keyfile": ssl_keyfile,
f"{prefix}certfile": ssl_certfile,
f"{prefix}ca_certs": ssl_ca_certs,
f"{prefix}cert_reqs": verify_map.get(ssl_verify),
}
64 changes: 64 additions & 0 deletions test/test_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import ssl
import pytest

from celery_exporter.utils import get_transport_scheme, generate_broker_use_ssl


@pytest.mark.parametrize("brokers", [("redis://foo", "redis"), ("amqp://bar", "amqp")])
def test_get_transport_scheme(brokers):
assert get_transport_scheme(brokers[0]) == brokers[1]


def test_generate_broker_use_ssl_no_ssl():
assert (
generate_broker_use_ssl(
False, "redis", "CERT_NONE", "path/ca.pem", "path/cert.pem", "path/key.pem"
)
== None
)


def test_generate_broker_use_ssl_exception():
with pytest.raises(ValueError):
generate_broker_use_ssl(
True, "wrong", "CERT_NONE", "path/ca.pem", "path/cert.pem", "path/key.pem"
)

with pytest.raises(ValueError):
generate_broker_use_ssl(
True,
"redis",
"WRONG_VERIFY",
"path/ca.pem",
"path/cert.pem",
"path/key.pem",
)


def test_generate_broker_use_ssl():
assert generate_broker_use_ssl(
True, "redis", "CERT_NONE", "path/ca.pem", "path/cert.pem", "path/key.pem"
) == {
"ssl_keyfile": "path/key.pem",
"ssl_certfile": "path/cert.pem",
"ssl_ca_certs": "path/ca.pem",
"ssl_cert_reqs": ssl.CERT_NONE,
}

assert generate_broker_use_ssl(
True, "amqp", "CERT_OPTIONAL", "path/ca.pem", "path/cert.pem", "path/key.pem"
) == {
"keyfile": "path/key.pem",
"certfile": "path/cert.pem",
"ca_certs": "path/ca.pem",
"cert_reqs": ssl.CERT_OPTIONAL,
}

assert generate_broker_use_ssl(
True, "amqp", "CERT_REQUIRED", "path/ca.pem", "path/cert.pem", "path/key.pem"
) == {
"keyfile": "path/key.pem",
"certfile": "path/cert.pem",
"ca_certs": "path/ca.pem",
"cert_reqs": ssl.CERT_REQUIRED,
}

0 comments on commit 0d1d376

Please sign in to comment.