Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DPE-4736] Improve async replication stability #526

Merged
merged 5 commits into from
Jul 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 20 additions & 12 deletions src/relations/async_replication.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,12 @@ def __init__(self, charm):
super().__init__(charm, "postgresql")
self.charm = charm
self.framework.observe(
self.charm.on[REPLICATION_OFFER_RELATION].relation_joined,
self._on_async_relation_joined,
self.charm.on[REPLICATION_OFFER_RELATION].relation_created,
self._on_async_relation_created,
)
self.framework.observe(
self.charm.on[REPLICATION_CONSUMER_RELATION].relation_joined,
self._on_async_relation_joined,
self.charm.on[REPLICATION_CONSUMER_RELATION].relation_created,
self._on_async_relation_created,
)
self.framework.observe(
self.charm.on[REPLICATION_OFFER_RELATION].relation_changed,
Expand Down Expand Up @@ -468,7 +468,7 @@ def is_primary_cluster(self) -> bool:
return self.charm.app == self._get_primary_cluster()

def _on_async_relation_broken(self, _) -> None:
if "departing" in self.charm._peers.data[self.charm.unit]:
if self.charm._peers is None or "departing" in self.charm._peers.data[self.charm.unit]:
logger.debug("Early exit on_async_relation_broken: Skipping departing unit.")
return

Expand Down Expand Up @@ -526,19 +526,21 @@ def _on_async_relation_changed(self, event: RelationChangedEvent) -> None:
if self._wait_for_standby_leader(event):
return

if (
not self.container.can_connect()
or len(self.container.pebble.get_services(names=[self.charm._postgresql_service])) == 0
):
logger.debug("Early exit on_async_relation_changed: container hasn't started yet.")
event.defer()
return

# Update the asynchronous replication configuration and start the database.
self.charm.update_config()
self.container.start(self.charm._postgresql_service)

self._handle_database_start(event)

def _on_async_relation_departed(self, event: RelationDepartedEvent) -> None:
"""Set a flag to avoid setting a wrong status message on relation broken event handler."""
# This is needed because of https://bugs.launchpad.net/juju/+bug/1979811.
if event.departing_unit == self.charm.unit:
self.charm._peers.data[self.charm.unit].update({"departing": "True"})

def _on_async_relation_joined(self, _) -> None:
def _on_async_relation_created(self, _) -> None:
"""Publish this unit address in the relation data."""
self._relation.data[self.charm.unit].update({"unit-address": self._get_unit_ip()})

Expand All @@ -549,6 +551,12 @@ def _on_async_relation_joined(self, _) -> None:
"unit-promoted-cluster-counter": highest_promoted_cluster_counter
})

def _on_async_relation_departed(self, event: RelationDepartedEvent) -> None:
"""Set a flag to avoid setting a wrong status message on relation broken event handler."""
# This is needed because of https://bugs.launchpad.net/juju/+bug/1979811.
if event.departing_unit == self.charm.unit:
self.charm._peers.data[self.charm.unit].update({"departing": "True"})

def _on_create_replication(self, event: ActionEvent) -> None:
"""Set up asynchronous replication between two clusters."""
if self._get_primary_cluster() is not None:
Expand Down
164 changes: 164 additions & 0 deletions tests/unit/test_async_replication.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
# Copyright 2024 Canonical Ltd.
# See LICENSE file for licensing details.

from unittest.mock import patch

import pytest
from ops.testing import Harness

from charm import PostgresqlOperatorCharm
from constants import PEER

RELATION_NAMES = ["replication-offer", "replication"]


@pytest.fixture(autouse=True)
def harness():
with patch("charm.KubernetesServicePatch", lambda x, y: None):
harness = Harness(PostgresqlOperatorCharm)

# Set up the initial relation and hooks.
harness.set_leader(True)
harness.begin()

yield harness
harness.cleanup()


@pytest.mark.parametrize("relation_name", RELATION_NAMES)
@pytest.mark.parametrize("is_leader", [True, False])
def test_on_async_relation_broken(harness, is_leader, relation_name):
with (
patch("charm.PostgresqlOperatorCharm.update_config") as _update_config,
patch(
"relations.async_replication.PostgreSQLAsyncReplication._set_app_status"
) as _set_app_status,
patch("charm.Patroni.get_standby_leader") as _get_standby_leader,
patch(
"relations.async_replication.PostgreSQLAsyncReplication._on_async_relation_departed"
) as _on_async_relation_departed,
):
# Test before the peer relation is available.
with harness.hooks_disabled():
harness.set_leader(is_leader)
rel_id = harness.add_relation(relation_name, harness.charm.app.name)
harness.add_relation_unit(rel_id, harness.charm.unit.name)
harness.remove_relation(rel_id)
_get_standby_leader.assert_not_called()
_set_app_status.assert_not_called()

# Test the departing unit.
with harness.hooks_disabled():
peer_rel_id = harness.add_relation(PEER, harness.charm.app.name)
harness.update_relation_data(
peer_rel_id,
harness.charm.app.name,
{"promoted-cluster-counter": "1"},
)
harness.update_relation_data(
peer_rel_id,
harness.charm.unit.name,
{"departing": "True", "stopped": "True", "unit-promoted-cluster-counter": "1"},
)
rel_id = harness.add_relation(relation_name, harness.charm.app.name)
harness.add_relation_unit(rel_id, harness.charm.unit.name)
harness.remove_relation(rel_id)
# assert harness.get_relation_data(peer_rel_id, harness.charm.app.name) == {
# "promoted-cluster-counter": ("0" if is_leader else "0")}
assert harness.get_relation_data(peer_rel_id, harness.charm.unit.name) == {
"departing": "True",
"stopped": "True",
"unit-promoted-cluster-counter": "1",
}
_get_standby_leader.assert_not_called()
_set_app_status.assert_not_called()

# Test in a primary cluster.
with harness.hooks_disabled():
_get_standby_leader.return_value = None
harness.update_relation_data(
peer_rel_id,
harness.charm.unit.name,
{"departing": "", "stopped": "True", "unit-promoted-cluster-counter": "1"},
)
rel_id = harness.add_relation(relation_name, harness.charm.app.name)
harness.add_relation_unit(rel_id, harness.charm.unit.name)
harness.remove_relation(rel_id)
assert harness.get_relation_data(peer_rel_id, harness.charm.app.name) == (
{} if is_leader else {"promoted-cluster-counter": "1"}
)
assert harness.get_relation_data(peer_rel_id, harness.charm.unit.name) == {}
_get_standby_leader.assert_called_once()
_update_config.assert_called_once()

# Test in a standby cluster.
_update_config.reset_mock()
with harness.hooks_disabled():
_get_standby_leader.return_value = harness.charm.unit.name
harness.update_relation_data(
peer_rel_id,
harness.charm.unit.name,
{"stopped": "True", "unit-promoted-cluster-counter": "1"},
)
rel_id = harness.add_relation(relation_name, harness.charm.app.name)
harness.add_relation_unit(rel_id, harness.charm.unit.name)
harness.remove_relation(rel_id)
assert harness.get_relation_data(peer_rel_id, harness.charm.app.name) == {
"promoted-cluster-counter": ("0" if is_leader else "1")
}
assert harness.get_relation_data(peer_rel_id, harness.charm.unit.name) == {}
assert _set_app_status.call_count == (1 if is_leader else 0)
_update_config.assert_not_called()


@pytest.mark.parametrize("relation_name", RELATION_NAMES)
def test_on_async_relation_created(harness, relation_name):
with (
patch(
"relations.async_replication.PostgreSQLAsyncReplication._get_highest_promoted_cluster_counter_value",
side_effect=["0", "1"],
) as _get_highest_promoted_cluster_counter_value,
patch(
"relations.async_replication.PostgreSQLAsyncReplication._get_unit_ip",
return_value="1.1.1.1",
) as _get_unit_ip,
):
# Test in a standby cluster.
with harness.hooks_disabled():
peer_rel_id = harness.add_relation(PEER, harness.charm.app.name)
rel_id = harness.add_relation(relation_name, harness.charm.app.name)
assert harness.get_relation_data(rel_id, harness.charm.unit.name) == {
"unit-address": "1.1.1.1"
}
assert harness.get_relation_data(peer_rel_id, harness.charm.unit.name) == {}

# Test in a primary cluster.
with harness.hooks_disabled():
harness.update_relation_data(rel_id, harness.charm.unit.name, {"unit-address": ""})
harness.remove_relation(rel_id)
rel_id = harness.add_relation(relation_name, harness.charm.app.name)
assert harness.get_relation_data(rel_id, harness.charm.unit.name) == {
"unit-address": "1.1.1.1"
}
assert harness.get_relation_data(peer_rel_id, harness.charm.unit.name) == {
"unit-promoted-cluster-counter": "1"
}


@pytest.mark.parametrize("relation_name", RELATION_NAMES)
def test_on_async_relation_departed(harness, relation_name):
# Test the departing unit.
with harness.hooks_disabled():
peer_rel_id = harness.add_relation(PEER, harness.charm.app.name)
rel_id = harness.add_relation(relation_name, harness.charm.app.name)
harness.add_relation_unit(rel_id, harness.charm.unit.name)
harness.remove_relation_unit(rel_id, harness.charm.unit.name)
assert harness.get_relation_data(peer_rel_id, harness.charm.unit.name) == {"departing": "True"}

# Test the non-departing unit.
other_unit = f"{harness.charm.app.name}/1"
with harness.hooks_disabled():
harness.update_relation_data(peer_rel_id, harness.charm.unit.name, {"departing": ""})
harness.add_relation_unit(rel_id, other_unit)
harness.remove_relation_unit(rel_id, other_unit)
assert harness.get_relation_data(peer_rel_id, harness.charm.unit.name) == {}
Loading