patroni.postgresql.slots module
Replication slot handling.
Provides classes for the creation, monitoring, management and synchronisation of PostgreSQL replication slots.
- class _patroni.postgresql.slots.SlotsAdvanceThread(_slots_handler: SlotsHandler) View on GitHub
-
Bases:
Thread
+ Daemon process :class:`Thread` object for advancing logical replication slots on replicas. + This ensures that slot advancing queries sent to postgres do not block the main loop. +- __init\\__(slots_handler: SlotsHandler) → None View on GitHub
-
Create and start a new thread for handling slot advance queries. +
- Parameters
-
slots_handler – The calling class instance for reference to slot information attributes. +
- on_promote() → None View on GitHub
-
Reset state of the daemon. +
- run() → None View on GitHub
-
Thread main loop entrypoint. + Note
Thread will wait until a sync is scheduled from outside, normally triggered during the HA loop or a wakeup call. +
- schedule(advance_slots: Dict[ str, Dict[ str, int]]) → Tuple[ bool, List[ str]] View on GitHub
-
Trigger a synchronisation of slots. + This is the main entrypoint for Patroni HA loop wakeup call. +
- Parameters
-
advance_slots – dictionary containing slots that need to be advanced
- Returns
-
tuple of failure status and a list of slots to be copied +
- sync_slot(cur: cursor | Cursor[Any], database: str, slot: str, lsn: int) → None View on GitHub
-
Execute a
pg_replication_slot_advance
query and store success for scheduled synchronisation task. +- Parameters
-
+
- sync_slots() → None View on GitHub
-
Synchronise slots for all scheduled databases. +
- sync_slots_in_database(database: str, slots: List[ str]) → None View on GitHub
-
Synchronise slots for a single database. +
- Parameters
- class _patroni.postgresql.slots.SlotsHandler(_postgresql: Postgresql) View on GitHub
-
Bases:
object
+ Handler for managing and storing information on replication slots in PostgreSQL. +- Variables:
-
+
- __init\\__(postgresql: Postgresql) → None View on GitHub
-
Create an instance with storage attributes for replication slots and schedule the first synchronisation. +
- Parameters
-
postgresql – Calling class instance providing interface to PostgreSQL. +
- static \\__copy_items(_src: Dict[ str, Any], dst: Dict[ str, Any], keys: Collection[ str] | None = None) → None View on GitHub
-
Select values from src dictionary to update in dst dictionary for optional supplied keys. +
- Parameters
-
+
- drop_incorrect_slots(_cluster: Cluster, slots: Dict[ str, Any]) → None View on GitHub
-
Compare required slots and configured as permanent slots with those found, dropping extraneous ones. + Note
Slots that are not contained in _slots_ will be dropped. Slots can be filtered out with `+ignore_slots+` configuration.
Slots that have matching names but do not match attributes in _slots_ will also be dropped. + Parameters::: +
- ensure_logical_slots_primary(_slots: Dict[ str, Any]) → None View on GitHub
-
Create any missing logical replication slots on the primary. + If the logical slot already exists, copy state information into the replication slots structure stored in the class instance. +
- Parameters
-
slots – Slots that should exist are supplied in a dictionary, mapping slot name to any attributes. The method will only consider slots that have a value that is a dictionary with a key
type
with a value that islogical
. +
- ensure_logical_slots_replica(_slots: Dict[ str, Any]) → List[ str] View on GitHub
-
Update logical slots on replicas. + If the logical slot already exists, copy state information into the replication slots structure stored in the class instance. Slots that exist are also advanced if their
confirmed_flush_lsn
is greater than the stored state of the slot. + As logical slots can only be created when the primary is available, pass the list of slots that need to be copied back to the caller. They will be created on replicas withSlotsHandler.copy_logical_slots()
. +- Parameters
-
slots – A dictionary mapping slot name to slot attributes. This method only considers a slot if the value is a dictionary with the key
type
and a value oflogical
. - Returns
-
list of slots to be copied from the primary. +
- ensure_physical_slots(_slots: Dict[ str, Any]) → None View on GitHub
-
Create or advance physical replication slots. + Any failures are logged and do not interrupt creation of all slots. +
- Parameters
-
slots – A dictionary mapping slot name to slot attributes. This method only considers a slot if the value is a dictionary with the key
type
and a value ofphysical
. +
- get_leader_connection_cursor(_leader: Leader) → Iterator[cursor | Cursor[Any]] View on GitHub
-
Create a new database connection to the leader. + Note
Uses rewind user credentials because it has enough permissions to read files from PGDATA. Sets the options `+connect_timeout+` to `+3+` and `+statement_timeout+` to `+2000+`. + Parameters::: *leader* – object with information on the leader Yields::: connection cursor object, note implementation varies depending on version of `+psycopg+`. +
- query(_sql: str, *params: Any) → List[ Tuple[ Any, …]] View on GitHub
-
Helper method for
Postgresql.query()
. +- Parameters
- Returns
-
query response. +
- ready_logical_slots(_primary_physical_catalog_xmin: int | None = None) → None View on GitHub
-
Ready logical slots by comparing primary physical slot
catalog_xmin
to logicalcatalog_xmin
. + The logical slot on a replica is safe to use when the physical replica slot on the primary: + __\\__-
has a nonzero/non-null
catalog_xmin
represented byprimary_physical_xmin
. -
has a
catalog_xmin
that is not newer (greater) than thecatalog_xmin
of any slot on the standby -
overtook the
catalog_xmin
of remembered values of logical slots on the primary. __\\__ +- Parameters
-
primary_physical_catalog_xmin – is the value retrieved from
pg_catalog.pg_get_replication_slots()
for the physical replication slot on the primary. +
-
- update_pending_logical_slot_primary(_slots: Dict[ str, Any], catalog_xmin: int | None = None) → bool View on GitHub
-
Store pending logical slot information for
catalog_xmin
on the primary. + Remembercatalog_xmin
of logical slots on the primary whencatalog_xmin
of the physical slot became valid. Logical slots on replica will be safe to use after promote whencatalog_xmin
of the physical slot overtakes these values. +- Parameters
- Returns
-
False
if any issue was faced while processing,True
otherwise. +
- check_logical_slots_readiness(cluster: Cluster, tags: Tags) → bool View on GitHub
-
Determine whether all known logical slots are synchronised from the leader. +
-
Retrieve the current
catalog_xmin
value for the physical slot from the cluster leader, and -
using previously stored list of “unready” logical slots, those which have yet to be checked hence have no stored slot attributes,
-
store logical slot
catalog_xmin
when the physical slotcatalog_xmin
becomes valid. +- Parameters
- Returns
-
False
if any issue while checking logical slots readiness,True
otherwise. +
-
- copy_logical_slots(cluster: Cluster, tags: Tags, create_slots: List[ str]) → None View on GitHub
-
Create logical replication slots on standby nodes. +
- Parameters
-
+
- drop_replication_slot(name: str) → Tuple[ bool, bool] View on GitHub
-
Drop a named slot from Postgres. +
- Parameters
-
name – name of the slot to be dropped.
- Returns
-
a tuple of
active
anddropped
.active
isTrue
if the slot is active,dropped
isTrue
if the slot was successfully dropped. If the slot was not found returnFalse
for both. +
- get_local_connection_cursor(**kwargs: Any) → Iterator[cursor | Cursor[Any]] View on GitHub
-
Create a new database connection to local server. + Create a non-blocking connection cursor to avoid the situation where an execution of the query of
pg_replication_slot_advance
takes longer than the timeout on a HA loop, which could cause a false failure state. +- Parameters
-
kwargs – Any keyword arguments to pass to
psycopg.connect()
. - Yields
-
connection cursor object, note implementation varies depending on version of
psycopg
. +
- ignore_replication_slot(cluster: Cluster, name: str) → bool View on GitHub
-
Check if slot name should not be managed by Patroni. +
- Parameters
- Returns
-
True
if slot name matches any slot specified inignore_slots
configuration, otherwise will pass through and return result ofAbstractMPPHandler.ignore_replication_slot()
. +
- load_replication_slots() → None View on GitHub
-
Query replication slot information from the database and store it for processing by other tasks. + Note
Only supported from PostgreSQL version 9.4 onwards. + Store replication slot `+name+`, `+type+`, `+plugin+`, `+database+` and `+datoid+`. If PostgreSQL version is 10 or newer also store `+catalog_xmin+` and `+confirmed_flush_lsn+`. + When using logical slots, store information separately for slot synchronisation on replica nodes. +
- on_promote() → None View on GitHub
-
Entry point from HA cycle used when a standby node is to be promoted to primary. + Note
If logical replication slot synchronisation is enabled then slot advancement will be triggered. If any logical slots that were copied are yet to be confirmed as ready a warning message will be logged. +
- process_permanent_slots(slots: List[ Dict[ str, Any]]) → Dict[ str, int] View on GitHub
-
Process replication slot information from the host and prepare information used in subsequent cluster tasks. + Note
\\__\\__ This methods solves three problems.
The `+cluster_info_query+` from :class:`+Postgresql+` is executed every HA loop and returns information about all replication slots that exists on the current host.
Based on this information perform the following actions:
-
For the primary we want to expose to DCS permanent logical slots, therefore build (and return) a dict that maps permanent logical slot names to
confirmed_flush_lsn
. -
detect if one of the previously known permanent slots is missing and schedule resync.
-
Update the local cache with the fresh
catalog_xmin
andconfirmed_flush_lsn
for every known slot. __\\__This info is used when performing the check of logical slot readiness on standbys. + Parameters::: *slots* – replication slot information that exists on the current host. Returns::: dictionary of logical slot names to `+confirmed_flush_lsn+`. +
-
- schedule(value: bool | None = None) → None View on GitHub
-
Schedule the loading of slot information from the database. +
- Parameters
-
value – the optional value can be used to unschedule if set to
False
or force it to beTrue
. If it is omitted the value will beTrue
if this PostgreSQL node supports slot replication. +
- schedule_advance_slots(slots: Dict[ str, Dict[ str, int]]) → Tuple[ bool, List[ str]] View on GitHub
-
Wrapper to ensure slots advance daemon thread is started if not already. +
- Parameters
-
slots – dictionary containing slot information.
- Returns
-
tuple with the result of the scheduling of slot advancement:
failed
and list of slots to copy. +
- sync_replication_slots(cluster: Cluster, tags: Tags) → List[ str] View on GitHub
-
During the HA loop read, check and alter replication slots found in the cluster. + Read physical and logical slots from
pg_replication_slots
, then compare to those configured in the DCS. Drop any slots that do not match those required by configuration and are not configured as permanent. Create any missing physical slots, or advance their position according to feedback stored in DCS. If we are the primary then create logical slots, otherwise if logical slots are known and active create them on replica nodes by copying slot files from the primary. +- Parameters
- Returns
-
list of logical replication slots names that should be copied from the primary.
- patroni.postgresql.slots.compare_slots(s1: Dict[ str, Any], s2: Dict[ str, Any], dbid: str = 'database') → bool View on GitHub
-
Compare 2 replication slot objects for equality. +
- ..note ::
-
If the first argument is a
physical
replication slot then only the type of the second slot is compared. If the first argument is anothertype
(e.g.logical
) then dbid andplugin
are compared. + - Parameters:
- Returns:
-
True
if the slottype
of s1 and s2 is matches, and thetype
of s1 isphysical
, OR thetypes
match AND the dbid andplugin
attributes are equal.
© Copyright 2015 Compose, Zalando SE. Revision 9d231aee
.
Built with Sphinx using a theme provided by Read the Docs.
Read the Docs v: latest
+ Builds