Control Docker more robustly
Previously, Docker was controlled through custom shell commands issued
via `pytest-testinfra`. Multiple loops were used to wait for containers
to converge to the desired state. Given that the test requires
substantial resources in terms of RAM, CPU, and bandwidth, its execution
can take between 20 minutes and a full hour. Therefore, it is better to
avoid having to edit the code to fiddle with timeouts and loop counts.

One of the ways we waited for completion was looking for specific
messages in the Docker service logs. Sadly, `pytest-testinfra` does not
really provide a way to monitor the output of a command as it runs.
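
For illustration, the pattern being replaced amounted to re-reading the
whole service log and sleeping between attempts, with the retry budget
hard-coded in the test module. A simplified sketch of the removed helpers
(`ATTEMPTS` and `DELAY` are the constants dropped by this commit):

```python
import time

import testinfra

ATTEMPTS = 600  # hard-coded retry budget in the old test module
DELAY = 1       # seconds to sleep between polls

host = testinfra.get_host("local://")


def wait_for_log_entry(service, logline, occurrences=1):
    # Each attempt re-runs `docker service logs` and scans the full output;
    # waiting longer means editing ATTEMPTS or DELAY in the source.
    for _ in range(ATTEMPTS):
        logs = host.check_output(f"docker service logs -t {service}")
        if logs.count(logline) >= occurrences:
            return True
        time.sleep(DELAY)
    return False
```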

All in all, it felt easier to replace the custom shell commands with a
library offering a Python API to control Docker. While `docker-py` is the
official library, its future maintenance by Docker has been in flux
(see: docker/docker-py#2989) and, more importantly, as it speaks directly
to the Docker socket, it does not support anything like `docker stack`.

Meanwhile, the `python_on_whales` library has been featured on the official
Docker blog (https://www.docker.com/blog/guest-post-calling-the-docker-cli-from-python-with-python-on-whales/)
and implements as much of the Docker command line interface as possible (by
shelling out to it).
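
A minimal sketch of what that API looks like (the stack name below is
illustrative; the calls mirror those introduced in the diff):

```python
from python_on_whales import DockerClient

# python_on_whales drives the regular `docker` CLI, so Swarm-level
# commands such as `docker stack deploy` are available as methods.
docker = DockerClient()
stack = docker.stack.deploy("swhtest_mirror", "mirror.yml")

# Deployed services are exposed as plain Python objects.
for service in stack.services():
    print(service.spec.name)
```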

With it, it was possible to:

- Replace all command lines with Python calls.
- Wait for containers to shut down on teardown before removing volumes.
- Wait for log messages to appear by blocking on the service output (see
  the sketch below).
- Wait for Docker readiness when we start the stack and scale services.
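
For instance, the new `wait_for_log_entry` helper in
`tests/test_graph_replayer.py` blocks on the service log stream instead of
polling; simplified, it works roughly like this:

```python
from python_on_whales import DockerClient

docker = DockerClient()


def wait_for_log_entry(service, log_entry, occurrences=1):
    # With follow=True and stream=True, the generator yields
    # (stream_type, bytes) tuples as the service writes them, so the loop
    # blocks until enough matches have been seen; no sleep, no retry count.
    count = 0
    for stream_type, content in docker.service.logs(
        service, follow=True, stream=True
    ):
        if stream_type != "stdout":
            continue
        count += content.count(log_entry.encode())
        if count >= occurrences:
            return
```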

Some refactoring and naming improvements have been made along the way.
Some leftover breakpoints were also removed.

Because we would still like to time out in case of a problem,
`pytest-timeout` has been added to the requirements with a default
timeout of 30 minutes.
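
For reference, pytest-timeout also lets an individual test override the
session-wide default through a marker; the value below is purely
illustrative and not part of this commit:

```python
import pytest


@pytest.mark.timeout(3600)  # hypothetical per-test budget of one hour
def test_mirror(docker_client, mirror_stack):
    ...
```
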
Jérémy Bobbio (Lunar) committed Oct 24, 2022
1 parent adc6295 commit c062adb
Showing 5 changed files with 142 additions and 127 deletions.
4 changes: 3 additions & 1 deletion pyproject.toml
@@ -16,6 +16,8 @@ log_cli = true
log_cli_level = "INFO"
log_cli_format = "%(asctime)s [%(levelname)8s] %(message)s (%(filename)s:%(lineno)s)"
log_cli_date_format = "%Y-%m-%d %H:%M:%S"
timeout = 1871
timeout_method = "signal"

[build-system]
requires = ["setuptools", "wheel", "tree_sitter"]
requires = ["setuptools", "wheel"]
3 changes: 2 additions & 1 deletion requirements-test.txt
@@ -1,6 +1,7 @@
pytest
pytest-testinfra
pytest-dotenv
pytest-timeout
requests
msgpack
confluent-kafka
python-on-whales
6 changes: 2 additions & 4 deletions tests/README.md
@@ -7,10 +7,8 @@ As of today, only docker swarm based deployment tests are available.

## docker swarm deployment tests

This test is using
[pytest-testinfra](https://github.com/pytest-dev/pytest-testinfra) to
orchestrate the deployment and checks that are made against the replicated
Software Heritage Archive.
This test is using a Docker Swarm to orchestrate the deployment and checks that
are made against the replicated Software Heritage Archive.

The idea of this test is:
- a test dataset is built by loading a few origins in a dedicated swh instance
90 changes: 41 additions & 49 deletions tests/conftest.py
@@ -7,11 +7,10 @@
from os import chdir, environ
from pathlib import Path
from shutil import copy, copytree
import time
from uuid import uuid4

import pytest
import testinfra
from python_on_whales import DockerClient, DockerException

APIURL = "http://127.0.0.1:5080/api/1/"
SWH_IMAGE_TAG = environ["SWH_IMAGE_TAG"]
@@ -34,17 +33,18 @@ def pytest_addoption(parser, pluginmanager):


@pytest.fixture(scope="session")
def docker_host():
return testinfra.get_host("local://")
def docker_client():
return DockerClient()


# scope='session' so we use the same container for all the tests;
@pytest.fixture(scope="session")
def mirror_stack(request, docker_host, tmp_path_factory):
def mirror_stack(request, docker_client, tmp_path_factory):
tmp_path = tmp_path_factory.mktemp("mirror")
copytree(SRC_PATH / "conf", tmp_path / "conf")
copytree(SRC_PATH / "env", tmp_path / "env")
copy(SRC_PATH / "mirror.yml", tmp_path)
Path(tmp_path / "secret").write_bytes(b"not-so-secret\n")
cwd = Path.cwd()
chdir(tmp_path)
# copy test-specific conf files
@@ -63,59 +63,51 @@ def mirror_stack(request, docker_host, tmp_path_factory):
stack_name = f"swhtest_{tmp_path.name}"

LOGGER.info("Create missing secrets")
existing_secrets = [
line.strip()
for line in docker_host.check_output(
"docker secret ls --format '{{.Name}}'"
).splitlines()
]
for srv in ("storage", "web", "vault", "scheduler"):
secret = f"swh-mirror-{srv}-db-password"
if secret not in existing_secrets:
LOGGER.info("Creating secret %s", secret)
docker_host.check_output(
f"echo not-so-secret | docker secret create {secret} -"
)
secret_name = f"swh-mirror-{srv}-db-password"
try:
docker_client.secret.create(secret_name, tmp_path / "secret")
LOGGER.info("Created secret %s", secret_name)
except DockerException as e:
if "code = AlreadyExists" not in e.stderr:
raise

LOGGER.info("Remove config objects (if any)")
existing_configs = [
line.strip()
for line in docker_host.check_output(
"docker config ls --format '{{.Name}}'"
).splitlines()
]
for cfg in existing_configs:
if cfg.startswith(f"{stack_name}_"):
docker_host.check_output(f"docker config rm {cfg}")
existing_configs = docker_client.config.list(
filters={"label=com.docker.stack.namespace": stack_name}
)
for config in existing_configs:
config.remove()

LOGGER.info("Deploy docker stack %s", stack_name)
docker_host.check_output(f"docker stack deploy -c mirror.yml {stack_name}")
docker_stack = docker_client.stack.deploy(stack_name, "mirror.yml")

yield stack_name
yield docker_stack

# breakpoint()
if not request.config.getoption("keep_stack"):
LOGGER.info("Remove stack %s", stack_name)
docker_host.check_output(f"docker stack rm {stack_name}")
# wait for services to be down
LOGGER.info("Wait for all services of %s to be down", stack_name)
while docker_host.check_output(
"docker service ls --format {{.Name}} "
f"--filter label=com.docker.stack.namespace={stack_name}"
):
time.sleep(0.2)

# give a bit of time to docker to sync the state of service<->volumes
# relations so the next step runs ok
time.sleep(20)
LOGGER.info("Remove volumes of stack %s", stack_name)
for volume in docker_host.check_output(
"docker volume ls --format {{.Name}} "
f"--filter label=com.docker.stack.namespace={stack_name}"
).splitlines():
docker_stack.remove()
stack_containers = docker_client.container.list(
filters={"label=com.docker.stack.namespace": stack_name}
)

try:
LOGGER.info("Waiting for all containers of %s to be down", stack_name)
docker_client.container.wait(stack_containers)
except DockerException as e:
# We have a TOCTOU issue, so skip the error if some containers have already
# been stopped by the time we wait for them.
if "No such container" not in e.stderr:
raise

LOGGER.info("Remove volumes of stack %s", stack_name)
stack_volumes = docker_client.volume.list(
filters={"label=com.docker.stack.namespace": stack_name}
)
for volume in stack_volumes:
try:
docker_host.check_output(f"docker volume rm {volume}")
except AssertionError:
LOGGER.error("Failed to remove volume %s", volume)
volume.remove()
except DockerException:
LOGGER.exception("Failed to remove volume %s", volume)

chdir(cwd)
166 changes: 94 additions & 72 deletions tests/test_graph_replayer.py
@@ -8,15 +8,17 @@
import re
import tarfile
import time
from typing import Dict
from urllib.parse import quote

from confluent_kafka import Consumer, KafkaException
import msgpack
from python_on_whales import DockerException
import requests

from .conftest import KAFKA_GROUPID, KAFKA_PASSWORD, KAFKA_USERNAME, LOGGER

SERVICES = {
INITIAL_SERVICES_STATUS = {
"{}_amqp": "1/1",
"{}_content-replayer": "0/0",
"{}_grafana": "1/1",
@@ -40,57 +42,74 @@
"{}_scheduler-listener": "1/1",
"{}_scheduler-runner": "1/1",
}
ATTEMPTS = 600
DELAY = 1
SCALE = 2
API_URL = "http://127.0.0.1:5081/api/1"


def running_services(host, stack):
all_services = host.check_output(
"docker service ls --format '{{.Name}} {{.Replicas}}'"
)
return dict(
line.split()[:2]
for line in all_services.splitlines()
if line.startswith(f"{stack}_")
)


def check_running_services(host, stack, services):
LOGGER.info("Waiting for services %s", services)
mirror_services_ = {}
for i in range(ATTEMPTS):
mirror_services = running_services(host, stack)
mirror_services = {k: v for k, v in mirror_services.items() if k in services}
if mirror_services == services:
LOGGER.info("Got them all!")
break
if mirror_services != mirror_services_:
LOGGER.info("Not yet there %s", mirror_services)
mirror_services_ = mirror_services
time.sleep(0.5)
return mirror_services == services


def get_logs(host, service):
rows = host.check_output(f"docker service logs -t {service}").splitlines()
reg = re.compile(
rf"^(?P<timestamp>.+) {service}[.]"
r"(?P<num>\d+)[.](?P<id>\w+)@(?P<host>\w+) +[|] "
r"(?P<logline>.+)$"
)
return [m.groupdict() for m in (reg.match(row) for row in rows) if m is not None]
def service_target_replicas(service):
if "Replicated" in service.spec.mode:
return service.spec.mode["Replicated"]["Replicas"]
elif "Global" in service.spec.mode:
return 1
else:
raise ValueError(f"Unknown mode {service.spec.mode}")


def wait_for_log_entry(host, service, logline, occurrences=1):
for i in range(ATTEMPTS):
logs = get_logs(host, service)
match = [entry for entry in logs if logline in entry["logline"]]
if match and len(match) >= occurrences:
return match
time.sleep(DELAY)
return []
def is_task_running(task):
try:
return task.status.state == "running"
except DockerException as e:
# A task might already have disappeared before we can get its status.
# In that case, we know for sure it’s not running.
if "No such object" in e.stderr:
return False
else:
raise


def wait_services_status(stack, target_status: Dict[str, int]):
LOGGER.info("Waiting for services %s", target_status)
last_changed_status = {}
while True:
services = [
service
for service in stack.services()
if service.spec.name in target_status
]
status = {
service.spec.name: "%s/%s"
% (
len([True for task in service.ps() if is_task_running(task)]),
service_target_replicas(service),
)
for service in services
}
if status == target_status:
LOGGER.info("Got them all!")
break
if status != last_changed_status:
LOGGER.info("Not yet there %s", status)
last_changed_status = status
time.sleep(1)
return status == target_status


def wait_for_log_entry(docker_client, service, log_entry, occurrences=1):
count = 0
for stream_type, stream_content in docker_client.service.logs(
service, follow=True, stream=True
):
LOGGER.debug("%s output: %s", service.spec.name, stream_content)
if stream_type != "stdout":
continue
count += len(
re.findall(
re.escape(log_entry.encode("us-ascii", errors="replace")),
stream_content,
)
)
if count >= occurrences:
break


def content_get(url, done):
@@ -319,40 +338,43 @@ def on_assign(cons, parts):
return stats


def test_mirror(host, mirror_stack):
services = {k.format(mirror_stack): v for k, v in SERVICES.items()}
check_running_services(host, mirror_stack, services)
def test_mirror(docker_client, mirror_stack):
initial_services_status = {
k.format(mirror_stack.name): v for k, v in INITIAL_SERVICES_STATUS.items()
}
wait_services_status(mirror_stack, initial_services_status)

# run replayer services
for service_type in ("content", "graph"):
service = f"{mirror_stack}_{service_type}-replayer"
LOGGER.info("Scale %s to 1", service)
host.check_output(f"docker service scale -d {service}=1")
if not check_running_services(host, mirror_stack, {service: "1/1"}):
breakpoint()
logs = wait_for_log_entry(
host, service, f"Starting the SWH mirror {service_type} replayer"
service = docker_client.service.inspect(
f"{mirror_stack}_{service_type}-replayer"
)
LOGGER.info("Scale %s to 1", service.spec.name)
service.scale(1)
wait_services_status(mirror_stack, {service.spec.name: "1/1"})
wait_for_log_entry(
docker_client,
service,
f"Starting the SWH mirror {service_type} replayer",
1,
)
assert len(logs) == 1

LOGGER.info("Scale %s to %d", service, SCALE)
host.check_output(f"docker service scale -d {service}={SCALE}")
check_running_services(host, mirror_stack, {service: f"{SCALE}/{SCALE}"})
logs = wait_for_log_entry(
host, service, f"Starting the SWH mirror {service_type} replayer", SCALE
LOGGER.info("Scale %s to %d", service.spec.name, SCALE)
service.scale(SCALE)
wait_for_log_entry(
docker_client,
service,
f"Starting the SWH mirror {service_type} replayer",
SCALE,
)
assert len(logs) == SCALE

# wait for the replaying to be done (stop_on_oef is true)
LOGGER.info("Wait for %s to be done", service)
logs = wait_for_log_entry(host, service, "Done.", SCALE)
# >= SCALE below because replayer services may have been restarted
# (once done) before we scale them to 0
if not (len(logs) >= SCALE):
breakpoint()
assert len(logs) >= SCALE
LOGGER.info("Scale %s to 0", service)
check_running_services(host, mirror_stack, {service: f"0/{SCALE}"})
LOGGER.info("Wait for %s to be done", service.spec.name)
wait_for_log_entry(docker_client, service, "Done.", SCALE)

LOGGER.info("Scale %s to 0", service.spec.name)
service.scale(0)
wait_services_status(mirror_stack, {service.spec.name: "0/0"})

# TODO: check there are no error reported in redis after the replayers are done
