patroni.postgresql.slots module

Replication slot handling.

Provides classes for the creation, monitoring, management and synchronisation of PostgreSQL replication slots.

class patroni.postgresql.slots.SlotsAdvanceThread(slots_handler: SlotsHandler)

Bases: Thread

Daemon process Thread object for advancing logical replication slots on replicas.

This ensures that slot advancing queries sent to postgres do not block the main loop.

__init__(slots_handler: SlotsHandler) → None

Create and start a new thread for handling slot advance queries.

Parameters

slots_handler – The calling class instance for reference to slot information attributes.

on_promote() → None

Reset state of the daemon.

run() → None

Thread main loop entrypoint.

Note

  Thread will wait until a sync is scheduled from outside, normally triggered during the HA loop or a wakeup call.

schedule(advance_slots: Dict[str, Dict[str, int]]) → Tuple[bool, List[str]]

Trigger a synchronisation of slots.

This is the main entrypoint for Patroni HA loop wakeup call.

Parameters

advance_slots – dictionary containing slots that need to be advanced

Returns

tuple of failure status and a list of slots to be copied
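
The wait-then-work pattern described for run() and schedule() can be illustrated with a minimal sketch. The class AdvanceWorker and its attributes are hypothetical and are not Patroni's implementation; the sketch only assumes a daemon thread that sleeps on a condition until schedule() hands it work, and a schedule() call that returns a failure flag and a list of slots to copy.

```python
import threading
from typing import Dict, List, Tuple


class AdvanceWorker(threading.Thread):
    """Hypothetical stand-in for a slot-advancing daemon thread."""

    def __init__(self) -> None:
        super().__init__(daemon=True)
        self._condition = threading.Condition()
        self._scheduled: Dict[str, Dict[str, int]] = {}
        self._failed = False
        self._copy_slots: List[str] = []
        self.processed: List[Tuple[str, str, int]] = []
        self.start()

    def run(self) -> None:
        while True:
            with self._condition:
                # Sleep until schedule() hands over some work.
                while not self._scheduled:
                    self._condition.wait()
                work, self._scheduled = self._scheduled, {}
            for database, slots in work.items():
                for slot, lsn in slots.items():
                    # A real worker would run the advance query here.
                    self.processed.append((database, slot, lsn))

    def schedule(self, advance_slots: Dict[str, Dict[str, int]]) -> Tuple[bool, List[str]]:
        with self._condition:
            for database, slots in advance_slots.items():
                self._scheduled.setdefault(database, {}).update(slots)
            # Report what earlier runs recorded, then reset it.
            result = (self._failed, self._copy_slots)
            self._failed, self._copy_slots = False, []
            self._condition.notify()
        return result
```

Because the caller only enqueues work and returns immediately, a slow query in the worker cannot stall the loop that calls schedule().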

sync_slot(cur: cursor | Cursor[Any], database: str, slot: str, lsn: int) → None

Execute a pg_replication_slot_advance query and store success for scheduled synchronisation task.
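
Since lsn is passed as an integer, it has to be rendered in PostgreSQL's textual pg_lsn form before it reaches pg_replication_slot_advance. The sketch below shows one way to do that; the helper names format_lsn and build_advance_query are hypothetical, and the %s placeholders assume a psycopg-style cursor.

```python
from typing import Tuple


def format_lsn(lsn: int) -> str:
    """Render a 64-bit LSN as PostgreSQL's 'high/low' hexadecimal text form."""
    return '{0:X}/{1:X}'.format(lsn >> 32, lsn & 0xFFFFFFFF)


def build_advance_query(slot: str, lsn: int) -> Tuple[str, Tuple[str, str]]:
    """Return the SQL text and parameters for advancing a slot (hypothetical helper)."""
    sql = 'SELECT pg_catalog.pg_replication_slot_advance(%s, %s)'
    return sql, (slot, format_lsn(lsn))
```

A caller would pass both parts to the cursor, for example cur.execute(*build_advance_query('my_slot', lsn)).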

Parameters


sync_slots() → None

Synchronise slots for all scheduled databases.

sync_slots_in_database(database: str, slots: List[str]) → None

Synchronise slots for a single database.

Parameters

class patroni.postgresql.slots.SlotsHandler(postgresql: Postgresql)

Bases: object

Handler for managing and storing information on replication slots in PostgreSQL.

Variables:

__init__(postgresql: Postgresql) → None

Create an instance with storage attributes for replication slots and schedule the first synchronisation.

Parameters

postgresql – Calling class instance providing interface to PostgreSQL.

static __copy_items(src: Dict[str, Any], dst: Dict[str, Any], keys: Collection[str] | None = None) → None

Select values from src dictionary to update in dst dictionary for optional supplied keys.
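
A minimal sketch of this kind of selective copy follows. The function copy_items is a hypothetical illustration, and its behaviour when keys is omitted (copy every key of src) is an assumption; the real default may differ.

```python
from typing import Any, Collection, Dict, Optional


def copy_items(src: Dict[str, Any], dst: Dict[str, Any],
               keys: Optional[Collection[str]] = None) -> None:
    """Copy selected values from src into dst, modifying dst in place."""
    # Assumption: with no keys supplied, every key of src is copied.
    for key in (src if keys is None else keys):
        dst[key] = src[key]  # raises KeyError if a requested key is absent
```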

Parameters

drop_incorrect_slots(cluster: Cluster, slots: Dict[str, Any]) → None

Compare required slots and slots configured as permanent with those found, dropping extraneous ones.

Note

  Slots that are not contained in slots will be dropped. Slots can be filtered out with ignore_slots configuration.

  Slots that have matching names but do not match attributes in slots will also be dropped.
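
The selection rule in the note can be sketched as follows. The function slots_to_drop is hypothetical, and it simplifies two things: ignored slots are given as a plain collection of names rather than as ignore_slots matchers, and attribute matching is a key-by-key equality check against the required attributes.

```python
from typing import Any, Collection, Dict, List


def slots_to_drop(found: Dict[str, Dict[str, Any]],
                  required: Dict[str, Dict[str, Any]],
                  ignored: Collection[str] = ()) -> List[str]:
    """Return names of found slots that are neither required nor ignored."""
    drop = []
    for name, attrs in found.items():
        if name in ignored:
            continue  # filtered out, never touched
        wanted = required.get(name)
        # Drop unknown slots, and known slots whose attributes differ.
        if wanted is None or any(attrs.get(k) != v for k, v in wanted.items()):
            drop.append(name)
    return sorted(drop)
```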

Parameters

ensure_logical_slots_primary(slots: Dict[str, Any]) → None

Create any missing logical replication slots on the primary.

If the logical slot already exists, copy state information into the replication slots structure stored in the class instance.

Parameters

slots – A dictionary of slots that should exist, mapping slot name to slot attributes. This method only considers a slot if the value is a dictionary with the key type and a value of logical.
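
The filter described for the slots parameter can be sketched as below. The helper select_slots is hypothetical; it only shows the "value is a dictionary whose type key has the given value" rule, which the same way applies to physical slots.

```python
from typing import Any, Dict


def select_slots(slots: Dict[str, Any], slot_type: str) -> Dict[str, Dict[str, Any]]:
    """Keep only entries whose value is a dict with the requested 'type'."""
    return {name: value for name, value in slots.items()
            if isinstance(value, dict) and value.get('type') == slot_type}
```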

ensure_logical_slots_replica(slots: Dict[str, Any]) → List[str]

Update logical slots on replicas.

If the logical slot already exists, copy state information into the replication slots structure stored in the class instance. Slots that exist are also advanced if their confirmed_flush_lsn is greater than the stored state of the slot.

As logical slots can only be created when the primary is available, pass the list of slots that need to be copied back to the caller. They will be created on replicas with SlotsHandler.copy_logical_slots().

Parameters

slots – A dictionary mapping slot name to slot attributes. This method only considers a slot if the value is a dictionary with the key type and a value of logical.

Returns

list of slots to be copied from the primary.
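
The split between slots to advance and slots to copy can be sketched as follows. The function plan_replica_slots and both of its inputs are hypothetical simplifications: wanted maps each slot name to the confirmed_flush_lsn it should reach, and local maps each slot that already exists on the replica to its stored position.

```python
from typing import Dict, List, Tuple


def plan_replica_slots(wanted: Dict[str, int],
                       local: Dict[str, int]) -> Tuple[Dict[str, int], List[str]]:
    """Return (slots to advance with their target LSN, slots to copy)."""
    advance: Dict[str, int] = {}
    copy: List[str] = []
    for name, lsn in wanted.items():
        if name not in local:
            copy.append(name)      # missing locally: must come from the primary
        elif lsn > local[name]:
            advance[name] = lsn    # exists but lags behind: advance it
    return advance, copy
```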

ensure_physical_slots(slots: Dict[str, Any]) → None

Create or advance physical replication slots.

Any failures are logged and do not interrupt creation of all slots.

Parameters

slots – A dictionary mapping slot name to slot attributes. This method only considers a slot if the value is a dictionary with the key type and a value of physical.

get_leader_connection_cursor(leader: Leader) → Iterator[cursor | Cursor[Any]]

Create a new database connection to the leader.

Note

  Uses rewind user credentials because it has enough permissions to read files from PGDATA. Sets the options connect_timeout to 3 and statement_timeout to 2000.

Parameters

leader – object with information on the leader

Yields

connection cursor object, note implementation varies depending on version of psycopg.
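
How the two timeouts might be expressed as libpq connection parameters is sketched below. The helper leader_connection_kwargs is hypothetical; it assumes connect_timeout is passed as a connection keyword (in seconds) and statement_timeout is set through the libpq options string (in milliseconds).

```python
from typing import Any, Dict


def leader_connection_kwargs(conn_kwargs: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of conn_kwargs with short timeouts applied."""
    kwargs = dict(conn_kwargs)  # do not mutate the caller's dictionary
    kwargs['connect_timeout'] = 3                      # seconds
    kwargs['options'] = '-c statement_timeout=2000'    # milliseconds
    return kwargs
```

The resulting dictionary could then be passed as keyword arguments to the driver's connect function.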

query(sql: str, *params: Any) → List[Tuple[Any, …]]

Helper method for Postgresql.query().

Parameters

Returns

query response.

ready_logical_slots(primary_physical_catalog_xmin: int | None = None) → None

Ready logical slots by comparing primary physical slot catalog_xmin to logical catalog_xmin.

The logical slot on a replica is safe to use when the physical replica slot on the primary:

  1. has a nonzero/non-null catalog_xmin represented by primary_physical_catalog_xmin.

  2. has a catalog_xmin that is not newer (greater) than the catalog_xmin of any slot on the standby

  3. overtook the catalog_xmin of remembered values of logical slots on the primary.

Parameters

primary_physical_catalog_xmin – the value retrieved from pg_catalog.pg_get_replication_slots() for the physical replication slot on the primary.
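
The three conditions above can be read as a single predicate. The sketch below is a hypothetical illustration, not Patroni's code: the function name and the last two parameters are invented, "overtook" is assumed to mean greater than or equal, and transaction ID wraparound is ignored.

```python
from typing import Iterable, Optional


def logical_slot_is_ready(primary_physical_catalog_xmin: Optional[int],
                          standby_catalog_xmins: Iterable[int],
                          remembered_primary_catalog_xmin: int) -> bool:
    """Apply the three readiness conditions to plain integers."""
    # 1. The physical slot on the primary must have a usable catalog_xmin.
    if not primary_physical_catalog_xmin:
        return False
    # 2. It must not be newer than the catalog_xmin of any slot on the standby.
    if any(primary_physical_catalog_xmin > xmin for xmin in standby_catalog_xmins):
        return False
    # 3. It must have caught up with the value remembered from the primary.
    return primary_physical_catalog_xmin >= remembered_primary_catalog_xmin
```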

update_pending_logical_slot_primary(slots: Dict[str, Any], catalog_xmin: int | None = None) → bool

Store pending logical slot information for catalog_xmin on the primary.

Remember catalog_xmin of logical slots on the primary when catalog_xmin of the physical slot became valid. Logical slots on replica will be safe to use after promote when catalog_xmin of the physical slot overtakes these values.

Parameters

Returns

False if any issue was faced while processing, True otherwise.

check_logical_slots_readiness(cluster: Cluster, tags: Tags) → bool

Determine whether all known logical slots are synchronised from the leader.

  1. Retrieve the current catalog_xmin value for the physical slot from the cluster leader, and

  2. using previously stored list of “unready” logical slots, those which have yet to be checked hence have no stored slot attributes,

  3. store logical slot catalog_xmin when the physical slot catalog_xmin becomes valid.

Parameters

Returns

False if any issue was encountered while checking logical slots readiness, True otherwise.

copy_logical_slots(cluster: Cluster, tags: Tags, create_slots: List[str]) → None

Create logical replication slots on standby nodes.

Parameters

drop_replication_slot(name: str) → Tuple[bool, bool]

Drop a named slot from Postgres.

Parameters

name – name of the slot to be dropped.

Returns

a tuple of active and dropped. active is True if the slot is active, dropped is True if the slot was successfully dropped. If the slot was not found, both are False.

get_local_connection_cursor(**kwargs: Any) → Iterator[cursor | Cursor[Any]]

Create a new database connection to local server.

Create a non-blocking connection cursor so that a pg_replication_slot_advance query that runs longer than the HA loop timeout does not cause a false failure state.

Parameters

kwargs – Any keyword arguments to pass to psycopg.connect().

Yields

connection cursor object, note implementation varies depending on version of psycopg.

ignore_replication_slot(cluster: Cluster, name: str) → bool

Check if slot name should not be managed by Patroni.

Parameters

Returns

True if slot name matches any slot specified in ignore_slots configuration, otherwise will pass through and return result of AbstractMPPHandler.ignore_replication_slot().

load_replication_slots() → None

Query replication slot information from the database and store it for processing by other tasks.

Note

  Only supported from PostgreSQL version 9.4 onwards.

Store replication slot name, type, plugin, database and datoid. If PostgreSQL version is 10 or newer also store catalog_xmin and confirmed_flush_lsn.

When using logical slots, store information separately for slot synchronisation on replica nodes.
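
The version-dependent column selection can be sketched as a query builder. The helpers slot_columns and build_slots_query are hypothetical, as is the exact SQL text; the sketch assumes the server version is given in the integer server_version_num form (90400 for 9.4, 100000 for 10) and that the data comes from the pg_replication_slots view.

```python
from typing import List


def slot_columns(server_version: int) -> List[str]:
    """Columns to read from pg_replication_slots for a given server version."""
    columns = ['slot_name', 'slot_type', 'plugin', 'database', 'datoid']
    if server_version >= 100000:
        columns += ['catalog_xmin', 'confirmed_flush_lsn']
    return columns


def build_slots_query(server_version: int) -> str:
    """Build the SELECT statement, refusing versions without replication slots."""
    if server_version < 90400:
        raise ValueError('replication slots require PostgreSQL 9.4 or newer')
    return 'SELECT {0} FROM pg_catalog.pg_replication_slots'.format(
        ', '.join(slot_columns(server_version)))
```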

on_promote() → None

Entry point from HA cycle used when a standby node is to be promoted to primary.

Note

  If logical replication slot synchronisation is enabled then slot advancement will be triggered. If any logical slots that were copied are yet to be confirmed as ready a warning message will be logged.

process_permanent_slots(slots: List[Dict[str, Any]]) → Dict[str, int]

Process replication slot information from the host and prepare information used in subsequent cluster tasks.

Note

  This method solves three problems.

  The cluster_info_query from Postgresql is executed every HA loop and returns information about all replication slots that exist on the current host.

  Based on this information perform the following actions:

  1. For the primary we want to expose to DCS permanent logical slots, therefore build (and return) a dict that maps permanent logical slot names to confirmed_flush_lsn.

  2. Detect if one of the previously known permanent slots is missing and schedule resync.

  3. Update the local cache with the fresh catalog_xmin and confirmed_flush_lsn for every known slot.

  This info is used when performing the check of logical slot readiness on standbys.

Parameters

slots – replication slot information that exists on the current host.

Returns

dictionary of logical slot names to confirmed_flush_lsn.
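
The three actions in the note can be sketched in one pass over the slot rows. Everything here is a hypothetical simplification: the function process_slots, the row keys slot_name and slot_type, the permanent collection of names, and the extra needs_resync flag returned in place of actually scheduling a resync.

```python
from typing import Any, Collection, Dict, List, Tuple


def process_slots(slots: List[Dict[str, Any]],
                  permanent: Collection[str],
                  cache: Dict[str, Dict[str, Any]]) -> Tuple[Dict[str, int], bool]:
    """Return (permanent logical slots -> confirmed_flush_lsn, needs_resync)."""
    exposed: Dict[str, int] = {}
    seen = set()
    for slot in slots:
        name = slot['slot_name']
        seen.add(name)
        # 1. Collect permanent logical slots to expose.
        if (name in permanent and slot.get('slot_type') == 'logical'
                and 'confirmed_flush_lsn' in slot):
            exposed[name] = slot['confirmed_flush_lsn']
        # 3. Refresh cached positions for slots that are already known.
        if name in cache:
            for key in ('catalog_xmin', 'confirmed_flush_lsn'):
                if key in slot:
                    cache[name][key] = slot[key]
    # 2. A known permanent slot that was not reported is missing.
    needs_resync = any(name in cache and name not in seen for name in permanent)
    return exposed, needs_resync
```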

schedule(value: bool | None = None) → None

Schedule the loading of slot information from the database.

Parameters

value – the optional value can be used to unschedule if set to False or force it to be True. If it is omitted the value will be True if this PostgreSQL node supports replication slots.

schedule_advance_slots(slots: Dict[str, Dict[str, int]]) → Tuple[bool, List[str]]

Wrapper to ensure slots advance daemon thread is started if not already.

Parameters

slots – dictionary containing slot information.

Returns

tuple with the result of the scheduling of slot advancement: failed and list of slots to copy.

sync_replication_slots(cluster: Cluster, tags: Tags) → List[str]

During the HA loop read, check and alter replication slots found in the cluster.

Read physical and logical slots from pg_replication_slots, then compare to those configured in the DCS. Drop any slots that do not match those required by configuration and are not configured as permanent. Create any missing physical slots, or advance their position according to feedback stored in DCS. If we are the primary then create logical slots, otherwise if logical slots are known and active create them on replica nodes by copying slot files from the primary.

Parameters
Returns

list of logical replication slots names that should be copied from the primary.

patroni.postgresql.slots.compare_slots(s1: Dict[str, Any], s2: Dict[str, Any], dbid: str = 'database') → bool

Compare 2 replication slot objects for equality.

Note

  If the first argument is a physical replication slot then only the type of the second slot is compared. If the first argument is another type (e.g. logical) then dbid and plugin are compared.

Parameters

Returns

True if the slot types of s1 and s2 match and the type of s1 is physical, OR the types match AND the dbid and plugin attributes are equal.
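
The comparison rule can be written out as a short sketch. The function compare_slots_sketch is a hypothetical re-statement of the documented behaviour, not the library's source; it assumes each slot is a dictionary with a type key and, for non-physical slots, plugin and dbid-named keys.

```python
from typing import Any, Dict


def compare_slots_sketch(s1: Dict[str, Any], s2: Dict[str, Any],
                         dbid: str = 'database') -> bool:
    """Return True when two slot descriptions are considered equal."""
    if s1['type'] != s2['type']:
        return False
    if s1['type'] == 'physical':
        return True  # physical slots are compared by type only
    # Other slot types must also agree on database and plugin.
    return s1.get(dbid) == s2.get(dbid) and s1.get('plugin') == s2.get('plugin')
```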


© Copyright 2015 Compose, Zalando SE. Revision 9d231aee.
