From 8bb173d641ced2c03ee72fb57ea9da41a7b8235e Mon Sep 17 00:00:00 2001 From: Oleksii Kliukin <oleksii@timescale.com> Date: Wed, 22 May 2024 11:52:50 +0200 Subject: [PATCH 01/14] WIP: prevent location-only pod updates Prevent annotation updates when the only update is the pod xlog location. This reduces the load on the K8s API for the replicas, as we don't have to update the pod every loop_wait interval. --- patroni/dcs/kubernetes.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/patroni/dcs/kubernetes.py b/patroni/dcs/kubernetes.py index 9590bb98f..a6ac317f6 100644 --- a/patroni/dcs/kubernetes.py +++ b/patroni/dcs/kubernetes.py @@ -1321,11 +1321,19 @@ def touch_member(self, data: Dict[str, Any]) -> bool: member = cluster and cluster.get_member(self._name, fallback_to_leader=False) pod_labels = member and member.data.pop('pod_labels', None) + # XXX: add a parameter here + saved_xlog_location = data.get('xlog_location') + if saved_xlog_location is not None and True: + # avoid updating if the only change is the xlog location + if member.data.get('xlog_location') is not None: + data['xlog_location'] = member.data['xlog_location'] ret = member and pod_labels is not None\ and all(pod_labels.get(k) == v for k, v in role_labels.items())\ and deep_compare(data, member.data) if not ret: + # we decided to update anyway, set back the xlog location + data['xlog_location'] = saved_xlog_location metadata = {'namespace': self._namespace, 'name': self._name, 'labels': role_labels, 'annotations': {'status': json.dumps(data, separators=(',', ':'))}} body = k8s_client.V1Pod(metadata=k8s_client.V1ObjectMeta(**metadata)) From acfc64905c42b69f9f3639d3e6c5dd6dcf9cbb4d Mon Sep 17 00:00:00 2001 From: Oleksii Kliukin <oleksii@timescale.com> Date: Wed, 22 May 2024 12:25:50 +0200 Subject: [PATCH 02/14] Add a new parameter to control pod xlog updates Add prevent_xlog_position_only_pod_updates K8s parameter. --- patroni/dcs/kubernetes.py | 10 ++++++---- patroni/validator.py | 1 + 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/patroni/dcs/kubernetes.py b/patroni/dcs/kubernetes.py index a6ac317f6..4245bcf60 100644 --- a/patroni/dcs/kubernetes.py +++ b/patroni/dcs/kubernetes.py @@ -24,7 +24,7 @@ from ..exceptions import DCSError from ..postgresql.mpp import AbstractMPP from ..utils import deep_compare, iter_response_objects, keepalive_socket_options, \ - Retry, RetryFailedError, tzutc, uri, USER_AGENT + parse_bool, Retry, RetryFailedError, tzutc, uri, USER_AGENT if TYPE_CHECKING: # pragma: no cover from ..config import Config @@ -758,6 +758,8 @@ def __init__(self, config: Dict[str, Any], mpp: AbstractMPP) -> None: self._standby_leader_label_value = config.get('standby_leader_label_value', 'master') self._tmp_role_label = config.get('tmp_role_label') self._ca_certs = os.environ.get('PATRONI_KUBERNETES_CACERT', config.get('cacert')) or SERVICE_CERT_FILENAME + self._prevent_xlog_position_only_pod_updates = bool( + parse_bool(config.get('prevent_xlog_position_only_pod_updates'))) super(Kubernetes, self).__init__({**config, 'namespace': ''}, mpp) if self._mpp.is_enabled(): self._labels[self._mpp.k8s_group_label] = str(self._mpp.group) @@ -1323,9 +1325,9 @@ def touch_member(self, data: Dict[str, Any]) -> bool: pod_labels = member and member.data.pop('pod_labels', None) # XXX: add a parameter here saved_xlog_location = data.get('xlog_location') - if saved_xlog_location is not None and True: + if self._prevent_xlog_position_only_pod_updates and saved_xlog_location is not None: # avoid updating if the only change is the xlog location - if member.data.get('xlog_location') is not None: + if member and member.data.get('xlog_location') is not None: data['xlog_location'] = member.data['xlog_location'] ret = member and pod_labels is not None\ and all(pod_labels.get(k) == v for k, v in role_labels.items())\ @@ -1333,7 +1335,7 @@ def touch_member(self, data: Dict[str, Any]) -> bool: if not ret: # we decided to update anyway, set back the xlog location - data['xlog_location'] = saved_xlog_location + data['xlog_location'] = saved_xlog_location metadata = {'namespace': self._namespace, 'name': self._name, 'labels': role_labels, 'annotations': {'status': json.dumps(data, separators=(',', ':'))}} body = k8s_client.V1Pod(metadata=k8s_client.V1ObjectMeta(**metadata)) diff --git a/patroni/validator.py b/patroni/validator.py index 233b05b72..30a59b249 100644 --- a/patroni/validator.py +++ b/patroni/validator.py @@ -1124,6 +1124,7 @@ def validate_watchdog_mode(value: Any) -> None: Optional("ports"): [{"name": str, "port": IntValidator(max=65535, expected_type=int, raise_assert=True)}], Optional("cacert"): str, Optional("retriable_http_codes"): Or(int, [int]), + Optional('prevent_xlog_position_only_pod_updates'): bool }, }), Optional("citus"): { From 0a90399007aed4cf1594c02de7029d9cbd9899fc Mon Sep 17 00:00:00 2001 From: Oleksii Kliukin <oleksii@timescale.com> Date: Wed, 29 May 2024 10:20:09 +0200 Subject: [PATCH 03/14] Cache last updated location for a configurable number of seconds Instead of updating xlog location in K8s every loop_wait interval, use a newly introduced parameter "location_cache_ttl" to prevent location-only updates from happening oftener than the value of this parameter. It is set to 0 by default, so nothing will change until it is explicitly set. Also, the maximum value is set to 10min, to prevent xlog value to be too stale (although it is only used for monitoring endpoints). --- patroni/dcs/kubernetes.py | 37 ++++++++++++++++++++++++++----------- patroni/validator.py | 2 +- 2 files changed, 27 insertions(+), 12 deletions(-) diff --git a/patroni/dcs/kubernetes.py b/patroni/dcs/kubernetes.py index 4245bcf60..c8aa1db5b 100644 --- a/patroni/dcs/kubernetes.py +++ b/patroni/dcs/kubernetes.py @@ -24,7 +24,7 @@ from ..exceptions import DCSError from ..postgresql.mpp import AbstractMPP from ..utils import deep_compare, iter_response_objects, keepalive_socket_options, \ - parse_bool, Retry, RetryFailedError, tzutc, uri, USER_AGENT + parse_int, Retry, RetryFailedError, tzutc, uri, USER_AGENT if TYPE_CHECKING: # pragma: no cover from ..config import Config @@ -758,8 +758,9 @@ def __init__(self, config: Dict[str, Any], mpp: AbstractMPP) -> None: self._standby_leader_label_value = config.get('standby_leader_label_value', 'master') self._tmp_role_label = config.get('tmp_role_label') self._ca_certs = os.environ.get('PATRONI_KUBERNETES_CACERT', config.get('cacert')) or SERVICE_CERT_FILENAME - self._prevent_xlog_position_only_pod_updates = bool( - parse_bool(config.get('prevent_xlog_position_only_pod_updates'))) + self._xlog_location_cache_ttl = parse_int(config.get('xlog_location_cache_ttl', '0'), 's') or 0 + self._cached_xlog_location_modified_timestamp = None + self._cached_xlog_location = None super(Kubernetes, self).__init__({**config, 'namespace': ''}, mpp) if self._mpp.is_enabled(): self._labels[self._mpp.k8s_group_label] = str(self._mpp.group) @@ -1304,6 +1305,13 @@ def _config_resource_version(self) -> Optional[str]: def set_config_value(self, value: str, version: Optional[str] = None) -> bool: return self.patch_or_create_config({self._CONFIG: value}, version, bool(self._config_resource_version), False) + def _get_cached_xlog_location(self) -> [Optional[str], Optional[float]]: + return self._cached_xlog_location, self._cached_xlog_location_modified_timestamp + + def _set_cached_xlog_location(self, location: str) -> None: + self._cached_xlog_location = location + self._cached_xlog_location_modified_timestamp = time.time() + @catch_kubernetes_errors def touch_member(self, data: Dict[str, Any]) -> bool: cluster = self.cluster @@ -1323,19 +1331,26 @@ def touch_member(self, data: Dict[str, Any]) -> bool: member = cluster and cluster.get_member(self._name, fallback_to_leader=False) pod_labels = member and member.data.pop('pod_labels', None) - # XXX: add a parameter here - saved_xlog_location = data.get('xlog_location') - if self._prevent_xlog_position_only_pod_updates and saved_xlog_location is not None: - # avoid updating if the only change is the xlog location - if member and member.data.get('xlog_location') is not None: - data['xlog_location'] = member.data['xlog_location'] + + replaced_xlog_location = data['xlog_location'] + cached_xlog_location, last_updated = self._get_cached_xlog_location() + if last_updated is not None and last_updated + self._xlog_location_cache_ttl > time.time(): + if cached_xlog_location is not None: + data['xlog_location'] = cached_xlog_location + else: + # location cache expired + self._set_cached_xlog_location(data['xlog_location']) + replaced_xlog_location = None ret = member and pod_labels is not None\ and all(pod_labels.get(k) == v for k, v in role_labels.items())\ and deep_compare(data, member.data) if not ret: - # we decided to update anyway, set back the xlog location - data['xlog_location'] = saved_xlog_location + # if we move forward with an update anyway, make sure to write the actual + # value for the xlog, and not the stale cached value. + if replaced_xlog_location is not None: + self._set_cached_xlog_location(replaced_xlog_location) + data['xlog_location'] = replaced_xlog_location metadata = {'namespace': self._namespace, 'name': self._name, 'labels': role_labels, 'annotations': {'status': json.dumps(data, separators=(',', ':'))}} body = k8s_client.V1Pod(metadata=k8s_client.V1ObjectMeta(**metadata)) diff --git a/patroni/validator.py b/patroni/validator.py index 30a59b249..c65bdcad8 100644 --- a/patroni/validator.py +++ b/patroni/validator.py @@ -1124,7 +1124,7 @@ def validate_watchdog_mode(value: Any) -> None: Optional("ports"): [{"name": str, "port": IntValidator(max=65535, expected_type=int, raise_assert=True)}], Optional("cacert"): str, Optional("retriable_http_codes"): Or(int, [int]), - Optional('prevent_xlog_position_only_pod_updates'): bool + Optional('location_cache_ttl'): IntValidator(min=0, max=600, base_unit='s', raise_assert=True) }, }), Optional("citus"): { From bbfba7060df1917448ddfc36f82ef06f1e332b13 Mon Sep 17 00:00:00 2001 From: Oleksii Kliukin <oleksii@timescale.com> Date: Wed, 29 May 2024 11:03:36 +0200 Subject: [PATCH 04/14] Attempt to make a typechecker happy --- patroni/dcs/kubernetes.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/patroni/dcs/kubernetes.py b/patroni/dcs/kubernetes.py index c8aa1db5b..2bb3ed0d8 100644 --- a/patroni/dcs/kubernetes.py +++ b/patroni/dcs/kubernetes.py @@ -1332,14 +1332,14 @@ def touch_member(self, data: Dict[str, Any]) -> bool: member = cluster and cluster.get_member(self._name, fallback_to_leader=False) pod_labels = member and member.data.pop('pod_labels', None) - replaced_xlog_location = data['xlog_location'] + replaced_xlog_location: Optional[str] = data.get('xlog_location', None) cached_xlog_location, last_updated = self._get_cached_xlog_location() if last_updated is not None and last_updated + self._xlog_location_cache_ttl > time.time(): - if cached_xlog_location is not None: + if cached_xlog_location is not None and replaced_xlog_location is not None: data['xlog_location'] = cached_xlog_location - else: + elif replaced_xlog_location is not None: # location cache expired - self._set_cached_xlog_location(data['xlog_location']) + self._set_cached_xlog_location(replaced_xlog_location) replaced_xlog_location = None ret = member and pod_labels is not None\ and all(pod_labels.get(k) == v for k, v in role_labels.items())\ From 1d977c10c2db7deb5436d095928304027541fca6 Mon Sep 17 00:00:00 2001 From: Oleksii Kliukin <oleksii@timescale.com> Date: Wed, 29 May 2024 11:16:51 +0200 Subject: [PATCH 05/14] Correct the type annotation. --- patroni/dcs/kubernetes.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/patroni/dcs/kubernetes.py b/patroni/dcs/kubernetes.py index 2bb3ed0d8..3f62246ce 100644 --- a/patroni/dcs/kubernetes.py +++ b/patroni/dcs/kubernetes.py @@ -1305,7 +1305,7 @@ def _config_resource_version(self) -> Optional[str]: def set_config_value(self, value: str, version: Optional[str] = None) -> bool: return self.patch_or_create_config({self._CONFIG: value}, version, bool(self._config_resource_version), False) - def _get_cached_xlog_location(self) -> [Optional[str], Optional[float]]: + def _get_cached_xlog_location(self) -> Tuple[Optional[str], Optional[float]]: return self._cached_xlog_location, self._cached_xlog_location_modified_timestamp def _set_cached_xlog_location(self, location: str) -> None: From 460e8ccadb5b78de1fe8244cc3ec5f2418b5f227 Mon Sep 17 00:00:00 2001 From: Oleksii Kliukin <oleksii@timescale.com> Date: Sun, 2 Jun 2024 19:28:19 +0200 Subject: [PATCH 06/14] Deal with xlog location updates Kubernetes DCS is not consistent about config reloads: it sends the DCS-specific config during init, while global config is sent on reload. Make sure we read parameters from the kubernetes part of the configuration. Update xlog_cache_ttl during local configuration reload. --- patroni/dcs/kubernetes.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/patroni/dcs/kubernetes.py b/patroni/dcs/kubernetes.py index 3f62246ce..180b02164 100644 --- a/patroni/dcs/kubernetes.py +++ b/patroni/dcs/kubernetes.py @@ -758,7 +758,7 @@ def __init__(self, config: Dict[str, Any], mpp: AbstractMPP) -> None: self._standby_leader_label_value = config.get('standby_leader_label_value', 'master') self._tmp_role_label = config.get('tmp_role_label') self._ca_certs = os.environ.get('PATRONI_KUBERNETES_CACERT', config.get('cacert')) or SERVICE_CERT_FILENAME - self._xlog_location_cache_ttl = parse_int(config.get('xlog_location_cache_ttl', '0'), 's') or 0 + self._xlog_cache_ttl = 0 self._cached_xlog_location_modified_timestamp = None self._cached_xlog_location = None super(Kubernetes, self).__init__({**config, 'namespace': ''}, mpp) @@ -834,10 +834,14 @@ def reload_config(self, config: Union['Config', Dict[str, Any]]) -> None: super(Kubernetes, self).reload_config(config) if TYPE_CHECKING: # pragma: no cover assert self._retry.deadline is not None + + # we could be called with only Kubernetes part of the config (module init), or with the whole config + # during reload; make sure only kubernetes part of the config is fetched below. + kconfig = config.get('kubernetes') or config self._api.configure_timeouts(self.loop_wait, self._retry.deadline, self.ttl) # retriable_http_codes supposed to be either int, list of integers or comma-separated string with integers. - retriable_http_codes: Union[str, List[Union[str, int]]] = config.get('retriable_http_codes', []) + retriable_http_codes: Union[str, List[Union[str, int]]] = kconfig.get('retriable_http_codes', []) if not isinstance(retriable_http_codes, list): retriable_http_codes = [c.strip() for c in str(retriable_http_codes).split(',')] @@ -846,6 +850,9 @@ def reload_config(self, config: Union['Config', Dict[str, Any]]) -> None: except Exception as e: logger.warning('Invalid value of retriable_http_codes = %s: %r', config['retriable_http_codes'], e) + # cache xlog location for the member, preventing pod update when xlog location is the only update for the pod + self._xlog_cache_ttl = parse_int(kconfig.get('xlog_cache_ttl', '0'), 's') or 0 + @staticmethod def member(pod: K8sObject) -> Member: annotations = pod.metadata.annotations or EMPTY_DICT @@ -1334,7 +1341,7 @@ def touch_member(self, data: Dict[str, Any]) -> bool: replaced_xlog_location: Optional[str] = data.get('xlog_location', None) cached_xlog_location, last_updated = self._get_cached_xlog_location() - if last_updated is not None and last_updated + self._xlog_location_cache_ttl > time.time(): + if last_updated is not None and last_updated + self._xlog_cache_ttl > time.time(): if cached_xlog_location is not None and replaced_xlog_location is not None: data['xlog_location'] = cached_xlog_location elif replaced_xlog_location is not None: From 2b3f5d1866de39675aec285581bf755cfb7dd924 Mon Sep 17 00:00:00 2001 From: Oleksii Kliukin <oleksii@timescale.com> Date: Tue, 4 Jun 2024 10:28:55 +0200 Subject: [PATCH 07/14] Fix the validator and rise the upper limit --- patroni/validator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/patroni/validator.py b/patroni/validator.py index c65bdcad8..a3bf53eb9 100644 --- a/patroni/validator.py +++ b/patroni/validator.py @@ -1124,7 +1124,7 @@ def validate_watchdog_mode(value: Any) -> None: Optional("ports"): [{"name": str, "port": IntValidator(max=65535, expected_type=int, raise_assert=True)}], Optional("cacert"): str, Optional("retriable_http_codes"): Or(int, [int]), - Optional('location_cache_ttl'): IntValidator(min=0, max=600, base_unit='s', raise_assert=True) + Optional('xlog_cache_ttl'): IntValidator(min=0, max=1200, base_unit='s', raise_assert=True) }, }), Optional("citus"): { From ea5eb2c2e0aafcfcd950303c8423aae6b1ae5b63 Mon Sep 17 00:00:00 2001 From: Oleksii Kliukin <oleksii@timescale.com> Date: Tue, 4 Jun 2024 15:26:55 +0200 Subject: [PATCH 08/14] Change the version Reflect we are running a fork --- patroni/ctl.py | 4 ++-- patroni/daemon.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/patroni/ctl.py b/patroni/ctl.py index 4e05ab8d5..e797267c9 100644 --- a/patroni/ctl.py +++ b/patroni/ctl.py @@ -2176,7 +2176,7 @@ def version(cluster_name: str, group: Optional[int], member_names: List[str]) -> :param group: filter which Citus group we should get members from. Refer to the module note for more details. :param member_names: filter which members we should get version information from. """ - click.echo("patronictl version {0}".format(__version__)) + click.echo("patronictl (timescale fork) version {0}".format(__version__)) if not cluster_name: return @@ -2192,7 +2192,7 @@ def version(cluster_name: str, group: Optional[int], member_names: List[str]) -> version = data.get('patroni', {}).get('version') pg_version = data.get('server_version') pg_version_str = " PostgreSQL {0}".format(format_pg_version(pg_version)) if pg_version else "" - click.echo("{0}: Patroni {1}{2}".format(m.name, version, pg_version_str)) + click.echo("{0}: Patroni (timescale fork) {1}{2}".format(m.name, version, pg_version_str)) except Exception as e: click.echo("{0}: failed to get version: {1}".format(m.name, e)) diff --git a/patroni/daemon.py b/patroni/daemon.py index e160ce46f..a5b885dbe 100644 --- a/patroni/daemon.py +++ b/patroni/daemon.py @@ -27,7 +27,7 @@ def get_base_arg_parser() -> argparse.ArgumentParser: from .version import __version__ parser = argparse.ArgumentParser() - parser.add_argument('--version', action='version', version='%(prog)s {0}'.format(__version__)) + parser.add_argument('--version', action='version', version='%(prog)s (timescale fork) {0}'.format(__version__)) parser.add_argument('configfile', nargs='?', default='', help='Patroni may also read the configuration from the {0} environment variable' .format(Config.PATRONI_CONFIG_VARIABLE)) From ffaa34f4ac3a614dcde2b7116b90c3c173319e98 Mon Sep 17 00:00:00 2001 From: Oleksii Kliukin <oleksii@timescale.com> Date: Wed, 5 Jun 2024 19:15:14 +0200 Subject: [PATCH 09/14] Documentation-only update --- docs/ENVIRONMENT.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/ENVIRONMENT.rst b/docs/ENVIRONMENT.rst index 4b45798fc..71b9a177d 100644 --- a/docs/ENVIRONMENT.rst +++ b/docs/ENVIRONMENT.rst @@ -122,6 +122,7 @@ Kubernetes - **PATRONI\_KUBERNETES\_PORTS**: (optional) if the Service object has the name for the port, the same name must appear in the Endpoint object, otherwise service won't work. For example, if your service is defined as ``{Kind: Service, spec: {ports: [{name: postgresql, port: 5432, targetPort: 5432}]}}``, then you have to set ``PATRONI_KUBERNETES_PORTS='[{"name": "postgresql", "port": 5432}]'`` and Patroni will use it for updating subsets of the leader Endpoint. This parameter is used only if `PATRONI_KUBERNETES_USE_ENDPOINTS` is set. - **PATRONI\_KUBERNETES\_CACERT**: (optional) Specifies the file with the CA_BUNDLE file with certificates of trusted CAs to use while verifying Kubernetes API SSL certs. If not provided, patroni will use the value provided by the ServiceAccount secret. - **PATRONI\_RETRIABLE\_HTTP\_CODES**: (optional) list of HTTP status codes from K8s API to retry on. By default Patroni is retrying on ``500``, ``503``, and ``504``, or if K8s API response has ``retry-after`` HTTP header. +- **PATRONI\_KUBERNETES\_XLOG\_CACHE\_TTL**: (optional) duration in seconds to retain the previous value of the member xlog position when updating the member pod metadata. Higher values reduce the frequency of pod metadata updates from Patroni, at the expense of having an outdated xlog position. The default value is ``0``, indicating that the system should always use the up-to-date position. Setting it to the value that is a multiple of ``loop_wait`` reduces the number of API server requests in the Kubernetes cluster. Raft (deprecated) ----------------- From 5d124a3650c2b8a450342c224ab47ac46cdb28ef Mon Sep 17 00:00:00 2001 From: Oleksii Kliukin <oleksii@timescale.com> Date: Wed, 5 Jun 2024 23:11:54 +0200 Subject: [PATCH 10/14] Docs update, debug logs Also add XLOG_CACHE_TTL env variable. --- fork.rst | 12 ++++++++++++ patroni/config.py | 4 +++- patroni/dcs/kubernetes.py | 9 ++++++++- patroni/validator.py | 2 +- tests/test_ctl.py | 2 +- 5 files changed, 25 insertions(+), 4 deletions(-) create mode 100644 fork.rst diff --git a/fork.rst b/fork.rst new file mode 100644 index 000000000..4e3a85308 --- /dev/null +++ b/fork.rst @@ -0,0 +1,12 @@ +.. _fork: + +Timescale patroni fork +====================== + + +TS_1 +----- + +- Add ``xlog_cache_ttl`` parameter for the Kubernetes DCS + + Setting it to multiple of ``loop_wait`` prevents frequent pod updates, reducing the load on the API server, at the expense of stale value of the xlog position in the member's metadata. diff --git a/patroni/config.py b/patroni/config.py index 662047f87..25d57d026 100644 --- a/patroni/config.py +++ b/patroni/config.py @@ -656,7 +656,7 @@ def _get_auth(name: str, params: Collection[str] = _AUTH_ALLOWED_PARAMETERS[:2]) 'SERVICE_TAGS', 'NAMESPACE', 'CONTEXT', 'USE_ENDPOINTS', 'SCOPE_LABEL', 'ROLE_LABEL', 'POD_IP', 'PORTS', 'LABELS', 'BYPASS_API_SERVICE', 'RETRIABLE_HTTP_CODES', 'KEY_PASSWORD', 'USE_SSL', 'SET_ACLS', 'GROUP', 'DATABASE', 'LEADER_LABEL_VALUE', 'FOLLOWER_LABEL_VALUE', - 'STANDBY_LEADER_LABEL_VALUE', 'TMP_ROLE_LABEL', 'AUTH_DATA') and name: + 'STANDBY_LEADER_LABEL_VALUE', 'TMP_ROLE_LABEL', 'AUTH_DATA', 'XLOG_CACHE_TTL') and name: value = os.environ.pop(param) if name == 'CITUS': if suffix == 'GROUP': @@ -665,6 +665,8 @@ def _get_auth(name: str, params: Collection[str] = _AUTH_ALLOWED_PARAMETERS[:2]) continue elif suffix == 'PORT': value = value and parse_int(value) + elif suffix == 'XLOG_CACHE_TTL': + value = value and parse_int(value, 's') elif suffix in ('HOSTS', 'PORTS', 'CHECKS', 'SERVICE_TAGS', 'RETRIABLE_HTTP_CODES'): value = value and _parse_list(value) elif suffix in ('LABELS', 'SET_ACLS', 'AUTH_DATA'): diff --git a/patroni/dcs/kubernetes.py b/patroni/dcs/kubernetes.py index 180b02164..22f4bc85e 100644 --- a/patroni/dcs/kubernetes.py +++ b/patroni/dcs/kubernetes.py @@ -852,6 +852,8 @@ def reload_config(self, config: Union['Config', Dict[str, Any]]) -> None: # cache xlog location for the member, preventing pod update when xlog location is the only update for the pod self._xlog_cache_ttl = parse_int(kconfig.get('xlog_cache_ttl', '0'), 's') or 0 + if self._xlog_cache_ttl is not None: + logger.debug("set xlog_cache_ttl to %d", self._xlog_cache_ttl) @staticmethod def member(pod: K8sObject) -> Member: @@ -1341,7 +1343,8 @@ def touch_member(self, data: Dict[str, Any]) -> bool: replaced_xlog_location: Optional[str] = data.get('xlog_location', None) cached_xlog_location, last_updated = self._get_cached_xlog_location() - if last_updated is not None and last_updated + self._xlog_cache_ttl > time.time(): + now = time.time() + if last_updated is not None and last_updated + self._xlog_cache_ttl > now: if cached_xlog_location is not None and replaced_xlog_location is not None: data['xlog_location'] = cached_xlog_location elif replaced_xlog_location is not None: @@ -1364,6 +1367,10 @@ def touch_member(self, data: Dict[str, Any]) -> bool: ret = self._api.patch_namespaced_pod(self._name, self._namespace, body) if ret: self._pods.set(self._name, ret) + elif cached_xlog_location != replaced_xlog_location and last_updated is not None: + logger.debug("prevented pod update, keeping cached xlog value for up to %d seconds", + (last_updated + self._xlog_cache_ttl - now)) + if self._should_create_config_service: self._create_config_service() return bool(ret) diff --git a/patroni/validator.py b/patroni/validator.py index a3bf53eb9..58e000d2f 100644 --- a/patroni/validator.py +++ b/patroni/validator.py @@ -1124,7 +1124,7 @@ def validate_watchdog_mode(value: Any) -> None: Optional("ports"): [{"name": str, "port": IntValidator(max=65535, expected_type=int, raise_assert=True)}], Optional("cacert"): str, Optional("retriable_http_codes"): Or(int, [int]), - Optional('xlog_cache_ttl'): IntValidator(min=0, max=1200, base_unit='s', raise_assert=True) + Optional('xlog_cache_ttl'): IntValidator(min=0, max=3600, base_unit='s', raise_assert=True) }, }), Optional("citus"): { diff --git a/tests/test_ctl.py b/tests/test_ctl.py index 42d78214f..7613b1292 100644 --- a/tests/test_ctl.py +++ b/tests/test_ctl.py @@ -705,7 +705,7 @@ def test_edit_config(self): @patch('patroni.ctl.request_patroni') def test_version(self, mock_request): result = self.runner.invoke(ctl, ['version']) - assert 'patronictl version' in result.output + assert 'patronictl (timescale fork) version' in result.output mock_request.return_value.data = b'{"patroni":{"version":"1.2.3"},"server_version": 100001}' result = self.runner.invoke(ctl, ['version', 'dummy']) assert '1.2.3' in result.output From a9267345da503dfe6e41579ddbd704fc454487cb Mon Sep 17 00:00:00 2001 From: Oleksii Kliukin <oleksii@timescale.com> Date: Thu, 6 Jun 2024 10:30:01 +0200 Subject: [PATCH 11/14] Make pyright happy --- patroni/dcs/kubernetes.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/patroni/dcs/kubernetes.py b/patroni/dcs/kubernetes.py index 22f4bc85e..408543e71 100644 --- a/patroni/dcs/kubernetes.py +++ b/patroni/dcs/kubernetes.py @@ -852,7 +852,7 @@ def reload_config(self, config: Union['Config', Dict[str, Any]]) -> None: # cache xlog location for the member, preventing pod update when xlog location is the only update for the pod self._xlog_cache_ttl = parse_int(kconfig.get('xlog_cache_ttl', '0'), 's') or 0 - if self._xlog_cache_ttl is not None: + if self._xlog_cache_ttl > 0: logger.debug("set xlog_cache_ttl to %d", self._xlog_cache_ttl) @staticmethod From 1851cfce0626f15f3136aa514ac32294370d170b Mon Sep 17 00:00:00 2001 From: Oleksii Kliukin <oleksii@timescale.com> Date: Thu, 6 Jun 2024 13:56:40 +0200 Subject: [PATCH 12/14] Refine debug logging We used to rely on cached_xlog_location and replaced_xlog_location when showing a debug message about pod update prevention, but that produced false positives, i.e. ``` 2024-06-06 11:34:36,308 DEBUG: prevented pod update, keeping cached xlog value for up to -1 seconds ``` Make sure we only show this message when we actually used cached xlog, no updates happen, but they should have happened, since cached value and received member value do not match. --- patroni/dcs/kubernetes.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/patroni/dcs/kubernetes.py b/patroni/dcs/kubernetes.py index 408543e71..b3547085d 100644 --- a/patroni/dcs/kubernetes.py +++ b/patroni/dcs/kubernetes.py @@ -1344,13 +1344,14 @@ def touch_member(self, data: Dict[str, Any]) -> bool: replaced_xlog_location: Optional[str] = data.get('xlog_location', None) cached_xlog_location, last_updated = self._get_cached_xlog_location() now = time.time() + use_cached_xlog = False if last_updated is not None and last_updated + self._xlog_cache_ttl > now: if cached_xlog_location is not None and replaced_xlog_location is not None: data['xlog_location'] = cached_xlog_location + use_cached_xlog = True elif replaced_xlog_location is not None: # location cache expired self._set_cached_xlog_location(replaced_xlog_location) - replaced_xlog_location = None ret = member and pod_labels is not None\ and all(pod_labels.get(k) == v for k, v in role_labels.items())\ and deep_compare(data, member.data) @@ -1358,7 +1359,7 @@ def touch_member(self, data: Dict[str, Any]) -> bool: if not ret: # if we move forward with an update anyway, make sure to write the actual # value for the xlog, and not the stale cached value. - if replaced_xlog_location is not None: + if use_cached_xlog: self._set_cached_xlog_location(replaced_xlog_location) data['xlog_location'] = replaced_xlog_location metadata = {'namespace': self._namespace, 'name': self._name, 'labels': role_labels, @@ -1367,7 +1368,7 @@ def touch_member(self, data: Dict[str, Any]) -> bool: ret = self._api.patch_namespaced_pod(self._name, self._namespace, body) if ret: self._pods.set(self._name, ret) - elif cached_xlog_location != replaced_xlog_location and last_updated is not None: + elif use_cached_xlog and cached_xlog_location != replaced_xlog_location: logger.debug("prevented pod update, keeping cached xlog value for up to %d seconds", (last_updated + self._xlog_cache_ttl - now)) From 09d5ffcbc6f6c866666c7d5f7a5a86b262557c5c Mon Sep 17 00:00:00 2001 From: Oleksii Kliukin <oleksii@timescale.com> Date: Thu, 6 Jun 2024 14:04:20 +0200 Subject: [PATCH 13/14] Make pyright happy (again) --- patroni/dcs/kubernetes.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/patroni/dcs/kubernetes.py b/patroni/dcs/kubernetes.py index b3547085d..d0439bc0e 100644 --- a/patroni/dcs/kubernetes.py +++ b/patroni/dcs/kubernetes.py @@ -1359,7 +1359,7 @@ def touch_member(self, data: Dict[str, Any]) -> bool: if not ret: # if we move forward with an update anyway, make sure to write the actual # value for the xlog, and not the stale cached value. - if use_cached_xlog: + if use_cached_xlog and replaced_xlog_location is not None: self._set_cached_xlog_location(replaced_xlog_location) data['xlog_location'] = replaced_xlog_location metadata = {'namespace': self._namespace, 'name': self._name, 'labels': role_labels, @@ -1368,7 +1368,7 @@ def touch_member(self, data: Dict[str, Any]) -> bool: ret = self._api.patch_namespaced_pod(self._name, self._namespace, body) if ret: self._pods.set(self._name, ret) - elif use_cached_xlog and cached_xlog_location != replaced_xlog_location: + elif use_cached_xlog and cached_xlog_location != replaced_xlog_location and last_updated is not None: logger.debug("prevented pod update, keeping cached xlog value for up to %d seconds", (last_updated + self._xlog_cache_ttl - now)) From 68d4f7e2a1a516868dbcf60246035e9af27c0a10 Mon Sep 17 00:00:00 2001 From: Oleksii Kliukin <oleksii@timescale.com> Date: Wed, 12 Jun 2024 17:14:12 +0200 Subject: [PATCH 14/14] Use integer values for timestamps Prevents bogus "keeping cahcned xlog value for up to 0 seconds" messages --- patroni/dcs/kubernetes.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/patroni/dcs/kubernetes.py b/patroni/dcs/kubernetes.py index d0439bc0e..a4018d540 100644 --- a/patroni/dcs/kubernetes.py +++ b/patroni/dcs/kubernetes.py @@ -1314,12 +1314,12 @@ def _config_resource_version(self) -> Optional[str]: def set_config_value(self, value: str, version: Optional[str] = None) -> bool: return self.patch_or_create_config({self._CONFIG: value}, version, bool(self._config_resource_version), False) - def _get_cached_xlog_location(self) -> Tuple[Optional[str], Optional[float]]: + def _get_cached_xlog_location(self) -> Tuple[Optional[str], Optional[int]]: return self._cached_xlog_location, self._cached_xlog_location_modified_timestamp def _set_cached_xlog_location(self, location: str) -> None: self._cached_xlog_location = location - self._cached_xlog_location_modified_timestamp = time.time() + self._cached_xlog_location_modified_timestamp = int(time.time()) @catch_kubernetes_errors def touch_member(self, data: Dict[str, Any]) -> bool: @@ -1343,7 +1343,7 @@ def touch_member(self, data: Dict[str, Any]) -> bool: replaced_xlog_location: Optional[str] = data.get('xlog_location', None) cached_xlog_location, last_updated = self._get_cached_xlog_location() - now = time.time() + now = int(time.time()) use_cached_xlog = False if last_updated is not None and last_updated + self._xlog_cache_ttl > now: if cached_xlog_location is not None and replaced_xlog_location is not None: