Skip to content

Commit

Permalink
Merge pull request #62 from wrieg123/authenticated-supervisor
Browse files Browse the repository at this point in the history
add support for client authentication; fix e2e client tests
  • Loading branch information
timkpaine authored Jan 11, 2025
2 parents cc2c74d + 0951bfb commit 6622ba1
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 33 deletions.
19 changes: 9 additions & 10 deletions airflow_supervisor/client/xmlrpc.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from datetime import datetime
from enum import Enum
from typing import Dict, List
from typing import Dict, List, Optional
from xmlrpc.client import Fault, ServerProxy

from pydantic import BaseModel
Expand Down Expand Up @@ -116,17 +116,16 @@ def __init__(self, cfg: SupervisorAirflowConfiguration):
self._port = int(cfg.airflow.port.split(":")[-1])
self._protocol = cfg.airflow.protocol
self._rpcpath = "/" + cfg.airflow.rpcpath if not cfg.airflow.rpcpath.startswith("/") else cfg.airflow.rpcpath

if cfg.airflow.port == 80:
# force http
self._rpcurl = f"http://{self._host}{self._rpcpath}"
elif cfg.airflow.port == 443:
# force https
self._rpcurl = f"https://{self._host}{self._rpcpath}"
else:
self._rpcurl = f"{self._protocol}://{self._host}:{self._port}{self._rpcpath}"
self._rpcurl = self._build_rpcurl(username=cfg.airflow.username, password=cfg.airflow.password)
self._client = ServerProxy(self._rpcurl)

def _build_rpcurl(self, username: Optional[str], password: Optional[str]) -> str:
# Forces http or https based on port, otherwise resolves to given protocol
protocol = {80: "http", 443: "https"}.get(self._port, self._protocol)
port = "" if self._port in {80, 443} else f":{self._port}"
authentication = f"{username}:{password.get_secret_value()}@" if username and password else ""
return f"{protocol}://{authentication}{self._host}{port}{self._rpcpath}"

#######################
# supervisord methods #
#######################
Expand Down
2 changes: 1 addition & 1 deletion airflow_supervisor/config/airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class AirflowConfiguration(_BaseCfgModel):
)
password: Optional[SecretStr] = Field(
default=None,
description="he password required for authentication to the HTTP/Unix server. This can be a cleartext password, or can be specified as a SHA-1 hash if prefixed by the string {SHA}. For example, {SHA}82ab876d1387bfafe46cc1c8a2ef074eae50cb1d is the SHA-stored version of the password “thepassword”. Note that hashed password must be in hex format.",
description="The password required for authentication to the HTTP/Unix server. This can be a cleartext password, or can be specified as a SHA-1 hash if prefixed by the string {SHA}. For example, {SHA}82ab876d1387bfafe46cc1c8a2ef074eae50cb1d is the SHA-stored version of the password “thepassword”. Note that hashed password must be in hex format.",
)

#################
Expand Down
51 changes: 30 additions & 21 deletions airflow_supervisor/tests/client/test_client_e2e.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,37 @@
import xmlrpc
from time import sleep

import pytest

from airflow_supervisor import SupervisorAirflowConfiguration, SupervisorRemoteXMLRPCClient
from airflow_supervisor.client.xmlrpc import ProcessState


def test_supervisor_client(supervisor_instance: SupervisorAirflowConfiguration):
client = SupervisorRemoteXMLRPCClient(supervisor_instance)
print(client.getProcessInfo("test"))
sleep(0.5)
print(client.startAllProcesses())
sleep(0.5)
print(client.getProcessInfo("test"))
sleep(0.5)
print(client.getProcessInfo("test"))
sleep(0.5)
print(client.getProcessInfo("test"))
print(client.startProcess("test"))
sleep(0.5)
print(client.startProcess("test"))
sleep(0.5)
print(client.stopAllProcesses())
sleep(0.5)
print(client.startProcess("test"))
def _assert_client_actions(client: SupervisorRemoteXMLRPCClient):
assert client.getProcessInfo("test").state == ProcessState.STOPPED
sleep(0.5)
print(client.stopAllProcesses())
sleep(0.5)
print(client.stopProcess("test"))
assert client.startAllProcesses()["test"].state == ProcessState.RUNNING
sleep(0.5)
assert client.getProcessInfo("test").state == ProcessState.EXITED
assert client.startProcess("test").state == ProcessState.RUNNING
assert client.stopProcess("test").state == ProcessState.STOPPED
assert client.startProcess("test").state == ProcessState.RUNNING
assert client.stopAllProcesses()["test"].state == ProcessState.STOPPED


def test_supervisor_client(supervisor_instance: SupervisorAirflowConfiguration):
client = SupervisorRemoteXMLRPCClient(supervisor_instance)
_assert_client_actions(client=client)


def test_permissioned_supervisor_client_rejected(permissioned_supervisor_instance: SupervisorAirflowConfiguration):
permissioned_supervisor_instance.airflow.username = "bad-username"
client = SupervisorRemoteXMLRPCClient(permissioned_supervisor_instance)
with pytest.raises(xmlrpc.client.ProtocolError):
client.getProcessInfo("test")


def test_permissioned_supervisor_client(permissioned_supervisor_instance: SupervisorAirflowConfiguration):
permissioned_supervisor_instance.airflow.username = "user1"
client = SupervisorRemoteXMLRPCClient(permissioned_supervisor_instance)
_assert_client_actions(client=client)
45 changes: 44 additions & 1 deletion airflow_supervisor/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,16 @@ def open_port() -> int:
return port


@fixture(scope="module")
def permissioned_open_port() -> int:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(("", 0))
s.listen(1)
port = s.getsockname()[1]
s.close()
return port


@fixture(scope="module")
def supervisor_airflow_configuration(open_port: int) -> Iterator[SupervisorAirflowConfiguration]:
with NamedTemporaryFile("w", suffix=".cfg") as tf:
Expand All @@ -26,7 +36,26 @@ def supervisor_airflow_configuration(open_port: int) -> Iterator[SupervisorAirfl
path=tf.name,
program={
"test": ProgramConfiguration(
command="sleep 1 && exit 1",
command="bash -c 'sleep 1; exit 1'",
)
},
)
yield cfg


@fixture(scope="module")
def permissioned_supervisor_airflow_configuration(
permissioned_open_port: int,
) -> Iterator[SupervisorAirflowConfiguration]:
with NamedTemporaryFile("w", suffix=".cfg") as tf:
cfg = SupervisorAirflowConfiguration(
airflow=AirflowConfiguration(
port=f"*:{permissioned_open_port}", username="user1", password="testpassword1"
),
path=tf.name,
program={
"test": ProgramConfiguration(
command="bash -c 'sleep 1; exit 1'",
)
},
)
Expand All @@ -45,3 +74,17 @@ def supervisor_instance(
sleep(1)
yield cfg
cfg.kill()


@fixture(scope="module")
def permissioned_supervisor_instance(
permissioned_supervisor_airflow_configuration: SupervisorAirflowConfiguration,
) -> Iterator[SupervisorAirflowConfiguration]:
cfg = permissioned_supervisor_airflow_configuration
cfg.write()
cfg.start(daemon=False)
for _ in range(5):
if not cfg.running():
sleep(1)
yield cfg
cfg.kill()

0 comments on commit 6622ba1

Please sign in to comment.