Bug#1109865: unblock: patroni/4.0.6-1
Package: release.debian.org
Severity: normal
User: release.debian.org@packages.debian.org
Usertags: unblock
Please unblock package patroni
[ Reason ]
This is a stable bugfix-only release that should be in trixie. It has baked in
unstable and on apt.postgresql.org long enough.
Unblocking it would let trixie release with the current 4.0.x point release of
Patroni.
[ Tests ]
Patroni has extensive autopkgtests, which ran successfully on all release
architectures as far as I can tell.
In addition, most (if not all) upstream changes come with their own tests.
[ Risks ]
Minimal risk of regressions. There have been no follow-up commits on the stable
branch (https://github.com/patroni/patroni/commits/REL_4_0/) and therefore no
further releases, so nothing appears to be broken by 4.0.6.
I have reviewed all changes and approve them.
unblock patroni/4.0.6-1
-- System Information:
Debian Release: 10.13
APT prefers oldoldstable-updates
APT policy: (500, 'oldoldstable-updates'), (500, 'oldoldstable')
Architecture: amd64 (x86_64)
Foreign Architectures: i386
Kernel: Linux 4.19.0-22-amd64 (SMP w/2 CPU cores)
Kernel taint flags: TAINT_UNSIGNED_MODULE
Locale: LANG=en_US.UTF-8, LC_CTYPE=en_US.UTF-8 (charmap=UTF-8), LANGUAGE=en_US:en (charmap=UTF-8)
Shell: /bin/sh linked to /bin/dash
Init: systemd (via /run/systemd/system)
LSM: AppArmor: enabled
diff -Nru patroni-4.0.5/debian/changelog patroni-4.0.6/debian/changelog
--- patroni-4.0.5/debian/changelog 2025-03-18 11:01:27.000000000 +0100
+++ patroni-4.0.6/debian/changelog 2025-06-08 08:52:19.000000000 +0200
@@ -1,3 +1,10 @@
+patroni (4.0.6-1) unstable; urgency=medium
+
+ * New upstream release.
+ * debian/patches/fix_conffile_perms.patch: Removed, included upstream.
+
+ -- Michael Banck <mbanck@debian.org> Sun, 08 Jun 2025 08:52:19 +0200
+
patroni (4.0.5-1) unstable; urgency=medium
* New upstream release.
diff -Nru patroni-4.0.5/debian/patches/fix_conffile_perms.patch patroni-4.0.6/debian/patches/fix_conffile_perms.patch
--- patroni-4.0.5/debian/patches/fix_conffile_perms.patch 2025-03-15 12:01:02.000000000 +0100
+++ patroni-4.0.6/debian/patches/fix_conffile_perms.patch 1970-01-01 01:00:00.000000000 +0100
@@ -1,44 +0,0 @@
-From a3c772dfc9642407a35ee4b7559c03e938671349 Mon Sep 17 00:00:00 2001
-From: Michael Banck <michael.banck@credativ.de>
-Date: Fri, 14 Mar 2025 12:21:25 +0100
-Subject: [PATCH] Fix permissions of out-of-PGDATA created postgresql.conf.
- (#3308)
-
-Since 01d07f86c, the permissions of postgresql.conf created in PGDATA was
-explicitly set. However, the umask of the Patroni process was adjusted as well
-and as a result of this, Patroni would write postgresql.conf with 600
-permissions if the configuration files are outside PGDATA.
-
-Fix this by using the original umask as mode for files created outside PGDATA.
-
-Fixes: #3302
----
- patroni/postgresql/config.py | 7 +++++--
- 1 file changed, 5 insertions(+), 2 deletions(-)
-
-Index: patroni/patroni/postgresql/config.py
-===================================================================
---- patroni.orig/patroni/postgresql/config.py
-+++ patroni/patroni/postgresql/config.py
-@@ -477,16 +477,19 @@ class ConfigHandler(object):
- return configuration
-
- def set_file_permissions(self, filename: str) -> None:
-- """Set permissions of file *filename* according to the expected permissions if it resides under PGDATA.
-+ """Set permissions of file *filename* according to the expected permissions.
-
- .. note::
-- Do nothing if the file is not under PGDATA.
-+ Use original umask if the file is not under PGDATA, use PGDATA
-+ permissions otherwise.
-
- :param filename: path to a file which permissions might need to be adjusted.
- """
- if is_subpath(self._postgresql.data_dir, filename):
- pg_perm.set_permissions_from_data_directory(self._postgresql.data_dir)
- os.chmod(filename, pg_perm.file_create_mode)
-+ else:
-+ os.chmod(filename, 0o666 & ~pg_perm.orig_umask)
-
- @contextmanager
- def config_writer(self, filename: str) -> Iterator[ConfigWriter]:
diff -Nru patroni-4.0.5/debian/patches/series patroni-4.0.6/debian/patches/series
--- patroni-4.0.5/debian/patches/series 2025-03-17 23:26:15.000000000 +0100
+++ patroni-4.0.6/debian/patches/series 2025-06-07 16:15:00.000000000 +0200
@@ -6,4 +6,3 @@
disable_sphinx_github_style.patch
replslot-cluster-type-attribute.patch
reproducible_docs.patch
-fix_conffile_perms.patch
diff -Nru patroni-4.0.5/docs/patroni_configuration.rst patroni-4.0.6/docs/patroni_configuration.rst
--- patroni-4.0.5/docs/patroni_configuration.rst 2025-02-20 16:40:20.000000000 +0100
+++ patroni-4.0.6/docs/patroni_configuration.rst 2025-06-06 19:27:48.000000000 +0200
@@ -38,21 +38,22 @@
PostgreSQL parameters controlled by Patroni
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-Some of the PostgreSQL parameters **must hold the same values on the primary and the replicas**. For those, **values set either in the local patroni configuration files or via the environment variables take no effect**. To alter or set their values one must change the shared configuration in the DCS. Below is the actual list of such parameters together with the default values:
+Some of the PostgreSQL parameters **must hold the same values on the primary and the replicas**. For those, **values set either in the local patroni configuration files or via the environment variables take no effect**. To alter or set their values one must change the shared configuration in the DCS. Below is the actual list of such parameters together with the default and minimal values:
-- **max_connections**: 100
-- **max_locks_per_transaction**: 64
-- **max_worker_processes**: 8
-- **max_prepared_transactions**: 0
-- **wal_level**: hot_standby
-- **track_commit_timestamp**: off
+- **max_connections**: default value 100, minimal value 25
+- **max_locks_per_transaction**: default value 64, minimal value 32
+- **max_worker_processes**: default value 8, minimal value 2
+- **max_prepared_transactions**: default value 0, minimal value 0
+- **wal_level**: default value hot_standby, accepted values: hot_standby, replica, logical
+- **track_commit_timestamp**: default value off
For the parameters below, PostgreSQL does not require equal values among the primary and all the replicas. However, considering the possibility of a replica to become the primary at any time, it doesn't really make sense to set them differently; therefore, **Patroni restricts setting their values to the** :ref:`dynamic configuration <dynamic_configuration>`.
-- **max_wal_senders**: 10
-- **max_replication_slots**: 10
-- **wal_keep_segments**: 8
-- **wal_keep_size**: 128MB
+- **max_wal_senders**: default value 10, minimal value 3
+- **max_replication_slots**: default value 10, minimal value 4
+- **wal_keep_segments**: default value 8, minimal value 1
+- **wal_keep_size**: default value 128MB, minimal value 16MB
+- **wal_log_hints**: on
These parameters are validated to ensure they are sane, or meet a minimum value.
@@ -63,7 +64,7 @@
- **cluster_name** - is set either from ``scope`` or from ``PATRONI_SCOPE`` environment variable
- **hot_standby: on**
-To be on the safe side parameters from the above lists are not written into ``postgresql.conf``, but passed as a list of arguments to the ``pg_ctl start`` which gives them the highest precedence, even above `ALTER SYSTEM <https://www.postgresql.org/docs/current/static/sql-altersystem.html>`__
+To be on the safe side parameters from the above lists are written into ``postgresql.conf``, and passed as a list of arguments to the ``postgres`` which gives them the highest precedence (except ``wal_keep_segments`` and ``wal_keep_size``), even above `ALTER SYSTEM <https://www.postgresql.org/docs/current/static/sql-altersystem.html>`__
There also are some parameters like **postgresql.listen**, **postgresql.data_dir** that **can be set only locally**, i.e. in the Patroni :ref:`config file <yaml_configuration>` or via :ref:`configuration <environment>` variable. In most cases the local configuration will override the dynamic configuration.
diff -Nru patroni-4.0.5/docs/releases.rst patroni-4.0.6/docs/releases.rst
--- patroni-4.0.5/docs/releases.rst 2025-02-20 16:40:20.000000000 +0100
+++ patroni-4.0.6/docs/releases.rst 2025-06-06 19:27:48.000000000 +0200
@@ -3,6 +3,66 @@
Release notes
=============
+Version 4.0.6
+-------------
+
+Released 2025-06-06
+
+**Bugfixes**
+
+- Fix bug in failover from a leader with a higher priority (Alexander Kukushkin)
+
+ Make sure Patroni ignores the former leader with higher priority when it reports the same ``LSN`` as the current node.
+
+- Fix permissions for the ``postgresql.conf`` file created outside of ``PGDATA`` (Michael Banck)
+
+ Respect the system-wide umask value when creating the ``postgresql.conf`` file outside of the ``PGDATA`` directory.
+
+- Fix bug with switchover in ``synchronous_mode=quorum`` (Alexander Kukushkin)
+
+ Do not check quorum requirements when a candidate is specified.
+
+- Ignore stale Etcd nodes by comparing cluster term (Alexander Kukushkin)
+
+ Memorize the last known "raft_term" of the Etcd cluster, and when executing client requests, compare it with the "raft_term" reported by an Etcd node.
+
+- Update PostgreSQL configuration files on ``SIGHUP`` (Alexander Kukushkin)
+
+ Previously, Patroni was only replacing PostgreSQL configuration files if a change in global or local configuration was detected.
+
+- Properly handle ``Unavailable`` exception raised by ``etcd3`` (Alexander Kukushkin)
+
+ Patroni used to retry such requests on the same ``etcd3`` node, while switching to another node is a better strategy.
+
+- Improve ``etcd3`` lease handling (Alexander Kukushkin)
+
+ Make sure Patroni refreshes the ``etcd3`` lease at least once per HA loop.
+
+- Recheck annotations on 409 status code when attempting to acquire leader lock (Alexander Kukushkin)
+
+ Implement the same behavior as was done for the leader object read in Patroni version 4.0.3.
+
+- Consider ``replay_lsn`` when advancing slots (Polina Bungina)
+
+ Do not try to advance slots on replicas past the ``replay_lsn``. Additionally, advance the slot to the ``replay_lsn`` position if it is already past the ``confirmed_flush_lsn`` of this slot on the replica but the replica has still not replayed the actual ``LSN`` at which this slot is on the primary.
+
+- Make sure ``CHECKPOINT`` is executed after promote (Alexander Kukushkin)
+
+ It was possible that checkpoint task wasn't reset on demote because ``CHECKPOINT`` wasn't yet finished. This resulted in using a stale ``result`` when the next promote is triggered.
+
+- Avoid running "offline" demotion concurrently (Alexander Kukushkin)
+
+ In case of a slow shutdown, it might happen that the next heartbeat loop hits the DCS error handling method again, resulting in ``AsyncExecutor is busy, demoting from the main thread`` warning and starting offline demotion again.
+
+- Normalize the ``data_dir`` value before renaming the data directory on initialization failure (Waynerv)
+
+ Prevent a trailing slash in the ``data_dir`` parameter value from breaking the renaming process after an initialization failure.
+
+- Check that ``synchronous_standby_names`` contains the expected value (Alexander Kukushkin)
+
+ Previously, the mechanism implementing the state machine for non-quorum synchronous replication didn't check the actual value of ``synchronous_standby_names``, what resulted in a stale value of ``synchronous_standby_names`` being used when ``pg_stat_replication`` is a subset of ``synchronous_standby_names``.
+
+
Version 4.0.5
-------------
diff -Nru patroni-4.0.5/docs/replica_bootstrap.rst patroni-4.0.6/docs/replica_bootstrap.rst
--- patroni-4.0.5/docs/replica_bootstrap.rst 2025-02-20 16:40:20.000000000 +0100
+++ patroni-4.0.6/docs/replica_bootstrap.rst 2025-06-06 19:27:48.000000000 +0200
@@ -52,7 +52,7 @@
If a ``recovery_conf`` block is defined in the same section as the custom bootstrap method, Patroni will generate a
``recovery.conf`` before starting the newly bootstrapped instance (or set the recovery settings on Postgres configuration if
running PostgreSQL >= 12).
-Typically, such recovery configuration should contain at least one of the ``recovery_target_*`` parameters, together with the ``recovery_target_timeline`` set to ``promote``.
+Typically, such recovery configuration should contain at least one of the ``recovery_target_*`` parameters, together with the ``recovery_target_action`` set to ``promote``.
If ``keep_existing_recovery_conf`` is defined and set to ``True``, Patroni will not remove the existing ``recovery.conf`` file if it exists (PostgreSQL <= 11).
Similarly, in that case Patroni will not remove the existing ``recovery.signal`` or ``standby.signal`` if either exists, nor will it override the configured recovery settings (PostgreSQL >= 12).
diff -Nru patroni-4.0.5/.github/workflows/install_deps.py patroni-4.0.6/.github/workflows/install_deps.py
--- patroni-4.0.5/.github/workflows/install_deps.py 2025-02-20 16:40:20.000000000 +0100
+++ patroni-4.0.6/.github/workflows/install_deps.py 2025-06-06 19:27:48.000000000 +0200
@@ -27,12 +27,17 @@
requirements += ['psycopg[binary]'] if sys.version_info >= (3, 8, 0) and\
(sys.platform != 'darwin' or what == 'etcd3') else ['psycopg2-binary==2.9.9'
if sys.platform == 'darwin' else 'psycopg2-binary']
+
+ from pip._vendor.distlib.markers import evaluator, DEFAULT_CONTEXT
+ from pip._vendor.distlib.util import parse_requirement
+
for r in read('requirements.txt').split('\n'):
- r = r.strip()
- if r != '':
- extras = {e for e, v in EXTRAS_REQUIRE.items() if v and any(r.startswith(x) for x in v)}
- if not extras or what == 'all' or what in extras:
- requirements.append(r)
+ r = parse_requirement(r)
+ if not r or r.marker and not evaluator.evaluate(r.marker, DEFAULT_CONTEXT):
+ continue
+ extras = {e for e, v in EXTRAS_REQUIRE.items() if v and any(r.requirement.startswith(x) for x in v)}
+ if not extras or what == 'all' or what in extras:
+ requirements.append(r.requirement)
return subprocess.call([sys.executable, '-m', 'pip', 'install'] + requirements)
diff -Nru patroni-4.0.5/.github/workflows/release.yaml patroni-4.0.6/.github/workflows/release.yaml
--- patroni-4.0.5/.github/workflows/release.yaml 2025-02-20 16:40:20.000000000 +0100
+++ patroni-4.0.6/.github/workflows/release.yaml 2025-06-06 19:27:48.000000000 +0200
@@ -34,10 +34,10 @@
- name: Publish distribution to Test PyPI
if: github.event_name == 'push'
- uses: pypa/gh-action-pypi-publish@v1.9.0
+ uses: pypa/gh-action-pypi-publish@v1.12.4
with:
repository_url: https://test.pypi.org/legacy/
- name: Publish distribution to PyPI
if: github.event_name == 'release'
- uses: pypa/gh-action-pypi-publish@v1.9.0
+ uses: pypa/gh-action-pypi-publish@v1.12.4
diff -Nru patroni-4.0.5/.github/workflows/tests.yaml patroni-4.0.6/.github/workflows/tests.yaml
--- patroni-4.0.5/.github/workflows/tests.yaml 2025-02-20 16:40:20.000000000 +0100
+++ patroni-4.0.6/.github/workflows/tests.yaml 2025-06-06 19:27:48.000000000 +0200
@@ -198,7 +198,7 @@
- uses: jakebailey/pyright-action@v2
with:
- version: 1.1.394
+ version: 1.1.401
ydiff:
name: Test compatibility with the latest version of ydiff
diff -Nru patroni-4.0.5/patroni/dcs/etcd3.py patroni-4.0.6/patroni/dcs/etcd3.py
--- patroni-4.0.5/patroni/dcs/etcd3.py 2025-02-20 16:40:20.000000000 +0100
+++ patroni-4.0.6/patroni/dcs/etcd3.py 2025-06-06 19:27:48.000000000 +0200
@@ -18,12 +18,14 @@
from urllib3.exceptions import ProtocolError, ReadTimeoutError
+from ..collections import EMPTY_DICT
from ..exceptions import DCSError, PatroniException
from ..postgresql.mpp import AbstractMPP
from ..utils import deep_compare, enable_keepalive, iter_response_objects, RetryFailedError, USER_AGENT
from . import catch_return_false_exception, Cluster, ClusterConfig, \
Failover, Leader, Member, Status, SyncState, TimelineHistory
-from .etcd import AbstractEtcd, AbstractEtcdClientWithFailover, catch_etcd_errors, DnsCachingResolver, Retry
+from .etcd import AbstractEtcd, AbstractEtcdClientWithFailover, catch_etcd_errors, \
+ DnsCachingResolver, Retry, StaleEtcdNode, StaleEtcdNodeGuard
logger = logging.getLogger(__name__)
@@ -240,13 +242,24 @@
try:
data = data.decode('utf-8')
ret: Dict[str, Any] = json.loads(data)
+
+ header = ret.get('header', EMPTY_DICT)
+ self._check_cluster_raft_term(header.get('cluster_id'), header.get('raft_term'))
+
if response.status < 400:
return ret
except (TypeError, ValueError, UnicodeError) as e:
if response.status < 400:
raise etcd.EtcdException('Server response was not valid JSON: %r' % e)
ret = {}
- raise _raise_for_data(ret or data, response.status)
+ ex = _raise_for_data(ret or data, response.status)
+ if isinstance(ex, Unavailable):
+ # <Unavailable error: 'etcdserver: request timed out', code: 14>
+ # Pretend that we got `socket.timeout` and let `AbstractEtcdClientWithFailover._do_http_request()`
+ # method handle it by switching to another etcd node and retrying request.
+ raise socket.timeout from ex
+ else:
+ raise ex
def _ensure_version_prefix(self, base_uri: str, **kwargs: Any) -> None:
if self.version_prefix != '/v3':
@@ -420,10 +433,11 @@
return self.watchrange(key, prefix_range_end(key), start_revision, filters, read_timeout)
-class KVCache(Thread):
+class KVCache(StaleEtcdNodeGuard, Thread):
def __init__(self, dcs: 'Etcd3', client: 'PatroniEtcd3Client') -> None:
- super(KVCache, self).__init__()
+ Thread.__init__(self)
+ StaleEtcdNodeGuard.__init__(self)
self.daemon = True
self._dcs = dcs
self._client = client
@@ -493,7 +507,10 @@
logger.debug('Received message: %s', message)
if 'error' in message:
raise _raise_for_data(message)
- events: List[Dict[str, Any]] = message.get('result', {}).get('events', [])
+ result = message.get('result', EMPTY_DICT)
+ header = result.get('header', EMPTY_DICT)
+ self._check_cluster_raft_term(header.get('cluster_id'), header.get('raft_term'))
+ events: List[Dict[str, Any]] = result.get('events', [])
for event in events:
self._process_event(event)
@@ -527,8 +544,11 @@
def _build_cache(self) -> None:
result = self._dcs.retry(self._client.prefix, self._dcs.cluster_prefix)
+ header = result.get('header', EMPTY_DICT)
with self._object_cache_lock:
+ self._reset_cluster_raft_term()
self._object_cache = {node['key']: node for node in result.get('kvs', [])}
+ self._check_cluster_raft_term(header.get('cluster_id'), header.get('raft_term'))
with self.condition:
self._is_ready = True
self.condition.notify()
@@ -574,6 +594,12 @@
def is_ready(self) -> bool:
"""Must be called only when holding the lock on `condition`"""
+ if self._is_ready:
+ try:
+ self._client._check_cluster_raft_term(self._cluster_id, self._raft_term)
+ except StaleEtcdNode:
+ self._is_ready = False
+ self.kill_stream()
return self._is_ready
@@ -662,8 +688,7 @@
class Etcd3(AbstractEtcd):
def __init__(self, config: Dict[str, Any], mpp: AbstractMPP) -> None:
- super(Etcd3, self).__init__(config, mpp, PatroniEtcd3Client,
- (DeadlineExceeded, Unavailable, FailedPrecondition))
+ super(Etcd3, self).__init__(config, mpp, PatroniEtcd3Client, (DeadlineExceeded, FailedPrecondition))
self.__do_not_watch = False
self._lease = None
self._last_lease_refresh = 0
@@ -961,6 +986,7 @@
return self.retry(self._client.deleterange, self.sync_path, mod_revision=version)
def watch(self, leader_version: Optional[str], timeout: float) -> bool:
+ self._last_lease_refresh = 0
if self.__do_not_watch:
self.__do_not_watch = False
return True
diff -Nru patroni-4.0.5/patroni/dcs/etcd.py patroni-4.0.6/patroni/dcs/etcd.py
--- patroni-4.0.5/patroni/dcs/etcd.py 2025-02-20 16:40:20.000000000 +0100
+++ patroni-4.0.6/patroni/dcs/etcd.py 2025-06-06 19:27:48.000000000 +0200
@@ -41,6 +41,10 @@
"""Raft Internal Error"""
+class StaleEtcdNode(Exception):
+ """Node is stale (raft term is older than previous known)."""
+
+
class EtcdError(DCSError):
pass
@@ -97,11 +101,51 @@
return []
-class AbstractEtcdClientWithFailover(abc.ABC, etcd.Client):
+class StaleEtcdNodeGuard(object):
+
+ def __init__(self) -> None:
+ self._reset_cluster_raft_term()
+
+ def _reset_cluster_raft_term(self) -> None:
+ self._cluster_id = None
+ self._raft_term = 0
+
+ def _check_cluster_raft_term(self, cluster_id: Optional[str], value: Union[None, str, int]) -> None:
+ """Check that observed Raft Term in Etcd cluster is increasing.
+
+ :param cluster_id: last observed Etcd Cluster ID
+ :param raft_term: last observed Raft Term
+
+ :raises:
+ :exc::`StaleEtcdNode` if last observed *raft_term* is smaller than previously known *raft_term*.
+ """
+ if not (cluster_id and value):
+ return
+
+ # We need to reset the memorized value when we notice that Cluster ID changed.
+ if self._cluster_id and self._cluster_id != cluster_id:
+ logger.warning('Etcd Cluster ID changed from %s to %s', self._cluster_id, cluster_id)
+ self._raft_term = 0
+ self._cluster_id = cluster_id
+
+ try:
+ raft_term = int(value)
+ except Exception:
+ return
+
+ if raft_term < self._raft_term:
+ logger.warning('Connected to Etcd node with term %d. Old known term %d. Switching to another node.',
+ raft_term, self._raft_term)
+ raise StaleEtcdNode
+ self._raft_term = raft_term
+
+
+class AbstractEtcdClientWithFailover(abc.ABC, etcd.Client, StaleEtcdNodeGuard):
ERROR_CLS: Type[Exception]
def __init__(self, config: Dict[str, Any], dns_resolver: DnsCachingResolver, cache_ttl: int = 300) -> None:
+ StaleEtcdNodeGuard.__init__(self)
self._dns_resolver = dns_resolver
self.set_machines_cache_ttl(cache_ttl)
self._machines_cache_updated = 0
@@ -227,7 +271,7 @@
def _do_http_request(self, retry: Optional[Retry], machines_cache: List[str],
request_executor: Callable[..., urllib3.response.HTTPResponse],
method: str, path: str, fields: Optional[Dict[str, Any]] = None,
- **kwargs: Any) -> urllib3.response.HTTPResponse:
+ **kwargs: Any) -> Any:
is_watch_request = isinstance(fields, dict) and fields.get('wait') == 'true'
if fields is not None:
kwargs['fields'] = fields
@@ -241,8 +285,8 @@
if some_request_failed:
self.set_base_uri(base_uri)
self._refresh_machines_cache()
- return response
- except (HTTPError, HTTPException, socket.error, socket.timeout) as e:
+ return self._handle_server_response(response)
+ except (HTTPError, HTTPException, socket.error, socket.timeout, StaleEtcdNode) as e:
self.http.clear()
if not retry:
if len(machines_cache) == 1:
@@ -285,8 +329,7 @@
while True:
try:
- response = self._do_http_request(retry, machines_cache, request_executor, method, path, **kwargs)
- return self._handle_server_response(response)
+ return self._do_http_request(retry, machines_cache, request_executor, method, path, **kwargs)
except etcd.EtcdWatchTimedOut:
raise
except etcd.EtcdConnectionFailed as ex:
@@ -463,6 +506,10 @@
def _prepare_get_members(self, etcd_nodes: int) -> Dict[str, Any]:
return self._prepare_common_parameters(etcd_nodes)
+ def _handle_server_response(self, response: urllib3.response.HTTPResponse) -> Any:
+ self._check_cluster_raft_term(response.headers.get('x-etcd-cluster-id'), response.headers.get('x-raft-term'))
+ return super(EtcdClient, self)._handle_server_response(response)
+
def _get_members(self, base_uri: str, **kwargs: Any) -> List[str]:
response = self.http.request(self._MGET, base_uri + self.version_prefix + '/machines', **kwargs)
data = self._handle_server_response(response).data.decode('utf-8')
diff -Nru patroni-4.0.5/patroni/dcs/kubernetes.py patroni-4.0.6/patroni/dcs/kubernetes.py
--- patroni-4.0.5/patroni/dcs/kubernetes.py 2025-02-20 16:40:20.000000000 +0100
+++ patroni-4.0.6/patroni/dcs/kubernetes.py 2025-06-06 19:27:48.000000000 +0200
@@ -14,7 +14,7 @@
from copy import deepcopy
from http.client import HTTPException
from threading import Condition, Lock, Thread
-from typing import Any, Callable, Collection, Dict, List, Optional, Tuple, Type, TYPE_CHECKING, Union
+from typing import Any, Callable, cast, Collection, Dict, List, Optional, Tuple, Type, TYPE_CHECKING, Union
import urllib3
import yaml
@@ -189,13 +189,12 @@
@classmethod
def _wrap(cls, parent: Optional[str], value: Any) -> Any:
if isinstance(value, dict):
- data_dict: Dict[str, Any] = value
+ data_dict: Dict[str, Any] = cast(Dict[str, Any], value)
# we know that `annotations` and `labels` are dicts and therefore don't want to convert them into K8sObject
return data_dict if parent in {'annotations', 'labels'} and \
all(isinstance(v, str) for v in data_dict.values()) else cls(data_dict)
elif isinstance(value, list):
- data_list: List[Any] = value
- return [cls._wrap(None, v) for v in data_list]
+ return [cls._wrap(None, v) for v in cast(List[Any], value)]
else:
return value
@@ -1233,6 +1232,10 @@
return bool(_run_and_handle_exceptions(self._patch_or_create, self.leader_path, annotations,
kind_resource_version, ips=ips, retry=_retry))
+ @staticmethod
+ def _isotime() -> str:
+ return datetime.datetime.now(tzutc).isoformat()
+
def update_leader(self, cluster: Cluster, last_lsn: Optional[int],
slots: Optional[Dict[str, int]] = None, failsafe: Optional[Dict[str, str]] = None) -> bool:
kind = self._kinds.get(self.leader_path)
@@ -1241,7 +1244,7 @@
if kind and kind_annotations.get(self._LEADER) != self._name:
return False
- now = datetime.datetime.now(tzutc).isoformat()
+ now = self._isotime()
leader_observed_record = kind_annotations or self._leader_observed_record
annotations = {self._LEADER: self._name, 'ttl': str(self._ttl), 'renewTime': now,
'acquireTime': leader_observed_record.get('acquireTime') or now,
@@ -1259,7 +1262,7 @@
return self._update_leader_with_retry(annotations, resource_version, self.__ips)
def attempt_to_acquire_leader(self) -> bool:
- now = datetime.datetime.now(tzutc).isoformat()
+ now = self._isotime()
annotations = {self._LEADER: self._name, 'ttl': str(self._ttl),
'renewTime': now, 'acquireTime': now, 'transitions': '0'}
if self._leader_observed_record:
@@ -1274,18 +1277,55 @@
annotations['acquireTime'] = self._leader_observed_record.get('acquireTime') or now
annotations['transitions'] = str(transitions)
+ resource_version = self._leader_resource_version
+ if resource_version:
+ kind = self._kinds.get(self.leader_path)
+ # If leader object in cache was updated we should better use fresh resource_version
+ if kind and kind.metadata.resource_version != resource_version:
+ kind_annotations = kind and kind.metadata.annotations or EMPTY_DICT
+ # But, only in case if leader annotations didn't change
+ if all(kind_annotations.get(k) == self._leader_observed_record.get(k) for k in annotations.keys()):
+ resource_version = kind.metadata.resource_version
+
+ retry = self._retry.copy()
+
+ def _retry(*args: Any, **kwargs: Any) -> Any:
+ kwargs['_retry'] = retry
+ return retry(*args, **kwargs)
+
+ handle_conflict = False
try:
ret = bool(self._patch_or_create(self.leader_path, annotations,
- self._leader_resource_version, retry=self.retry, ips=self.__ips))
+ resource_version, retry=_retry, ips=self.__ips))
except k8s_client.rest.ApiException as e:
- if e.status == 409 and self._leader_resource_version: # Conflict in resource_version
+ if e.status == 409 and resource_version: # Conflict in resource_version
# Terminate watchers, it could be a sign that K8s API is in a failed state
self._kinds.kill_stream()
self._pods.kill_stream()
+ handle_conflict = True
ret = False
except (RetryFailedError, K8sException) as e:
raise KubernetesError(e)
+ if handle_conflict and retry.ensure_deadline(1):
+ # if we are here, that means update failed with 409
+ # Try to get the latest version directly from K8s API instead of relying on async cache
+ try:
+ kind = _retry(self._api.read_namespaced_kind, self.leader_path, self._namespace)
+ except (RetryFailedError, K8sException) as e:
+ raise KubernetesError(e)
+ except Exception as e:
+ logger.error('Failed to get the leader object "%s": %r', self.leader_path, e)
+ else:
+ kind_annotations = kind and kind.metadata.annotations or EMPTY_DICT
+ kind_resource_version = kind and kind.metadata.resource_version
+
+ # We can get 409 because we do at least one retry, and the first update might have succeeded,
+ # therefore we will check if annotations on the read object match expectations.
+ if kind and kind_resource_version != resource_version and\
+ all(kind_annotations.get(k) == v for k, v in annotations.items()):
+ ret = True
+
if not ret:
logger.info('Could not take out TTL lock')
return ret
diff -Nru patroni-4.0.5/patroni/ha.py patroni-4.0.6/patroni/ha.py
--- patroni-4.0.5/patroni/ha.py 2025-02-20 16:40:20.000000000 +0100
+++ patroni-4.0.6/patroni/ha.py 2025-06-06 19:27:48.000000000 +0200
@@ -263,9 +263,6 @@
# used only in backoff after failing a pre_promote script
self._released_leader_key_timestamp = 0
- # Initialize global config
- global_config.update(None, self.patroni.config.dynamic_configuration)
-
def primary_stop_timeout(self) -> Union[int, None]:
""":returns: "primary_stop_timeout" from the global configuration or `None` when not in synchronous mode."""
ret = global_config.primary_stop_timeout
@@ -853,7 +850,7 @@
voters=sync.voters,
numsync=sync_state.numsync,
sync=sync_state.sync,
- numsync_confirmed=sync_state.numsync_confirmed,
+ numsync_confirmed=len(sync_state.sync_confirmed),
active=sync_state.active,
sync_wanted=sync_wanted,
leader_wanted=self.state_handler.name):
@@ -899,7 +896,7 @@
current_state = self.state_handler.sync_handler.current_state(self.cluster)
picked = current_state.active
- allow_promote = current_state.sync
+ allow_promote = current_state.sync_confirmed
voters = CaseInsensitiveSet(sync.voters)
if picked == voters and voters != allow_promote:
@@ -910,7 +907,7 @@
return logger.warning("Updating sync state failed")
voters = CaseInsensitiveSet(sync.voters)
- if picked == voters:
+ if picked == voters == current_state.sync and current_state.numsync == len(picked):
return
# update synchronous standby list in dcs temporarily to point to common nodes in current and picked
@@ -934,7 +931,7 @@
if picked and picked != CaseInsensitiveSet('*') and allow_promote != picked:
# Wait for PostgreSQL to enable synchronous mode and see if we can immediately set sync_standby
time.sleep(2)
- allow_promote = self.state_handler.sync_handler.current_state(self.cluster).sync
+ allow_promote = self.state_handler.sync_handler.current_state(self.cluster).sync_confirmed
if allow_promote and allow_promote != sync_common:
if self.dcs.write_sync_state(self.state_handler.name, allow_promote, 0, version=sync.version):
@@ -1114,6 +1111,7 @@
self._failsafe.set_is_active(0)
def before_promote():
+ self._rewind.reset_state() # make sure we will trigger checkpoint after promote
self.notify_mpp_coordinator('before_promote')
with self._async_response:
@@ -1249,12 +1247,15 @@
lag = self.cluster.status.last_lsn - wal_position
return lag > global_config.maximum_lag_on_failover
- def _is_healthiest_node(self, members: Collection[Member], check_replication_lag: bool = True) -> bool:
+ def _is_healthiest_node(self, members: Collection[Member],
+ check_replication_lag: bool = True,
+ leader: Optional[Leader] = None) -> bool:
"""Determine whether the current node is healthy enough to become a new leader candidate.
:param members: the list of nodes to check against
:param check_replication_lag: whether to take the replication lag into account.
If the lag exceeds configured threshold the node disqualifies itself.
+ :param leader: the old cluster leader, it will be used to ignore its ``failover_priority`` value.
:returns: ``True`` if the node is eligible to become the new leader. Since this method is executed
on multiple nodes independently it is possible that multiple nodes could count
themselves as the healthiest because they received/replayed up to the same LSN,
@@ -1296,6 +1297,12 @@
quorum_votes = 0 if self.state_handler.name in voting_set else -1
nodes_ahead = 0
+ # we need to know the name of the former leader to ignore it if it has higher failover_priority
+ if self.sync_mode_is_active():
+ leader_name = self.cluster.sync.leader
+ else:
+ leader_name = leader and leader.name
+
for st in self.fetch_nodes_statuses(members):
if st.failover_limitation() is None:
if st.in_recovery is False:
@@ -1314,6 +1321,11 @@
low_priority = my_wal_position == st.wal_position \
and self.patroni.failover_priority < st.failover_priority
+ if low_priority and leader_name and leader_name == st.member.name:
+ logger.info('Ignoring former leader %s having priority %s higher than this nodes %s priority',
+ leader_name, st.failover_priority, self.patroni.failover_priority)
+ low_priority = False
+
if low_priority and (not self.sync_mode_is_active() or quorum_vote):
# There's a higher priority non-lagging replica
logger.info(
@@ -1364,7 +1376,14 @@
quorum_votes += 1
# In case of quorum replication we need to make sure that there is enough healthy synchronous replicas!
- return quorum_votes >= (self.cluster.sync.quorum if self.quorum_commit_mode_is_active() else 0)
+ # However, when failover candidate is set, we can ignore quorum requirements.
+ check_quorum = self.quorum_commit_mode_is_active() and\
+ not (self.cluster.failover and self.cluster.failover.candidate and not exclude_failover_candidate)
+ if check_quorum and quorum_votes < self.cluster.sync.quorum:
+ logger.info('Quorum requirement %d can not be reached', self.cluster.sync.quorum)
+ return False
+
+ return quorum_votes >= 0
def manual_failover_process_no_leader(self) -> Optional[bool]:
"""Handles manual failover/switchover when the old leader already stepped down.
@@ -1504,7 +1523,7 @@
# run usual health check
members = {m.name: m for m in all_known_members}
- return self._is_healthiest_node(members.values())
+ return self._is_healthiest_node(members.values(), leader=self.old_cluster.leader)
def _delete_leader(self, last_lsn: Optional[int] = None) -> None:
self.set_is_leader(False)
@@ -2253,10 +2272,7 @@
self._sync_replication_slots(True)
return 'continue to run as a leader because failsafe mode is enabled and all members are accessible'
self._failsafe.set_is_active(0)
- msg = 'demoting self because DCS is not accessible and I was a leader'
- if not self._async_executor.try_run_async(msg, self.demote, ('offline',)):
- return msg
- logger.warning('AsyncExecutor is busy, demoting from the main thread')
+ logger.info('demoting self because DCS is not accessible and I was a leader')
self.demote('offline')
return 'demoted self because DCS is not accessible and I was a leader'
else:
@@ -2404,8 +2420,9 @@
return False
# Don't spend time on "nofailover" nodes checking.
# We also don't need nodes which we can't query with the api in the list.
- return node.name not in exclude and \
- not node.nofailover and bool(node.api_url) and \
- (not failover or not failover.candidate or node.name == failover.candidate)
+ # And, if exclude_failover_candidate is True we want to skip node.name == failover.candidate check.
+ return node.name not in exclude and not node.nofailover and bool(node.api_url) and \
+ (exclude_failover_candidate or not failover
+ or not failover.candidate or node.name == failover.candidate)
return list(filter(is_eligible, self.cluster.members))
diff -Nru patroni-4.0.5/patroni/__main__.py patroni-4.0.6/patroni/__main__.py
--- patroni-4.0.5/patroni/__main__.py 2025-02-20 16:40:20.000000000 +0100
+++ patroni-4.0.6/patroni/__main__.py 2025-06-06 19:27:48.000000000 +0200
@@ -12,7 +12,7 @@
from argparse import Namespace
from typing import Any, Dict, List, Optional, TYPE_CHECKING
-from patroni import MIN_PSYCOPG2, MIN_PSYCOPG3, parse_version
+from patroni import global_config, MIN_PSYCOPG2, MIN_PSYCOPG3, parse_version
from patroni.daemon import abstract_main, AbstractPatroniDaemon, get_base_arg_parser
from patroni.tags import Tags
@@ -70,6 +70,9 @@
self.watchdog = Watchdog(self.config)
self.apply_dynamic_configuration(cluster)
+ # Initialize global config
+ global_config.update(None, self.config.dynamic_configuration)
+
self.postgresql = Postgresql(self.config['postgresql'], self.dcs.mpp)
self.api = RestApiServer(self, self.config['restapi'])
self.ha = Ha(self)
diff -Nru patroni-4.0.5/patroni/postgresql/config.py patroni-4.0.6/patroni/postgresql/config.py
--- patroni-4.0.5/patroni/postgresql/config.py 2025-02-20 16:40:20.000000000 +0100
+++ patroni-4.0.6/patroni/postgresql/config.py 2025-06-06 19:27:48.000000000 +0200
@@ -477,16 +477,19 @@
return configuration
def set_file_permissions(self, filename: str) -> None:
- """Set permissions of file *filename* according to the expected permissions if it resides under PGDATA.
+ """Set permissions of file *filename* according to the expected permissions.
.. note::
- Do nothing if the file is not under PGDATA.
+ Use original umask if the file is not under PGDATA, use PGDATA
+ permissions otherwise.
:param filename: path to a file which permissions might need to be adjusted.
"""
if is_subpath(self._postgresql.data_dir, filename):
pg_perm.set_permissions_from_data_directory(self._postgresql.data_dir)
os.chmod(filename, pg_perm.file_create_mode)
+ else:
+ os.chmod(filename, 0o666 & ~pg_perm.orig_umask)
@contextmanager
def config_writer(self, filename: str) -> Iterator[ConfigWriter]:
@@ -1247,7 +1250,7 @@
and config.get('pg_hba'):
hba_changed = self._config.get('pg_hba', []) != config['pg_hba']
- if (not server_parameters.get('ident_file') or server_parameters['ident_file'] == self._pg_hba_conf) \
+ if (not server_parameters.get('ident_file') or server_parameters['ident_file'] == self._pg_ident_conf) \
and config.get('pg_ident'):
ident_changed = self._config.get('pg_ident', []) != config['pg_ident']
@@ -1266,13 +1269,13 @@
proxy_addr = config.get('proxy_address')
self._postgresql.proxy_url = uri('postgres', proxy_addr, self._postgresql.database) if proxy_addr else None
- if conf_changed:
+ if conf_changed or sighup:
self.write_postgresql_conf()
- if hba_changed:
+ if hba_changed or sighup:
self.replace_pg_hba()
- if ident_changed:
+ if ident_changed or sighup:
self.replace_pg_ident()
if sighup or conf_changed or hba_changed or ident_changed:
diff -Nru patroni-4.0.5/patroni/postgresql/__init__.py patroni-4.0.6/patroni/postgresql/__init__.py
--- patroni-4.0.5/patroni/postgresql/__init__.py 2025-02-20 16:40:20.000000000 +0100
+++ patroni-4.0.6/patroni/postgresql/__init__.py 2025-06-06 19:27:48.000000000 +0200
@@ -83,6 +83,8 @@
self._connection = self.connection_pool.get('heartbeat')
self.mpp_handler = mpp.get_handler_impl(self)
self._bin_dir = config.get('bin_dir') or ''
+ self._role_lock = Lock()
+ self.set_role('uninitialized')
self.config = ConfigHandler(self, config)
self.config.check_directories()
@@ -107,7 +109,6 @@
self._is_leader_retry = Retry(max_tries=1, deadline=config['retry_timeout'] / 2.0, max_delay=1,
retry_exceptions=PostgresConnectionException)
- self._role_lock = Lock()
self.set_role(self.get_postgres_role_from_data_directory())
self._state_entry_timestamp = 0
@@ -1328,7 +1329,7 @@
os.unlink(source)
os.symlink(new_name, source)
- new_name = '{0}.{1}'.format(self._data_dir, postfix)
+ new_name = '{0}.{1}'.format(self._data_dir.rstrip(os.sep), postfix)
logger.info('renaming data directory to %s', new_name)
if os.path.exists(new_name):
shutil.rmtree(new_name)
diff -Nru patroni-4.0.5/patroni/postgresql/rewind.py patroni-4.0.6/patroni/postgresql/rewind.py
--- patroni-4.0.5/patroni/postgresql/rewind.py 2025-02-20 16:40:20.000000000 +0100
+++ patroni-4.0.6/patroni/postgresql/rewind.py 2025-06-06 19:27:48.000000000 +0200
@@ -73,6 +73,8 @@
def trigger_check_diverged_lsn(self) -> None:
if self.can_rewind_or_reinitialize_allowed and self._state != REWIND_STATUS.NEED:
self._state = REWIND_STATUS.CHECK
+ with self._checkpoint_task_lock:
+ self._checkpoint_task = None
@staticmethod
def check_leader_is_not_in_recovery(conn_kwargs: Dict[str, Any]) -> Optional[bool]:
@@ -305,10 +307,16 @@
if self._state != REWIND_STATUS.CHECKPOINT and self._postgresql.is_primary():
with self._checkpoint_task_lock:
if self._checkpoint_task:
+ result = None
+
with self._checkpoint_task:
- if self._checkpoint_task.result is not None:
- self._state = REWIND_STATUS.CHECKPOINT
- self._checkpoint_task = None
+ result = self._checkpoint_task.result
+
+ if result is True:
+ self._state = REWIND_STATUS.CHECKPOINT
+
+ if result is not None:
+ self._checkpoint_task = None
elif self._postgresql.get_primary_timeline() == self._postgresql.pg_control_timeline():
self._state = REWIND_STATUS.CHECKPOINT
else:
diff -Nru patroni-4.0.5/patroni/postgresql/slots.py patroni-4.0.6/patroni/postgresql/slots.py
--- patroni-4.0.5/patroni/postgresql/slots.py 2025-02-20 16:40:20.000000000 +0100
+++ patroni-4.0.6/patroni/postgresql/slots.py 2025-06-06 19:27:48.000000000 +0200
@@ -488,8 +488,8 @@
"""Update logical *slots* on replicas.
If the logical slot already exists, copy state information into the replication slots structure stored in the
- class instance. Slots that exist are also advanced if their ``confirmed_flush_lsn`` is greater than the stored
- state of the slot.
+ class instance. Slots that exist are also advanced if their ``confirmed_flush_lsn`` is smaller than the state
+ of the slot stored in DCS or at least the ``replay_lsn`` of this replica.
As logical slots can only be created when the primary is available, pass the list of slots that need to be
copied back to the caller. They will be created on replicas with :meth:`SlotsHandler.copy_logical_slots`.
@@ -510,9 +510,14 @@
# If the logical already exists, copy some information about it into the original structure
if name in self._replication_slots and compare_slots(value, self._replication_slots[name]):
self._copy_items(self._replication_slots[name], value)
- if 'lsn' in value and value['confirmed_flush_lsn'] < value['lsn']: # The slot has feedback in DCS
+
+ # The slot has feedback in DCS
+ if 'lsn' in value:
+ # we can not advance past replay_lsn
+ advance_value = min(value['lsn'], self._postgresql.replayed_location())
# Skip slots that don't need to be advanced
- advance_slots[value['database']][name] = value['lsn']
+ if value['confirmed_flush_lsn'] < advance_value:
+ advance_slots[value['database']][name] = advance_value
elif name not in self._replication_slots and 'lsn' in value:
# We want to copy only slots with feedback in a DCS
create_slots.append(name)
diff -Nru patroni-4.0.5/patroni/postgresql/sync.py patroni-4.0.6/patroni/postgresql/sync.py
--- patroni-4.0.5/patroni/postgresql/sync.py 2025-02-20 16:40:20.000000000 +0100
+++ patroni-4.0.6/patroni/postgresql/sync.py 2025-06-06 19:27:48.000000000 +0200
@@ -7,7 +7,7 @@
from .. import global_config
from ..collections import CaseInsensitiveDict, CaseInsensitiveSet
-from ..dcs import Cluster
+from ..dcs import Cluster, Member
from ..psycopg import quote_ident
if TYPE_CHECKING: # pragma: no cover
@@ -166,17 +166,16 @@
:ivar sync_type: possible values: ``off``, ``priority``, ``quorum``
:ivar numsync: how many nodes are required to be synchronous (according to ``synchronous_standby_names``).
Is ``0`` if ``synchronous_standby_names`` value is invalid or contains ``*``.
- :ivar numsync_confirmed: how many nodes are known to be synchronous according to the ``pg_stat_replication`` view.
- Only nodes that caught up with the :attr:`SyncHandler._primary_flush_lsn` are counted.
- :ivar sync: collection of synchronous node names. In case of quorum commit all nodes listed
- in ``synchronous_standby_names``, otherwise nodes that are confirmed to be synchronous according
- to the ``pg_stat_replication`` view.
+ :ivar sync: collection of synchronous node names from ``synchronous_standby_names``.
+ :ivar sync_confirmed: collection of synchronous node names from ``synchronous_standby_names`` that are
+ confirmed to be synchronous according to the ``pg_stat_replication`` view.
+ Only nodes that caught up with the :attr:`SyncHandler._primary_flush_lsn` are counted.
:ivar active: collection of node names that are streaming and have no restrictions to become synchronous.
"""
sync_type: str
numsync: int
- numsync_confirmed: int
sync: CaseInsensitiveSet
+ sync_confirmed: CaseInsensitiveSet
active: CaseInsensitiveSet
@@ -227,8 +226,11 @@
'remote_write': 'write'
}.get(postgresql.synchronous_commit(), 'flush') + '_lsn'
- members = CaseInsensitiveDict({m.name: m for m in cluster.members if m.name.lower() != postgresql.name.lower()})
- for row in postgresql.pg_stat_replication():
+ members = CaseInsensitiveDict({m.name: m for m in cluster.members
+ if m.is_running and not m.nosync and m.name.lower() != postgresql.name.lower()})
+ replication = CaseInsensitiveDict({row['application_name']: row for row in postgresql.pg_stat_replication()
+ if row[sort_col] is not None and row['application_name'] in members})
+ for row in replication.values():
member = members.get(row['application_name'])
# We want to consider only rows from ``pg_stat_replication` that:
@@ -236,7 +238,8 @@
# 2. can be mapped to a ``Member`` of the ``Cluster``:
# a. ``Member`` doesn't have ``nosync`` tag set;
# b. PostgreSQL on the member is known to be running and accepting client connections.
- if member and row[sort_col] is not None and member.is_running and not member.nosync:
+ # c. ``Member`` isn't supposed to stream from another standby (``replicatefrom`` tag).
+ if member and row[sort_col] is not None and not self._should_cascade(members, replication, member):
self.append(_Replica(row['pid'], row['application_name'],
row['sync_state'], row[sort_col], bool(member.nofailover)))
@@ -247,6 +250,27 @@
# up-to-date replica otherwise with cluster LSN if there is only one replica.
self.max_lsn = max(self, key=lambda x: x.lsn).lsn if len(self) > 1 else postgresql.last_operation()
+ @staticmethod
+ def _should_cascade(members: CaseInsensitiveDict, replication: CaseInsensitiveDict, member: Member) -> bool:
+ """Check whether *member* is supposed to cascade from another standby node.
+
+ :param members: members that are eligible to stream (state=running and don't have nosync tag)
+ :param replication: state of ``pg_stat_replication``, already filtered by member names from *members*
+ :param member: member that we want to check
+
+ :returns: ``True`` if provided member should stream from other standby node in
+ the cluster (according to ``replicatefrom`` tag), because some standbys
+ in a chain already streaming from the primary, otherwise ``False``
+ """
+ if not member.replicatefrom or member.replicatefrom not in members:
+ return False
+
+ member = members[member.replicatefrom]
+ if not member.replicatefrom:
+ return member.name in replication
+
+ return _ReplicaList._should_cascade(members, replication, member)
+
class SyncHandler(object):
"""Class responsible for working with the `synchronous_standby_names`.
@@ -346,8 +370,7 @@
self._process_replica_readiness(cluster, replica_list)
active = CaseInsensitiveSet()
- sync_nodes = CaseInsensitiveSet()
- numsync_confirmed = 0
+ sync_confirmed = CaseInsensitiveSet()
sync_node_count = global_config.synchronous_node_count if self._postgresql.supports_multiple_sync else 1
sync_node_maxlag = global_config.maximum_lag_on_syncnode
@@ -361,24 +384,20 @@
# there is a chance that a non-promotable node is ahead of a promotable one.
if not replica.nofailover or len(active) < sync_node_count:
if replica.application_name in self._ready_replicas:
- numsync_confirmed += 1
+ sync_confirmed.add(replica.application_name)
active.add(replica.application_name)
else:
active.add(replica.application_name)
if replica.sync_state == 'sync' and replica.application_name in self._ready_replicas:
- sync_nodes.add(replica.application_name)
- numsync_confirmed += 1
+ sync_confirmed.add(replica.application_name)
if len(active) >= sync_node_count:
break
- if global_config.is_quorum_commit_mode:
- sync_nodes = CaseInsensitiveSet() if self._ssn_data.has_star else self._ssn_data.members
-
return _SyncState(
self._ssn_data.sync_type,
- 0 if self._ssn_data.has_star else self._ssn_data.num,
- numsync_confirmed,
- sync_nodes,
+ self._ssn_data.num,
+ CaseInsensitiveSet() if self._ssn_data.has_star else self._ssn_data.members,
+ sync_confirmed,
active)
def set_synchronous_standby_names(self, sync: Collection[str], num: Optional[int] = None) -> None:
diff -Nru patroni-4.0.5/patroni/postgresql/validator.py patroni-4.0.6/patroni/postgresql/validator.py
--- patroni-4.0.5/patroni/postgresql/validator.py 2025-02-20 16:40:20.000000000 +0100
+++ patroni-4.0.6/patroni/postgresql/validator.py 2025-06-06 19:27:48.000000000 +0200
@@ -2,7 +2,7 @@
import logging
from copy import deepcopy
-from typing import Any, Dict, Iterator, List, MutableMapping, Optional, Tuple, Type, Union
+from typing import Any, cast, Dict, Iterator, List, MutableMapping, Optional, Tuple, Type, Union
import yaml
@@ -223,8 +223,7 @@
for key, value in validator.items():
# :func:`_transform_parameter_value` expects :class:`tuple` instead of :class:`list`
if isinstance(value, list):
- tmp_value: List[Any] = value
- validator[key] = tuple(tmp_value)
+ validator[key] = tuple(cast(List[Any], value))
try:
return cls.TYPES[type_](**validator)
diff -Nru patroni-4.0.5/patroni/version.py patroni-4.0.6/patroni/version.py
--- patroni-4.0.5/patroni/version.py 2025-02-20 16:40:20.000000000 +0100
+++ patroni-4.0.6/patroni/version.py 2025-06-06 19:27:48.000000000 +0200
@@ -2,4 +2,4 @@
:var __version__: the current Patroni version.
"""
-__version__ = '4.0.5'
+__version__ = '4.0.6'
diff -Nru patroni-4.0.5/requirements.txt patroni-4.0.6/requirements.txt
--- patroni-4.0.5/requirements.txt 2025-02-20 16:40:20.000000000 +0100
+++ patroni-4.0.6/requirements.txt 2025-06-06 19:27:48.000000000 +0200
@@ -3,8 +3,10 @@
PyYAML
kazoo>=1.3.1
python-etcd>=0.4.3,<0.5
-py-consul>=1.1.1
-click>=4.1
+py-consul>=1.1.1,<1.5.4; python_version=="3.6"
+py-consul>=1.1.1,<1.6.0; python_version>"3.6" and python_version<"3.9"
+py-consul>=1.1.1; python_version>="3.9"
+click>=5.0
prettytable>=0.7
python-dateutil
pysyncobj>=0.3.8
diff -Nru patroni-4.0.5/tests/test_api.py patroni-4.0.6/tests/test_api.py
--- patroni-4.0.5/tests/test_api.py 2025-02-20 16:40:20.000000000 +0100
+++ patroni-4.0.6/tests/test_api.py 2025-06-06 19:27:48.000000000 +0200
@@ -18,6 +18,7 @@
from patroni.utils import RetryFailedError, tzutc
from . import MockConnect, psycopg_connect
+from .test_etcd import socket_getaddrinfo
from .test_ha import get_cluster_initialized_without_leader
future_restart_time = datetime.datetime.now(tzutc) + datetime.timedelta(days=5)
@@ -328,10 +329,14 @@
'tag_key1=true&tag_key2=false&'
'tag_key3=1&tag_key4=1.4&tag_key5=RandomTag&tag_key6=RandomTag2')
- def test_do_OPTIONS(self):
+ @patch.object(MockPatroni, 'dcs')
+ def test_do_OPTIONS(self, mock_dcs):
+ mock_dcs.cluster.status.last_lsn = 20
self.assertIsNotNone(MockRestApiServer(RestApiHandler, 'OPTIONS / HTTP/1.0'))
- def test_do_HEAD(self):
+ @patch.object(MockPatroni, 'dcs')
+ def test_do_HEAD(self, mock_dcs):
+ mock_dcs.cluster.status.last_lsn = 20
self.assertIsNotNone(MockRestApiServer(RestApiHandler, 'HEAD / HTTP/1.0'))
@patch.object(MockPatroni, 'dcs')
@@ -706,6 +711,7 @@
patch.object(MockRestApiServer, 'server_close', Mock()):
self.srv.reload_config({'listen': ':8008'})
+ @patch('socket.getaddrinfo', socket_getaddrinfo)
@patch.object(MockPatroni, 'dcs')
def test_check_access(self, mock_dcs):
mock_dcs.cluster = get_cluster_initialized_without_leader()
diff -Nru patroni-4.0.5/tests/test_ctl.py patroni-4.0.6/tests/test_ctl.py
--- patroni-4.0.5/tests/test_ctl.py 2025-02-20 16:40:20.000000000 +0100
+++ patroni-4.0.6/tests/test_ctl.py 2025-06-06 19:27:48.000000000 +0200
@@ -230,7 +230,7 @@
@patch('patroni.dcs.AbstractDCS.set_failover_value', Mock())
def test_failover(self):
# No candidate specified
- result = self.runner.invoke(ctl, ['failover', 'dummy'], input='0\n')
+ result = self.runner.invoke(ctl, ['failover', 'dummy'], input='0\n\n')
self.assertIn('Failover could be performed only to a specific candidate', result.output)
# Candidate is the same as the leader
@@ -346,7 +346,7 @@
@patch('patroni.ctl.request_patroni')
def test_restart_reinit(self, mock_post):
mock_post.return_value.status = 503
- result = self.runner.invoke(ctl, ['restart', 'alpha'], input='now\ny\n')
+ result = self.runner.invoke(ctl, ['restart', 'alpha'], input='now\ny\n\n')
assert 'Failed: restart for' in result.output
assert result.exit_code == 0
@@ -354,7 +354,7 @@
assert result.exit_code == 1
# successful reinit
- result = self.runner.invoke(ctl, ['reinit', 'alpha', 'other'], input='y\ny')
+ result = self.runner.invoke(ctl, ['reinit', 'alpha', 'other'], input='y\ny\nn')
assert result.exit_code == 0
# Aborted restart
diff -Nru patroni-4.0.5/tests/test_etcd3.py patroni-4.0.6/tests/test_etcd3.py
--- patroni-4.0.5/tests/test_etcd3.py 2025-02-20 16:40:20.000000000 +0100
+++ patroni-4.0.6/tests/test_etcd3.py 2025-06-06 19:27:48.000000000 +0200
@@ -1,4 +1,5 @@
import json
+import socket
import unittest
from threading import Thread
@@ -50,7 +51,7 @@
elif url.endswith('/watch'):
key = base64_encode('/patroni/test/config')
ret.read_chunked = Mock(return_value=[json.dumps({
- 'result': {'events': [
+ 'result': {'header': {'cluster_id': '1', 'raft_term': 1}, 'events': [
{'kv': {'key': key, 'value': base64_encode('bar'), 'mod_revision': '2'}},
{'kv': {'key': key, 'value': base64_encode('buzz'), 'mod_revision': '3'}},
{'type': 'DELETE', 'kv': {'key': key, 'mod_revision': '4'}},
@@ -119,6 +120,14 @@
type(mock_conn).sock = PropertyMock(side_effect=Exception)
self.kv_cache.kill_stream()
+ @patch.object(urllib3.PoolManager, 'urlopen', mock_urlopen)
+ def test_is_ready(self):
+ self.kv_cache._build_cache()
+ with self.kv_cache.condition:
+ self.kv_cache._is_ready = True
+ self.client._raft_term = 2
+ self.assertFalse(self.kv_cache.is_ready())
+
class TestPatroniEtcd3Client(BaseTestEtcd3):
@@ -184,6 +193,8 @@
self.assertRaises(etcd.EtcdException, self.client._handle_server_response, response)
response.status_code = 400
self.assertRaises(Unknown, self.client._handle_server_response, response)
+ response.content = '{"error":{"grpc_code":14,"message":"","http_code":400}}'
+ self.assertRaises(socket.timeout, self.client._handle_server_response, response)
response.content = '{"error":{"grpc_code":0,"message":"","http_code":400}}'
try:
self.client._handle_server_response(response)
diff -Nru patroni-4.0.5/tests/test_etcd.py patroni-4.0.6/tests/test_etcd.py
--- patroni-4.0.5/tests/test_etcd.py 2025-02-20 16:40:20.000000000 +0100
+++ patroni-4.0.6/tests/test_etcd.py 2025-06-06 19:27:48.000000000 +0200
@@ -117,6 +117,12 @@
ret.content = 'http://localhost:2379,http://localhost:4001'
elif url == 'http://localhost:4001/v2/machines':
ret.content = ''
+ elif url == 'http://localhost:4001/term/':
+ ret.headers['x-etcd-cluster-id'] = 'a'
+ ret.headers['x-raft-term'] = '1'
+ elif url == 'http://localhost:2379/term/':
+ ret.headers['x-etcd-cluster-id'] = 'b'
+ ret.headers['x-raft-term'] = 'x'
elif url != 'http://localhost:2379/':
raise socket.error
return ret
@@ -165,6 +171,23 @@
except Exception:
self.assertIsNone(machines)
+ def test__check_cluster_raft_term(self):
+ self.client._raft_term = 2
+ self.client._base_uri = 'http://localhost:4001/term'
+ self.client._machines_cache = [self.client._base_uri, 'http://localhost:2379/term']
+ rtry = Retry(deadline=10, max_delay=1, max_tries=-1, retry_exceptions=(etcd.EtcdLeaderElectionInProgress,))
+ with patch('patroni.dcs.etcd.logger.warning') as mock_logger:
+ rtry(self.client.api_execute, '/', 'POST', timeout=0, params={'retry': rtry})
+ self.assertEqual(mock_logger.call_args_list[0][0],
+ ('Connected to Etcd node with term %d. Old known term %d. Switching to another node.',
+ 1, 2))
+ self.assertEqual(mock_logger.call_args_list[1][0], ('Etcd Cluster ID changed from %s to %s', 'a', 'b'))
+ self.client._base_uri = self.client._machines_cache[0]
+ with patch('patroni.dcs.etcd.logger.warning') as mock_logger:
+ rtry(self.client.api_execute, '/', 'POST', timeout=0, params={'retry': rtry})
+ self.assertEqual(mock_logger.call_args[0], ('Etcd Cluster ID changed from %s to %s', 'b', 'a'))
+
+ @patch('time.sleep', Mock())
@patch.object(EtcdClient, '_get_machines_list',
Mock(return_value=['http://localhost:4001', 'http://localhost:2379']))
def test_api_execute(self):
diff -Nru patroni-4.0.5/tests/test_ha.py patroni-4.0.6/tests/test_ha.py
--- patroni-4.0.5/tests/test_ha.py 2025-02-20 16:40:20.000000000 +0100
+++ patroni-4.0.6/tests/test_ha.py 2025-06-06 19:27:48.000000000 +0200
@@ -67,7 +67,8 @@
'tags': {'clonefrom': True},
'scheduled_restart': {'schedule': "2100-01-01 10:53:07.560445+00:00",
'postgres_version': '99.0.0'}})
- syncstate = SyncState(0 if sync else None, sync and sync[0], sync and sync[1], 0)
+ syncstate = SyncState(0 if sync else None, sync and sync[0],
+ sync and sync[1], sync[2] if sync and len(sync) > 2 else 0)
failsafe = {m.name: m.api_url for m in (m1, m2)} if failsafe else None
return get_cluster(SYSID, leader, [m1, m2], failover, syncstate, cluster_config, failsafe)
@@ -509,8 +510,6 @@
def test_no_dcs_connection_primary_demote(self):
self.ha.load_cluster_from_dcs = Mock(side_effect=DCSError('Etcd is not responding properly'))
- self.assertEqual(self.ha.run_cycle(), 'demoting self because DCS is not accessible and I was a leader')
- self.ha._async_executor.schedule('dummy')
self.assertEqual(self.ha.run_cycle(), 'demoted self because DCS is not accessible and I was a leader')
def test_check_failsafe_topology(self):
@@ -518,7 +517,7 @@
self.ha.cluster = get_cluster_initialized_with_leader_and_failsafe()
global_config.update(self.ha.cluster)
self.ha.dcs._last_failsafe = self.ha.cluster.failsafe
- self.assertEqual(self.ha.run_cycle(), 'demoting self because DCS is not accessible and I was a leader')
+ self.assertEqual(self.ha.run_cycle(), 'demoted self because DCS is not accessible and I was a leader')
self.ha.state_handler.name = self.ha.cluster.leader.name
self.assertFalse(self.ha.failsafe_is_active())
self.assertEqual(self.ha.run_cycle(),
@@ -526,7 +525,7 @@
self.assertTrue(self.ha.failsafe_is_active())
with patch.object(Postgresql, 'slots', Mock(side_effect=Exception)):
self.ha.patroni.request = Mock(side_effect=Exception)
- self.assertEqual(self.ha.run_cycle(), 'demoting self because DCS is not accessible and I was a leader')
+ self.assertEqual(self.ha.run_cycle(), 'demoted self because DCS is not accessible and I was a leader')
self.assertFalse(self.ha.failsafe_is_active())
self.ha.dcs._last_failsafe.clear()
self.ha.dcs._last_failsafe[self.ha.cluster.leader.name] = self.ha.cluster.leader.member.api_url
@@ -977,7 +976,6 @@
with patch('patroni.ha.logger.warning') as mock_warning:
self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, '', 'other', None),
sync=('leader1', 'postgresql0'))
- self.p.sync_handler.current_state = Mock(return_value=(CaseInsensitiveSet(), CaseInsensitiveSet()))
self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty())
self.assertEqual(self.ha.run_cycle(), 'promoted self to leader by acquiring session lock')
self.assertEqual(mock_warning.call_args_list[0][0],
@@ -988,8 +986,6 @@
self.p.set_role('replica')
self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, '', 'postgresql0', None),
sync=('leader1', 'other'))
- self.p.sync_handler.current_state = Mock(return_value=(CaseInsensitiveSet(['leader1']),
- CaseInsensitiveSet(['leader1'])))
self.assertEqual(self.ha.run_cycle(), 'promoted self to leader by acquiring session lock')
def test_manual_switchover_process_no_leader_in_synchronous_mode(self):
@@ -1012,7 +1008,7 @@
# switchover from a specific leader, but the only sync node (us, postgresql0) has nofailover tag
self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, 'leader', '', None),
- sync=('postgresql0'))
+ sync=('postgresql0', None))
self.ha.patroni.nofailover = True
self.assertEqual(self.ha.run_cycle(), 'following a different leader because I am not allowed to promote')
@@ -1074,6 +1070,9 @@
# if there is a higher-priority node but it has a lower WAL position then this node should race
self.ha.fetch_node_status = get_node_status(failover_priority=6, wal_position=9)
self.assertTrue(self.ha._is_healthiest_node(self.ha.old_cluster.members))
+ # if the old leader is a higher-priority node on the same WAL position then this node should race
+ self.ha.fetch_node_status = get_node_status(failover_priority=6)
+ self.assertTrue(self.ha._is_healthiest_node(self.ha.old_cluster.members, leader=self.ha.old_cluster.leader))
self.ha.fetch_node_status = get_node_status(wal_position=11) # accessible, in_recovery, wal position ahead
self.assertFalse(self.ha._is_healthiest_node(self.ha.old_cluster.members))
# in synchronous_mode consider itself healthy if the former leader is accessible in read-only and ahead of us
@@ -1361,7 +1360,8 @@
self.ha.is_synchronous_mode = true
# Test sync standby not touched when picking the same node
- self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 1, 1,
+ self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 1,
+ CaseInsensitiveSet(['other']),
CaseInsensitiveSet(['other']),
CaseInsensitiveSet(['other'])))
self.ha.cluster = get_cluster_initialized_with_leader(sync=('leader', 'other'))
@@ -1372,7 +1372,8 @@
mock_cfg_set_sync.reset_mock()
# Test sync standby is replaced when switching standbys
- self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 0, 0, CaseInsensitiveSet(),
+ self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 0, CaseInsensitiveSet(),
+ CaseInsensitiveSet(),
CaseInsensitiveSet(['other2'])))
self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty())
self.ha.run_cycle()
@@ -1380,7 +1381,8 @@
mock_cfg_set_sync.assert_not_called()
# Test sync standby is replaced when new standby is joined
- self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 1, 1,
+ self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 1,
+ CaseInsensitiveSet(['other2']),
CaseInsensitiveSet(['other2']),
CaseInsensitiveSet(['other2', 'other3'])))
self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty())
@@ -1403,7 +1405,8 @@
self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty())
self.ha.dcs.get_cluster = Mock(return_value=get_cluster_initialized_with_leader(sync=('leader', 'other')))
# self.ha.cluster = get_cluster_initialized_with_leader(sync=('leader', 'other'))
- self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 1, 1,
+ self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 1,
+ CaseInsensitiveSet(['other2']),
CaseInsensitiveSet(['other2']),
CaseInsensitiveSet(['other2'])))
self.ha.run_cycle()
@@ -1428,8 +1431,8 @@
# Test sync set to '*' when synchronous_mode_strict is enabled
mock_set_sync.reset_mock()
mock_cfg_set_sync.reset_mock()
- self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 0, 0, CaseInsensitiveSet(),
- CaseInsensitiveSet()))
+ self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 0, CaseInsensitiveSet(),
+ CaseInsensitiveSet(), CaseInsensitiveSet()))
with patch.object(global_config.__class__, 'is_synchronous_mode_strict', PropertyMock(return_value=True)):
self.ha.run_cycle()
mock_set_sync.assert_called_once_with(CaseInsensitiveSet('*'))
@@ -1540,7 +1543,7 @@
self.ha.is_synchronous_mode = true
self.ha.has_lock = true
self.p.name = 'leader'
- self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 0, 0,
+ self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 0, CaseInsensitiveSet(),
CaseInsensitiveSet(), CaseInsensitiveSet()))
self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty())
with patch('patroni.ha.logger.info') as mock_logger:
@@ -1557,7 +1560,7 @@
self.ha.has_lock = true
self.p.name = 'leader'
self.ha.cluster = get_cluster_initialized_without_leader(sync=('leader', 'a'))
- self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 0, 0,
+ self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 0, CaseInsensitiveSet(),
CaseInsensitiveSet(), CaseInsensitiveSet('a')))
self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty())
mock_set_sync = self.p.sync_handler.set_synchronous_standby_names = Mock()
@@ -1785,7 +1788,8 @@
mock_write_sync = self.ha.dcs.write_sync_state = Mock(return_value=None)
# Test /sync key is attempted to set and failed when missing or invalid
- self.p.sync_handler.current_state = Mock(return_value=_SyncState('quorum', 1, 1, CaseInsensitiveSet(['other']),
+ self.p.sync_handler.current_state = Mock(return_value=_SyncState('quorum', 1, CaseInsensitiveSet(['other']),
+ CaseInsensitiveSet(['other']),
CaseInsensitiveSet(['other'])))
self.ha.run_cycle()
self.assertEqual(mock_write_sync.call_count, 1)
@@ -1805,9 +1809,11 @@
self.assertEqual(mock_write_sync.call_args_list[1][1], {'version': None})
self.assertEqual(mock_set_sync.call_count, 0)
- self.p.sync_handler.current_state = Mock(side_effect=[_SyncState('quorum', 1, 0, CaseInsensitiveSet(['foo']),
+ self.p.sync_handler.current_state = Mock(side_effect=[_SyncState('quorum', 1, CaseInsensitiveSet(['foo']),
+ CaseInsensitiveSet(),
CaseInsensitiveSet(['other'])),
- _SyncState('quorum', 1, 1, CaseInsensitiveSet(['foo']),
+ _SyncState('quorum', 1, CaseInsensitiveSet(['foo']),
+ CaseInsensitiveSet(['foo']),
CaseInsensitiveSet(['foo']))])
mock_write_sync = self.ha.dcs.write_sync_state = Mock(return_value=SyncState(1, 'leader', 'foo', 0))
self.ha.cluster = get_cluster_initialized_with_leader(sync=('leader', 'foo'))
@@ -1822,8 +1828,9 @@
self.assertEqual(mock_set_sync.call_args_list[0][0], ('ANY 1 (other)',))
# Test ANY 1 (*) when synchronous_mode_strict and no nodes available
- self.p.sync_handler.current_state = Mock(return_value=_SyncState('quorum', 1, 0,
+ self.p.sync_handler.current_state = Mock(return_value=_SyncState('quorum', 1,
CaseInsensitiveSet(['other', 'foo']),
+ CaseInsensitiveSet(),
CaseInsensitiveSet()))
mock_write_sync.reset_mock()
mock_set_sync.reset_mock()
@@ -1838,3 +1845,42 @@
# Test that _process_quorum_replication doesn't take longer than loop_wait
with patch('time.time', Mock(side_effect=[30, 60, 90, 120])):
self.ha.process_sync_replication()
+
+ def test_is_failover_possible(self):
+ self.p._major_version = 140000 # supports_multiple_sync
+ self.p.name = 'leader'
+ self.ha.fetch_node_status = get_node_status()
+ self.ha.cluster = get_cluster_initialized_with_leader(sync=('leader', 'foo,other', 1),
+ failover=Failover(0, 'leader', 'other', None))
+ self.ha.cluster.members.append(Member(0, 'foo', 28, {'api_url': 'http://127.0.0.1:8011/patroni'}))
+ # switchover when synchronous_mode = off
+ self.assertTrue(self.ha.is_failover_possible())
+
+ with patch.object(global_config.__class__, 'is_synchronous_mode', PropertyMock(return_value=True)):
+ # switchover to synchronous node when synchronous_mode = on
+ self.assertTrue(self.ha.is_failover_possible())
+ with patch.object(global_config.__class__, 'is_quorum_commit_mode', PropertyMock(return_value=True)):
+ # switchover to synchronous node when synchronous_mode = quorum
+ self.assertTrue(self.ha.is_failover_possible()) # success, despite the fact that quorum is low
+ # failover candidate is unhealthy, we are checking if there are other good candidates, but quorum is low
+ self.assertFalse(self.ha.is_failover_possible(exclude_failover_candidate=True))
+ # now we satisfy quorum requirements
+ with patch.object(SyncState, 'quorum', PropertyMock(return_value=0)):
+ self.assertTrue(self.ha.is_failover_possible(exclude_failover_candidate=True))
+
+ self.ha.cluster = get_cluster_initialized_with_leader(sync=('leader', 'foo,other', 1),
+ failover=Failover(0, '', 'foo', None))
+ # failover to missing node foo
+ self.assertFalse(self.ha.is_failover_possible())
+
+ self.ha.cluster = get_cluster_initialized_with_leader(sync=('leader', 'foo,other', 1),
+ failover=Failover(0, 'leader', '', None))
+ # switchover from leader when synchronous_mode = off
+ self.assertTrue(self.ha.is_failover_possible())
+
+ with patch.object(global_config.__class__, 'is_synchronous_mode', PropertyMock(return_value=True)):
+ # switchover from leader when synchronous_mode = on
+ self.assertTrue(self.ha.is_failover_possible())
+ with patch.object(global_config.__class__, 'is_quorum_commit_mode', PropertyMock(return_value=True)):
+ # switchover from leader when synchronous_mode = quorum
+ self.assertFalse(self.ha.is_failover_possible()) # failure, because quorum is low
diff -Nru patroni-4.0.5/tests/test_kubernetes.py patroni-4.0.6/tests/test_kubernetes.py
--- patroni-4.0.5/tests/test_kubernetes.py 2025-02-20 16:40:20.000000000 +0100
+++ patroni-4.0.6/tests/test_kubernetes.py 2025-06-06 19:27:48.000000000 +0200
@@ -161,6 +161,7 @@
@patch('urllib3.PoolManager.request')
+@patch.object(K8sConfig, '_server', '', create=True)
class TestApiClient(unittest.TestCase):
@patch.object(K8sConfig, '_server', '', create=True)
@@ -291,13 +292,27 @@
self.assertEqual(mock_logger.call_args[0][1], 'Citus')
self.assertIsInstance(mock_logger.call_args[0][2], KubernetesError)
- def test_attempt_to_acquire_leader(self):
+ @patch.object(k8s_client.CoreV1Api, 'read_namespaced_config_map', create=True)
+ def test_attempt_to_acquire_leader(self, mock_read):
+ metadata = k8s_client.V1ObjectMeta(resource_version='2', labels={'f': 'b'}, name='test',
+ annotations={'optime': '1234', 'leader': 'p-0', 'transitions': '0',
+ 'renewTime': 'now', 'acquireTime': 'now', 'ttl': '30'})
+ mock_read.return_value = k8s_client.V1ConfigMap(metadata=metadata)
with patch.object(k8s_client.CoreV1Api, 'patch_namespaced_config_map', create=True) as mock_patch:
mock_patch.side_effect = K8sException
self.assertRaises(KubernetesError, self.k.attempt_to_acquire_leader)
+
mock_patch.side_effect = k8s_client.rest.ApiException(409, '')
+ self.k._leader_resource_version = '0'
+ self.k._isotime = Mock(return_value='now')
+ self.assertTrue(self.k.attempt_to_acquire_leader())
+
+ mock_read.side_effect = Exception
self.assertFalse(self.k.attempt_to_acquire_leader())
+ mock_read.side_effect = RetryFailedError('')
+ self.assertRaises(KubernetesError, self.k.attempt_to_acquire_leader)
+
def test_take_leader(self):
self.k.take_leader()
self.k._leader_observed_record['leader'] = 'test'
diff -Nru patroni-4.0.5/tests/test_log.py patroni-4.0.6/tests/test_log.py
--- patroni-4.0.5/tests/test_log.py 2025-02-20 16:40:20.000000000 +0100
+++ patroni-4.0.6/tests/test_log.py 2025-06-06 19:27:48.000000000 +0200
@@ -83,6 +83,8 @@
self.assertRaises(Exception, logger.shutdown)
self.assertLessEqual(logger.queue_size, 2) # "Failed to close the old log handler" could be still in the queue
self.assertEqual(logger.records_lost, 0)
+ del config['log']['traceback_level']
+ logger.reload_config(config)
def test_interceptor(self):
logger = PatroniLogger()
diff -Nru patroni-4.0.5/tests/test_postgresql.py patroni-4.0.6/tests/test_postgresql.py
--- patroni-4.0.5/tests/test_postgresql.py 2025-02-20 16:40:20.000000000 +0100
+++ patroni-4.0.6/tests/test_postgresql.py 2025-06-06 19:27:48.000000000 +0200
@@ -1,6 +1,7 @@
import datetime
import os
import re
+import stat
import subprocess
import time
@@ -18,6 +19,7 @@
from patroni.collections import CaseInsensitiveDict, CaseInsensitiveSet
from patroni.dcs import RemoteMember
from patroni.exceptions import PatroniException, PostgresConnectionException
+from patroni.file_perm import pg_perm
from patroni.postgresql import Postgresql, STATE_NO_RESPONSE, STATE_REJECT
from patroni.postgresql.bootstrap import Bootstrap
from patroni.postgresql.callback_executor import CallbackAction
@@ -1145,6 +1147,19 @@
"directory:", mock_warning.call_args[0][0]
)
+ @patch('os.chmod')
+ @patch('os.stat')
+ @patch('os.umask')
+ def test_set_file_permissions(self, mock_umask, mock_stat, mock_chmod):
+ pg_conf = os.path.join(self.p.data_dir, 'postgresql.conf')
+ mock_stat.return_value.st_mode = stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP # MODE_GROUP
+ self.p.config.set_file_permissions(pg_conf)
+ mock_chmod.assert_called_with(pg_conf, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP)
+
+ pg_conf = os.path.join(os.path.abspath(os.sep) + 'tmp', 'postgresql.conf')
+ self.p.config.set_file_permissions(pg_conf)
+ mock_chmod.assert_called_with(pg_conf, 0o666 & ~pg_perm.orig_umask)
+
@patch('subprocess.call', Mock(return_value=0))
@patch('patroni.psycopg.connect', psycopg_connect)
diff -Nru patroni-4.0.5/tests/test_slots.py patroni-4.0.6/tests/test_slots.py
--- patroni-4.0.5/tests/test_slots.py 2025-02-20 16:40:20.000000000 +0100
+++ patroni-4.0.6/tests/test_slots.py 2025-06-06 19:27:48.000000000 +0200
@@ -220,18 +220,32 @@
@patch.object(Postgresql, 'is_primary', Mock(return_value=False))
def test__ensure_logical_slots_replica(self):
self.p.set_role('replica')
- self.cluster.status.slots['ls'] = 12346
+ self.cluster.status.slots['ls'] = 800
+
with patch.object(SlotsHandler, 'check_logical_slots_readiness', Mock(return_value=False)):
self.assertEqual(self.s.sync_replication_slots(self.cluster, self.tags), [])
+
with patch.object(SlotsHandler, '_query', Mock(return_value=[('ls', 'logical', 1, 499, 'b',
'a', 5, 100, 500)])), \
patch.object(MockCursor, 'execute', Mock(side_effect=psycopg.OperationalError)), \
- patch.object(SlotsAdvanceThread, 'schedule', Mock(return_value=(True, ['ls']))), \
- patch.object(psycopg.OperationalError, 'diag') as mock_diag:
- type(mock_diag).sqlstate = PropertyMock(return_value='58P01')
- self.assertEqual(self.s.sync_replication_slots(self.cluster, self.tags), ['ls'])
+ patch.object(SlotsAdvanceThread, 'schedule', Mock(return_value=(True, ['ls']))):
+ # copy invalidated slot
+ with patch.object(psycopg.OperationalError, 'diag') as mock_diag:
+ type(mock_diag).sqlstate = PropertyMock(return_value='58P01')
+ self.assertEqual(self.s.sync_replication_slots(self.cluster, self.tags), ['ls'])
+ # advance slots based on the replay lsn value
+ with patch.object(Postgresql, 'replayed_location', Mock(side_effect=[200, 700, 900])), \
+ patch.object(SlotsHandler, 'schedule_advance_slots') as advance_mock:
+ self.s.sync_replication_slots(self.cluster, self.tags)
+ advance_mock.assert_called_with(dict())
+ self.s.sync_replication_slots(self.cluster, self.tags)
+ advance_mock.assert_called_with({'a': {'ls': 700}})
+ self.s.sync_replication_slots(self.cluster, self.tags)
+ advance_mock.assert_called_with({'a': {'ls': 800}})
+
self.cluster.status.slots['ls'] = 'a'
self.assertEqual(self.s.sync_replication_slots(self.cluster, self.tags), [])
+
self.cluster.config.data['slots']['ls']['database'] = 'b'
self.cluster.status.slots['ls'] = '500'
with patch.object(MockCursor, 'rowcount', PropertyMock(return_value=1), create=True):
diff -Nru patroni-4.0.5/tests/test_sync.py patroni-4.0.6/tests/test_sync.py
--- patroni-4.0.5/tests/test_sync.py 2025-02-20 16:40:20.000000000 +0100
+++ patroni-4.0.6/tests/test_sync.py 2025-06-06 19:27:48.000000000 +0200
@@ -40,7 +40,8 @@
# sync node is a bit behind of async, but we prefer it anyway
with patch.object(Postgresql, "_cluster_info_state_get", side_effect=[self.leadermem.name,
'on', pg_stat_replication]):
- self.assertEqual(self.s.current_state(self.cluster), ('priority', 1, 1,
+ self.assertEqual(self.s.current_state(self.cluster), ('priority', 1,
+ CaseInsensitiveSet([self.leadermem.name]),
CaseInsensitiveSet([self.leadermem.name]),
CaseInsensitiveSet([self.leadermem.name])))
@@ -49,7 +50,7 @@
for r in pg_stat_replication:
r['write_lsn'] = r.pop('flush_lsn')
with patch.object(Postgresql, "_cluster_info_state_get", side_effect=['', 'remote_write', pg_stat_replication]):
- self.assertEqual(self.s.current_state(self.cluster), ('off', 0, 0, CaseInsensitiveSet(),
+ self.assertEqual(self.s.current_state(self.cluster), ('off', 0, CaseInsensitiveSet(), CaseInsensitiveSet(),
CaseInsensitiveSet([self.leadermem.name])))
# when there are no sync or potential candidates we pick async with the minimal replication lag
@@ -57,20 +58,20 @@
r.update(replay_lsn=3 - i, application_name=r['application_name'].upper())
missing = pg_stat_replication.pop(0)
with patch.object(Postgresql, "_cluster_info_state_get", side_effect=['', 'remote_apply', pg_stat_replication]):
- self.assertEqual(self.s.current_state(self.cluster), ('off', 0, 0, CaseInsensitiveSet(),
+ self.assertEqual(self.s.current_state(self.cluster), ('off', 0, CaseInsensitiveSet(), CaseInsensitiveSet(),
CaseInsensitiveSet([self.me.name])))
# unknown sync node is ignored
missing.update(application_name='missing', sync_state='sync')
pg_stat_replication.insert(0, missing)
with patch.object(Postgresql, "_cluster_info_state_get", side_effect=['', 'remote_apply', pg_stat_replication]):
- self.assertEqual(self.s.current_state(self.cluster), ('off', 0, 0, CaseInsensitiveSet(),
+ self.assertEqual(self.s.current_state(self.cluster), ('off', 0, CaseInsensitiveSet(), CaseInsensitiveSet(),
CaseInsensitiveSet([self.me.name])))
# invalid synchronous_standby_names and empty pg_stat_replication
with patch.object(Postgresql, "_cluster_info_state_get", side_effect=['a b', 'remote_apply', None]):
self.p._major_version = 90400
- self.assertEqual(self.s.current_state(self.cluster), ('off', 0, 0, CaseInsensitiveSet(),
+ self.assertEqual(self.s.current_state(self.cluster), ('off', 0, CaseInsensitiveSet(), CaseInsensitiveSet(),
CaseInsensitiveSet()))
@patch.object(Postgresql, 'last_operation', Mock(return_value=1))
@@ -80,16 +81,34 @@
pg_stat_replication = [
{'pid': 100, 'application_name': self.leadermem.name, 'sync_state': 'quorum', 'flush_lsn': 1},
- {'pid': 101, 'application_name': self.other.name, 'sync_state': 'quorum', 'flush_lsn': 2}]
+ {'pid': 101, 'application_name': self.me.name, 'sync_state': 'quorum', 'flush_lsn': 2}]
# sync node is a bit behind of async, but we prefer it anyway
with patch.object(Postgresql, "_cluster_info_state_get",
- side_effect=['ANY 1 ({0},"{1}")'.format(self.leadermem.name, self.other.name),
+ side_effect=['ANY 1 ({0},"{1}")'.format(self.leadermem.name, self.me.name),
+ 'on', pg_stat_replication]):
+ self.assertEqual(self.s.current_state(self.cluster),
+ ('quorum', 1, CaseInsensitiveSet([self.me.name, self.leadermem.name]),
+ CaseInsensitiveSet([self.me.name, self.leadermem.name]),
+ CaseInsensitiveSet([self.leadermem.name, self.me.name])))
+
+ @patch.object(Postgresql, 'last_operation', Mock(return_value=1))
+ def test_current_state_cascading(self):
+ pg_stat_replication = [
+ {'pid': 100, 'application_name': self.me.name, 'sync_state': 'async', 'flush_lsn': 1},
+ {'pid': 101, 'application_name': self.other.name, 'sync_state': 'sync', 'flush_lsn': 2}]
+
+ # nodes that are supposed to replicate from other standby nodes are not
+ # returned if at least one standby in a chain is streaming from primary
+ self.leadermem.data['tags'] = {'replicatefrom': self.me.name}
+ with patch.object(Postgresql, "_cluster_info_state_get",
+ side_effect=['2 ({0},"{1}")'.format(self.leadermem.name, self.other.name),
'on', pg_stat_replication]):
self.assertEqual(self.s.current_state(self.cluster),
- ('quorum', 1, 2, CaseInsensitiveSet([self.other.name, self.leadermem.name]),
- CaseInsensitiveSet([self.leadermem.name, self.other.name])))
+ ('priority', 2, CaseInsensitiveSet([self.other.name, self.leadermem.name]),
+ CaseInsensitiveSet(), CaseInsensitiveSet([self.me.name])))
+ @patch('time.sleep', Mock())
def test_set_sync_standby(self):
def value_in_conf():
with open(os.path.join(self.p.data_dir, 'postgresql.conf')) as f:
@@ -181,5 +200,5 @@
# the pg_stat_replication. We need to check that primary is not selected as the synchronous node.
with patch.object(Postgresql, "_cluster_info_state_get", side_effect=[self.leadermem.name,
'on', pg_stat_replication]):
- self.assertEqual(self.s.current_state(cluster), ('priority', 1, 0, CaseInsensitiveSet(),
- CaseInsensitiveSet([self.me.name])))
+ self.assertEqual(self.s.current_state(cluster), ('priority', 1, CaseInsensitiveSet([self.leadermem.name]),
+ CaseInsensitiveSet(), CaseInsensitiveSet([self.me.name])))
diff -Nru patroni-4.0.5/tests/test_validator.py patroni-4.0.6/tests/test_validator.py
--- patroni-4.0.5/tests/test_validator.py 2025-02-20 16:40:20.000000000 +0100
+++ patroni-4.0.6/tests/test_validator.py 2025-06-06 19:27:48.000000000 +0200
@@ -228,11 +228,10 @@
c["kubernetes"]["pod_ip"] = "::1"
c["consul"]["host"] = "127.0.0.1:50000"
c["etcd"]["host"] = "127.0.0.1:237"
- c["postgresql"]["listen"] = "127.0.0.1:5432"
with patch('patroni.validator.open', mock_open(read_data='9')):
errors = schema(c)
output = "\n".join(errors)
- self.assertEqual(['consul.host', 'etcd.host', 'postgresql.bin_dir', 'postgresql.data_dir', 'postgresql.listen',
+ self.assertEqual(['consul.host', 'etcd.host', 'postgresql.bin_dir', 'postgresql.data_dir',
'raft.bind_addr', 'raft.self_addr', 'restapi.connect_address'], parse_output(output))
def test_bin_dir_is_empty_string_executables_in_path(self, mock_out, mock_err):