Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cache last updated location for a configurable number of seconds #6

Merged
merged 14 commits into from
Jun 13, 2024
Merged
1 change: 1 addition & 0 deletions docs/ENVIRONMENT.rst
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ Kubernetes
- **PATRONI\_KUBERNETES\_PORTS**: (optional) if the Service object has the name for the port, the same name must appear in the Endpoint object, otherwise service won't work. For example, if your service is defined as ``{Kind: Service, spec: {ports: [{name: postgresql, port: 5432, targetPort: 5432}]}}``, then you have to set ``PATRONI_KUBERNETES_PORTS='[{"name": "postgresql", "port": 5432}]'`` and Patroni will use it for updating subsets of the leader Endpoint. This parameter is used only if `PATRONI_KUBERNETES_USE_ENDPOINTS` is set.
- **PATRONI\_KUBERNETES\_CACERT**: (optional) Specifies the file with the CA_BUNDLE file with certificates of trusted CAs to use while verifying Kubernetes API SSL certs. If not provided, patroni will use the value provided by the ServiceAccount secret.
- **PATRONI\_RETRIABLE\_HTTP\_CODES**: (optional) list of HTTP status codes from K8s API to retry on. By default Patroni is retrying on ``500``, ``503``, and ``504``, or if K8s API response has ``retry-after`` HTTP header.
- **PATRONI\_KUBERNETES\_XLOG\_CACHE\_TTL**: (optional) duration in seconds to retain the previous value of the member xlog position when updating the member pod metadata. Higher values reduce the frequency of pod metadata updates from Patroni, at the expense of having an outdated xlog position. The default value is ``0``, indicating that the system should always use the up-to-date position. Setting it to the value that is a multiple of ``loop_wait`` reduces the number of API server requests in the Kubernetes cluster.

Raft (deprecated)
-----------------
Expand Down
12 changes: 12 additions & 0 deletions fork.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
.. _fork:

Timescale patroni fork
======================


TS_1
-----

- Add ``xlog_cache_ttl`` parameter for the Kubernetes DCS

Setting it to multiple of ``loop_wait`` prevents frequent pod updates, reducing the load on the API server, at the expense of stale value of the xlog position in the member's metadata.
4 changes: 3 additions & 1 deletion patroni/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,7 @@ def _get_auth(name: str, params: Collection[str] = _AUTH_ALLOWED_PARAMETERS[:2])
'SERVICE_TAGS', 'NAMESPACE', 'CONTEXT', 'USE_ENDPOINTS', 'SCOPE_LABEL', 'ROLE_LABEL',
'POD_IP', 'PORTS', 'LABELS', 'BYPASS_API_SERVICE', 'RETRIABLE_HTTP_CODES', 'KEY_PASSWORD',
'USE_SSL', 'SET_ACLS', 'GROUP', 'DATABASE', 'LEADER_LABEL_VALUE', 'FOLLOWER_LABEL_VALUE',
'STANDBY_LEADER_LABEL_VALUE', 'TMP_ROLE_LABEL', 'AUTH_DATA') and name:
'STANDBY_LEADER_LABEL_VALUE', 'TMP_ROLE_LABEL', 'AUTH_DATA', 'XLOG_CACHE_TTL') and name:
value = os.environ.pop(param)
if name == 'CITUS':
if suffix == 'GROUP':
Expand All @@ -665,6 +665,8 @@ def _get_auth(name: str, params: Collection[str] = _AUTH_ALLOWED_PARAMETERS[:2])
continue
elif suffix == 'PORT':
value = value and parse_int(value)
elif suffix == 'XLOG_CACHE_TTL':
value = value and parse_int(value, 's')
elif suffix in ('HOSTS', 'PORTS', 'CHECKS', 'SERVICE_TAGS', 'RETRIABLE_HTTP_CODES'):
value = value and _parse_list(value)
elif suffix in ('LABELS', 'SET_ACLS', 'AUTH_DATA'):
Expand Down
4 changes: 2 additions & 2 deletions patroni/ctl.py
Original file line number Diff line number Diff line change
Expand Up @@ -2176,7 +2176,7 @@ def version(cluster_name: str, group: Optional[int], member_names: List[str]) ->
:param group: filter which Citus group we should get members from. Refer to the module note for more details.
:param member_names: filter which members we should get version information from.
"""
click.echo("patronictl version {0}".format(__version__))
click.echo("patronictl (timescale fork) version {0}".format(__version__))

if not cluster_name:
return
Expand All @@ -2192,7 +2192,7 @@ def version(cluster_name: str, group: Optional[int], member_names: List[str]) ->
version = data.get('patroni', {}).get('version')
pg_version = data.get('server_version')
pg_version_str = " PostgreSQL {0}".format(format_pg_version(pg_version)) if pg_version else ""
click.echo("{0}: Patroni {1}{2}".format(m.name, version, pg_version_str))
click.echo("{0}: Patroni (timescale fork) {1}{2}".format(m.name, version, pg_version_str))
except Exception as e:
click.echo("{0}: failed to get version: {1}".format(m.name, e))

Expand Down
2 changes: 1 addition & 1 deletion patroni/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def get_base_arg_parser() -> argparse.ArgumentParser:
from .version import __version__

parser = argparse.ArgumentParser()
parser.add_argument('--version', action='version', version='%(prog)s {0}'.format(__version__))
parser.add_argument('--version', action='version', version='%(prog)s (timescale fork) {0}'.format(__version__))
parser.add_argument('configfile', nargs='?', default='',
help='Patroni may also read the configuration from the {0} environment variable'
.format(Config.PATRONI_CONFIG_VARIABLE))
Expand Down
44 changes: 42 additions & 2 deletions patroni/dcs/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from ..exceptions import DCSError
from ..postgresql.mpp import AbstractMPP
from ..utils import deep_compare, iter_response_objects, keepalive_socket_options, \
Retry, RetryFailedError, tzutc, uri, USER_AGENT
parse_int, Retry, RetryFailedError, tzutc, uri, USER_AGENT
if TYPE_CHECKING: # pragma: no cover
from ..config import Config

Expand Down Expand Up @@ -758,6 +758,9 @@ def __init__(self, config: Dict[str, Any], mpp: AbstractMPP) -> None:
self._standby_leader_label_value = config.get('standby_leader_label_value', 'master')
self._tmp_role_label = config.get('tmp_role_label')
self._ca_certs = os.environ.get('PATRONI_KUBERNETES_CACERT', config.get('cacert')) or SERVICE_CERT_FILENAME
self._xlog_cache_ttl = 0
self._cached_xlog_location_modified_timestamp = None
self._cached_xlog_location = None
super(Kubernetes, self).__init__({**config, 'namespace': ''}, mpp)
if self._mpp.is_enabled():
self._labels[self._mpp.k8s_group_label] = str(self._mpp.group)
Expand Down Expand Up @@ -831,10 +834,14 @@ def reload_config(self, config: Union['Config', Dict[str, Any]]) -> None:
super(Kubernetes, self).reload_config(config)
if TYPE_CHECKING: # pragma: no cover
assert self._retry.deadline is not None

# we could be called with only Kubernetes part of the config (module init), or with the whole config
# during reload; make sure only kubernetes part of the config is fetched below.
kconfig = config.get('kubernetes') or config
self._api.configure_timeouts(self.loop_wait, self._retry.deadline, self.ttl)

# retriable_http_codes supposed to be either int, list of integers or comma-separated string with integers.
retriable_http_codes: Union[str, List[Union[str, int]]] = config.get('retriable_http_codes', [])
retriable_http_codes: Union[str, List[Union[str, int]]] = kconfig.get('retriable_http_codes', [])
if not isinstance(retriable_http_codes, list):
retriable_http_codes = [c.strip() for c in str(retriable_http_codes).split(',')]

Expand All @@ -843,6 +850,11 @@ def reload_config(self, config: Union['Config', Dict[str, Any]]) -> None:
except Exception as e:
logger.warning('Invalid value of retriable_http_codes = %s: %r', config['retriable_http_codes'], e)

# cache xlog location for the member, preventing pod update when xlog location is the only update for the pod
self._xlog_cache_ttl = parse_int(kconfig.get('xlog_cache_ttl', '0'), 's') or 0
if self._xlog_cache_ttl > 0:
logger.debug("set xlog_cache_ttl to %d", self._xlog_cache_ttl)

@staticmethod
def member(pod: K8sObject) -> Member:
annotations = pod.metadata.annotations or EMPTY_DICT
Expand Down Expand Up @@ -1302,6 +1314,13 @@ def _config_resource_version(self) -> Optional[str]:
def set_config_value(self, value: str, version: Optional[str] = None) -> bool:
return self.patch_or_create_config({self._CONFIG: value}, version, bool(self._config_resource_version), False)

def _get_cached_xlog_location(self) -> Tuple[Optional[str], Optional[int]]:
return self._cached_xlog_location, self._cached_xlog_location_modified_timestamp

def _set_cached_xlog_location(self, location: str) -> None:
self._cached_xlog_location = location
self._cached_xlog_location_modified_timestamp = int(time.time())

@catch_kubernetes_errors
def touch_member(self, data: Dict[str, Any]) -> bool:
cluster = self.cluster
Expand All @@ -1321,17 +1340,38 @@ def touch_member(self, data: Dict[str, Any]) -> bool:

member = cluster and cluster.get_member(self._name, fallback_to_leader=False)
pod_labels = member and member.data.pop('pod_labels', None)

replaced_xlog_location: Optional[str] = data.get('xlog_location', None)
alexeyklyukin marked this conversation as resolved.
Show resolved Hide resolved
cached_xlog_location, last_updated = self._get_cached_xlog_location()
now = int(time.time())
use_cached_xlog = False
if last_updated is not None and last_updated + self._xlog_cache_ttl > now:
if cached_xlog_location is not None and replaced_xlog_location is not None:
data['xlog_location'] = cached_xlog_location
use_cached_xlog = True
elif replaced_xlog_location is not None:
# location cache expired
self._set_cached_xlog_location(replaced_xlog_location)
ret = member and pod_labels is not None\
and all(pod_labels.get(k) == v for k, v in role_labels.items())\
and deep_compare(data, member.data)

if not ret:
# if we move forward with an update anyway, make sure to write the actual
# value for the xlog, and not the stale cached value.
if use_cached_xlog and replaced_xlog_location is not None:
self._set_cached_xlog_location(replaced_xlog_location)
data['xlog_location'] = replaced_xlog_location
metadata = {'namespace': self._namespace, 'name': self._name, 'labels': role_labels,
'annotations': {'status': json.dumps(data, separators=(',', ':'))}}
body = k8s_client.V1Pod(metadata=k8s_client.V1ObjectMeta(**metadata))
ret = self._api.patch_namespaced_pod(self._name, self._namespace, body)
if ret:
self._pods.set(self._name, ret)
elif use_cached_xlog and cached_xlog_location != replaced_xlog_location and last_updated is not None:
logger.debug("prevented pod update, keeping cached xlog value for up to %d seconds",
(last_updated + self._xlog_cache_ttl - now))

if self._should_create_config_service:
self._create_config_service()
return bool(ret)
Expand Down
1 change: 1 addition & 0 deletions patroni/validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -1124,6 +1124,7 @@ def validate_watchdog_mode(value: Any) -> None:
Optional("ports"): [{"name": str, "port": IntValidator(max=65535, expected_type=int, raise_assert=True)}],
Optional("cacert"): str,
Optional("retriable_http_codes"): Or(int, [int]),
Optional('xlog_cache_ttl'): IntValidator(min=0, max=3600, base_unit='s', raise_assert=True)
},
}),
Optional("citus"): {
Expand Down
2 changes: 1 addition & 1 deletion tests/test_ctl.py
Original file line number Diff line number Diff line change
Expand Up @@ -705,7 +705,7 @@ def test_edit_config(self):
@patch('patroni.ctl.request_patroni')
def test_version(self, mock_request):
result = self.runner.invoke(ctl, ['version'])
assert 'patronictl version' in result.output
assert 'patronictl (timescale fork) version' in result.output
mock_request.return_value.data = b'{"patroni":{"version":"1.2.3"},"server_version": 100001}'
result = self.runner.invoke(ctl, ['version', 'dummy'])
assert '1.2.3' in result.output
Expand Down
Loading