
List:       pgsql-hackers
Subject:    Re: Introduce XID age and inactive timeout based replication slot invalidation
From:       Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date:       2024-01-31 13:17:00
Message-ID: CALj2ACWw9ohERck7Vm0oRvbGngPyp4ux2TKDwj4H-X7jN840JA@mail.gmail.com

On Sat, Jan 27, 2024 at 1:18 AM Bharath Rupireddy
<bharath.rupireddyforpostgres@gmail.com> wrote:
>
> On Thu, Jan 11, 2024 at 10:48 AM Bharath Rupireddy
> <bharath.rupireddyforpostgres@gmail.com> wrote:
> >
> > Hi,
> >
> > Replication slots in postgres prevent the removal of required
> > resources even when no connection is using them (i.e., when they are
> > inactive). This consumes storage, because neither the required WAL
> > can be removed nor can the required rows from the user tables/system
> > catalogs be vacuumed away, as long as a replication slot still needs
> > them. In extreme cases this could cause transaction ID wraparound.
> >
> > Currently postgres has the ability to invalidate inactive replication
> > slots based on the amount of WAL (set via the max_slot_wal_keep_size
> > GUC) that would be needed for the slots in case they become active.
> > However, the wraparound issue isn't effectively covered by
> > max_slot_wal_keep_size - one can't tell postgres to invalidate a
> > replication slot if it is blocking VACUUM. Also, it is often tricky
> > to choose a suitable value for max_slot_wal_keep_size, because the
> > amount of WAL that gets generated and the storage allocated for the
> > database can vary.
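> >
> > For comparison, how close a slot is to the max_slot_wal_keep_size
> > limit can already be watched via the existing wal_status and
> > safe_wal_size columns:
> >
> >     SELECT slot_name, wal_status, safe_wal_size
> >     FROM pg_replication_slots;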
> >
> > Therefore, it is often easier for developers to do the following (see
> > the example settings below):
> > a) set an XID age (age of the slot's xmin or catalog_xmin) of, say, 1
> > or 1.5 billion, after which the slots get invalidated.
> > b) set a timeout of, say, 1, 2, or 3 days, after which the inactive
> > slots get invalidated.
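> >
> > With the two GUCs proposed in this patch set, that could look like
> > the following sketch (the values are illustrative; both GUCs are
> > PGC_SIGHUP, so a reload suffices):
> >
> >     ALTER SYSTEM SET max_slot_xid_age = 1500000000;
> >     ALTER SYSTEM SET inactive_replication_slot_timeout = '2d';
> >     SELECT pg_reload_conf();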
> >
> > To implement (a), postgres needs a new GUC called max_slot_xid_age.
> > The checkpointer then invalidates all the slots whose xmin (the oldest
> > transaction that this slot needs the database to retain) or
> > catalog_xmin (the oldest transaction affecting the system catalogs
> > that this slot needs the database to retain) has reached the age
> > specified by this setting.
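> >
> > For reference, the age that the checkpointer would compare against
> > can already be inspected with standard SQL:
> >
> >     SELECT slot_name, age(xmin) AS xmin_age,
> >            age(catalog_xmin) AS catalog_xmin_age
> >     FROM pg_replication_slots;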
> >
> > To implement (b), postgres first needs to track replication slot
> > metrics like the time at which a slot became inactive (inactive_at
> > timestamptz) and the total number of times a slot became inactive in
> > its lifetime (inactive_count numeric) in the
> > ReplicationSlotPersistentData structure. It then needs a new timeout
> > GUC called inactive_replication_slot_timeout. Whenever a slot becomes
> > inactive, the current timestamp and the inactive count are stored in
> > the ReplicationSlotPersistentData structure and persisted to disk.
> > The checkpointer then invalidates all the slots that have been lying
> > inactive for longer than inactive_replication_slot_timeout, measured
> > from inactive_at.
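> >
> > A slot invalidated this way would then show up with the new
> > invalidation_reason column added by the 0001 patch, e.g.:
> >
> >     SELECT slot_name, invalidation_reason
> >     FROM pg_replication_slots
> >     WHERE invalidation_reason = 'inactive_timeout';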
> >
> > In addition to implementing (b), these two new metrics enable
> > developers to improve their monitoring tools, as the metrics are
> > exposed via the pg_replication_slots system view. For instance, one
> > can build a monitoring tool that signals when replication slots have
> > been lying inactive for a day or so using the inactive_at metric,
> > and/or when a replication slot is becoming inactive too frequently
> > using the inactive_count metric.
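> >
> > A sketch of such a monitoring query using the two new columns (the
> > thresholds are illustrative):
> >
> >     SELECT slot_name, inactive_at, inactive_count
> >     FROM pg_replication_slots
> >     WHERE inactive_at < now() - interval '1 day'
> >        OR inactive_count > 100;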
> >
> > I'm attaching the v1 patch set as described below:
> > 0001 - Tracks invalidation_reason in pg_replication_slots. This is
> > needed because slots can now be invalidated for multiple reasons.
> > 0002 - Tracks inactive replication slot information inactive_at and
> > inactive_count.
> > 0003 - Adds inactive_timeout based replication slot invalidation.
> > 0004 - Adds XID based replication slot invalidation.
> >
> > Thoughts?
>
> Needed a rebase due to c393308b. Please find the attached v2 patch set.

Needed a rebase due to commit 776621a (conflict in
src/test/recovery/meson.build with the newly added TAP test file).
Please find the attached v3 patch set.

--
Bharath Rupireddy
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com

["v3-0004-Add-XID-based-replication-slot-invalidation.patch" (application/x-patch)]

From ecfb669fa1f4356d75ef9a8ef0560de804cdaf56 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Wed, 31 Jan 2024 12:16:33 +0000
Subject: [PATCH v3 4/4] Add XID based replication slot invalidation

Currently postgres has the ability to invalidate inactive
replication slots based on the amount of WAL (set via
max_slot_wal_keep_size GUC) that will be needed for the slots in
case they become active. However, choosing a default value for
max_slot_wal_keep_size is tricky, because the amount of WAL a
customer generates and their allocated storage vary greatly in
production, making it difficult to pin down a one-size-fits-all
value. It is often easier for developers to set an XID age (age of
the slot's xmin or catalog_xmin) of, say, 1 or 1.5 billion, after
which the slots get invalidated.

To achieve the above, postgres uses the replication slot's xmin (the
oldest transaction that this slot needs the database to retain) or
catalog_xmin (the oldest transaction affecting the system catalogs
that this slot needs the database to retain), and a new GUC
max_slot_xid_age. The checkpointer then looks at all replication
slots, invalidating any whose xmin or catalog_xmin has exceeded the
configured age.
---
 doc/src/sgml/config.sgml                      | 21 +++++
 src/backend/access/transam/xlog.c             | 10 +++
 src/backend/replication/slot.c                | 41 ++++++++++
 src/backend/replication/slotfuncs.c           |  3 +
 src/backend/utils/misc/guc_tables.c           | 10 +++
 src/backend/utils/misc/postgresql.conf.sample |  1 +
 src/include/replication/slot.h                |  3 +
 src/test/recovery/t/050_invalidate_slots.pl   | 81 +++++++++++++++++++
 8 files changed, 170 insertions(+)

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 099b3fc5cc..0204b1c86a 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -4423,6 +4423,27 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"'  # Windows
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-max-slot-xid-age" xreflabel="max_slot_xid_age">
+      <term><varname>max_slot_xid_age</varname> (<type>integer</type>)
+      <indexterm>
+       <primary><varname>max_slot_xid_age</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Invalidate replication slots whose <literal>xmin</literal> (the oldest
+        transaction that this slot needs the database to retain) or
+        <literal>catalog_xmin</literal> (the oldest transaction affecting the
+        system catalogs that this slot needs the database to retain) has reached
+        the age specified by this setting. A value of zero (the default)
+        disables this feature. Users can set this value anywhere from zero to
+        two billion. This parameter can only be set in the
+        <filename>postgresql.conf</filename> file or on the server command
+        line.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="guc-track-commit-timestamp" \
                xreflabel="track_commit_timestamp">
       <term><varname>track_commit_timestamp</varname> (<type>boolean</type>)
       <indexterm>
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index f7ce2cbbb4..a69099247a 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -7056,6 +7056,11 @@ CreateCheckPoint(int flags)
 		InvalidateObsoleteReplicationSlots(RS_INVAL_INACTIVE_TIMEOUT, 0,
 										   InvalidOid, InvalidTransactionId);
 
+	/* Invalidate replication slots based on xmin or catalog_xmin age */
+	if (max_slot_xid_age > 0)
+		InvalidateObsoleteReplicationSlots(RS_INVAL_XID_AGE, 0,
+										   InvalidOid, InvalidTransactionId);
+
 	/*
 	 * Delete old log files, those no longer needed for last checkpoint to
 	 * prevent the disk holding the xlog from growing full.
@@ -7505,6 +7510,11 @@ CreateRestartPoint(int flags)
 		InvalidateObsoleteReplicationSlots(RS_INVAL_INACTIVE_TIMEOUT, 0,
 										   InvalidOid, InvalidTransactionId);
 
+	/* Invalidate replication slots based on xmin or catalog_xmin age */
+	if (max_slot_xid_age > 0)
+		InvalidateObsoleteReplicationSlots(RS_INVAL_XID_AGE, 0,
+										   InvalidOid, InvalidTransactionId);
+
 	/*
 	 * Retreat _logSegNo using the current end of xlog replayed or received,
 	 * whichever is later.
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 57bca68547..644ff6f701 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -101,6 +101,7 @@ ReplicationSlot *MyReplicationSlot = NULL;
 /* GUC variables */
 int			max_replication_slots = 10;
 int			inactive_replication_slot_timeout = 0;
+int			max_slot_xid_age = 0;
 
 static void ReplicationSlotShmemExit(int code, Datum arg);
 static void ReplicationSlotDropAcquired(void);
@@ -1375,6 +1376,9 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
 		case RS_INVAL_INACTIVE_TIMEOUT:
 			appendStringInfoString(&err_detail, _("The slot has been inactive for more than \
the time specified by inactive_replication_slot_timeout."));  break;
+		case RS_INVAL_XID_AGE:
+			appendStringInfoString(&err_detail, _("The replication slot's xmin or \
catalog_xmin reached the age specified by max_slot_xid_age.")); +			break;
 		case RS_INVAL_NONE:
 			pg_unreachable();
 	}
@@ -1487,6 +1491,42 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 							conflict = cause;
 					}
 					break;
+				case RS_INVAL_XID_AGE:
+					{
+						TransactionId xid_cur = ReadNextTransactionId();
+						TransactionId xid_limit;
+						TransactionId xid_slot;
+
+						if (TransactionIdIsNormal(s->data.xmin))
+						{
+							xid_slot = s->data.xmin;
+
+							xid_limit = xid_slot + max_slot_xid_age;
+							if (xid_limit < FirstNormalTransactionId)
+								xid_limit += FirstNormalTransactionId;
+
+							if (TransactionIdFollowsOrEquals(xid_cur, xid_limit))
+							{
+								conflict = cause;
+								break;
+							}
+						}
+						if (TransactionIdIsNormal(s->data.catalog_xmin))
+						{
+							xid_slot = s->data.catalog_xmin;
+
+							xid_limit = xid_slot + max_slot_xid_age;
+							if (xid_limit < FirstNormalTransactionId)
+								xid_limit += FirstNormalTransactionId;
+
+							if (TransactionIdFollowsOrEquals(xid_cur, xid_limit))
+							{
+								conflict = cause;
+								break;
+							}
+						}
+					}
+					break;
 				case RS_INVAL_NONE:
 					pg_unreachable();
 			}
@@ -1633,6 +1673,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
  *   db; dboid may be InvalidOid for shared relations
  * - RS_INVAL_WAL_LEVEL: is logical
  * - RS_INVAL_INACTIVE_TIMEOUT: inactive slot timeout occurs
+ * - RS_INVAL_XID_AGE: slot's xmin or catalog_xmin has reached the age
  *
  * NB - this runs as part of checkpoint, so avoid raising errors if possible.
  */
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 972c7b2baf..21cd76d708 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -428,6 +428,9 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 			case RS_INVAL_INACTIVE_TIMEOUT:
 				values[i++] = CStringGetTextDatum("inactive_timeout");
 				break;
+			case RS_INVAL_XID_AGE:
+				values[i++] = CStringGetTextDatum("xid_aged");
+				break;
 		}
 
 		values[i++] = BoolGetDatum(slot_contents.data.failover);
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index f08563479b..f2bf3d64d9 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -2914,6 +2914,16 @@ struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"max_slot_xid_age", PGC_SIGHUP, REPLICATION_SENDING,
+			gettext_noop("Age of the transaction ID at which a replication slot gets invalidated."),
+			gettext_noop("The transaction is the oldest transaction (including the one affecting the system catalogs) that a replication slot needs the database to retain.")
+		},
+		&max_slot_xid_age,
+		0, 0, 2000000000,
+		NULL, NULL, NULL
+	},
+
 	{
 		{"commit_delay", PGC_SUSET, WAL_SETTINGS,
 			gettext_noop("Sets the delay in microseconds between transaction commit and "
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 9fc1f2faed..8743426b12 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -326,6 +326,7 @@
 #track_commit_timestamp = off	# collect timestamp of transaction commit
 				# (change requires restart)
 #inactive_replication_slot_timeout = 0	# in seconds; 0 disables
+#max_slot_xid_age = 0
 
 # - Primary Server -
 
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index b6607aa97b..71bc610ec9 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -52,6 +52,8 @@ typedef enum ReplicationSlotInvalidationCause
 	RS_INVAL_WAL_LEVEL,
 	/* inactive slot timeout has occurred */
 	RS_INVAL_INACTIVE_TIMEOUT,
+	/* slot's xmin or catalog_xmin has reached the age */
+	RS_INVAL_XID_AGE,
 } ReplicationSlotInvalidationCause;
 
 /*
@@ -225,6 +227,7 @@ extern PGDLLIMPORT ReplicationSlot *MyReplicationSlot;
 /* GUCs */
 extern PGDLLIMPORT int max_replication_slots;
 extern PGDLLIMPORT int inactive_replication_slot_timeout;
+extern PGDLLIMPORT int max_slot_xid_age;
 
 /* shmem initialization functions */
 extern Size ReplicationSlotsShmemSize(void);
diff --git a/src/test/recovery/t/050_invalidate_slots.pl b/src/test/recovery/t/050_invalidate_slots.pl
index bf1cd4bbcc..e7da98412c 100644
--- a/src/test/recovery/t/050_invalidate_slots.pl
+++ b/src/test/recovery/t/050_invalidate_slots.pl
@@ -84,4 +84,85 @@ $primary->poll_query_until('postgres', qq[
 		invalidation_reason = 'inactive_timeout';
 ]) or die "Timed out while waiting for inactive replication slot sb1_slot to be \
invalidated";  
+$primary->safe_psql('postgres', qq[
+    SELECT pg_create_physical_replication_slot('sb2_slot');
+]);
+
+$primary->safe_psql('postgres', qq[
+    ALTER SYSTEM SET inactive_replication_slot_timeout TO 0;
+]);
+$primary->reload;
+
+# Create a standby linking to the primary using the replication slot
+my $standby2 = PostgreSQL::Test::Cluster->new('standby2');
+$standby2->init_from_backup($primary, $backup_name,
+	has_streaming => 1);
+
+# Enable hs_feedback. The slot should gain an xmin. We set the status interval
+# so we'll see the results promptly.
+$standby2->append_conf('postgresql.conf', q{
+primary_slot_name = 'sb2_slot'
+hot_standby_feedback = on
+wal_receiver_status_interval = 1
+});
+$standby2->start;
+
+# Create some content on primary to move xmin
+$primary->safe_psql('postgres',
+	"CREATE TABLE tab_int AS SELECT generate_series(1,10) AS a");
+
+# Wait until standby has replayed enough data
+$primary->wait_for_catchup($standby2);
+
+$primary->poll_query_until('postgres', qq[
+	SELECT xmin IS NOT NULL
+		FROM pg_catalog.pg_replication_slots
+		WHERE slot_name = 'sb2_slot';
+]) or die "Timed out waiting for slot xmin to advance";
+
+$primary->safe_psql('postgres', qq[
+	ALTER SYSTEM SET max_slot_xid_age = 500;
+]);
+$primary->reload;
+
+# Stop the standby so that the replication slot's xmin on the primary ages
+$standby2->stop;
+
+# Do some work to advance xmin
+$primary->safe_psql(
+	'postgres', q{
+do $$
+begin
+  for i in 10000..11000 loop
+    -- use an exception block so that each iteration eats an XID
+    begin
+      insert into tab_int values (i);
+    exception
+      when division_by_zero then null;
+    end;
+  end loop;
+end$$;
+});
+
+$invalidated = 0;
+for (my $i = 0; $i < 10 * $PostgreSQL::Test::Utils::timeout_default; $i++)
+{
+	$primary->safe_psql('postgres', "CHECKPOINT");
+	if ($primary->log_contains(
+			'invalidating obsolete replication slot "sb2_slot"', $logstart))
+	{
+		$invalidated = 1;
+		last;
+	}
+	usleep(100_000);
+}
+ok($invalidated, 'check that slot sb2_slot invalidation has been logged');
+
+# Wait for the replication slot sb2_slot to be invalidated.
+$primary->poll_query_until('postgres', qq[
+	SELECT COUNT(slot_name) = 1 FROM pg_replication_slots
+		WHERE slot_name = 'sb2_slot' AND
+		invalidation_reason = 'xid_aged';
+]) or die "Timed out while waiting for replication slot sb2_slot to be invalidated";
+
 done_testing();
-- 
2.34.1


["v3-0002-Track-inactive-replication-slot-information.patch" (application/x-patch)]

From cc5ff196a3861a3e4c27b6d5925f2a09530de689 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Wed, 31 Jan 2024 12:15:04 +0000
Subject: [PATCH v3 2/4] Track inactive replication slot information

Currently postgres doesn't track metrics like the time at which
the slot became inactive, and the total number of times the slot
became inactive in its lifetime. This commit adds two new metrics
inactive_at of type timestamptz and inactive_count of type numeric
to ReplicationSlotPersistentData. Whenever a slot becomes
inactive, the current timestamp and inactive count are persisted
to disk.

These metrics are useful in the following ways:

- To improve replication slot monitoring tools. For instance, one
can build a monitoring tool that signals a) when a replication
slot has been lying inactive for a day or so using the inactive_at
metric, b) when a replication slot is becoming inactive too
frequently using the inactive_count metric.

- To implement timeout-based inactive replication slot management
capability in postgres.

Increases SLOT_VERSION due to the two new metrics added.
---
 doc/src/sgml/system-views.sgml       | 20 +++++++++++
 src/backend/catalog/system_views.sql |  4 ++-
 src/backend/replication/slot.c       | 50 +++++++++++++++++++++++-----
 src/backend/replication/slotfuncs.c  | 15 ++++++++-
 src/include/catalog/pg_proc.dat      |  6 ++--
 src/include/replication/slot.h       |  6 ++++
 src/test/regress/expected/rules.out  |  6 ++--
 7 files changed, 91 insertions(+), 16 deletions(-)

diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml
index c61312793c..75f99f4ca0 100644
--- a/doc/src/sgml/system-views.sgml
+++ b/doc/src/sgml/system-views.sgml
@@ -2566,6 +2566,26 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
        Always false for physical slots.
       </para></entry>
      </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>inactive_at</structfield> <type>timestamptz</type>
+      </para>
+      <para>
+        The time at which the slot became inactive.
+        <literal>NULL</literal> if the slot is currently in use.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>inactive_count</structfield> <type>numeric</type>
+      </para>
+      <para>
+        The total number of times the slot became inactive in its lifetime.
+      </para></entry>
+     </row>
     </tbody>
    </tgroup>
   </table>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 9d401003e8..eba97e8494 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1024,7 +1024,9 @@ CREATE VIEW pg_replication_slots AS
             L.safe_wal_size,
             L.two_phase,
             L.invalidation_reason,
-            L.failover
+            L.failover,
+            L.inactive_at,
+            L.inactive_count
     FROM pg_get_replication_slots() AS L
             LEFT JOIN pg_database D ON (L.datoid = D.oid);
 
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 110cb59783..9662b7f70d 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -90,7 +90,7 @@ typedef struct ReplicationSlotOnDisk
 	sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize
 
 #define SLOT_MAGIC		0x1051CA1	/* format identifier */
-#define SLOT_VERSION	4		/* version for new files */
+#define SLOT_VERSION	5		/* version for new files */
 
 /* Control array for replication slot management */
 ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
@@ -315,6 +315,8 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 	slot->data.two_phase = two_phase;
 	slot->data.two_phase_at = InvalidXLogRecPtr;
 	slot->data.failover = failover;
+	slot->data.inactive_at = 0;
+	slot->data.inactive_count = 0;
 
 	/* and then data only present in shared memory */
 	slot->just_dirtied = false;
@@ -541,6 +543,17 @@ retry:
 
 	if (am_walsender)
 	{
+		if (s->data.persistency == RS_PERSISTENT)
+		{
+			SpinLockAcquire(&s->mutex);
+			s->data.inactive_at = 0;
+			SpinLockRelease(&s->mutex);
+
+			/* Write this slot to disk */
+			ReplicationSlotMarkDirty();
+			ReplicationSlotSave();
+		}
+
 		ereport(log_replication_commands ? LOG : DEBUG1,
 				SlotIsLogical(s)
 				? errmsg("acquired logical replication slot \"%s\"",
@@ -608,16 +621,27 @@ ReplicationSlotRelease(void)
 		ConditionVariableBroadcast(&slot->active_cv);
 	}
 
-	MyReplicationSlot = NULL;
-
-	/* might not have been set when we've been a plain slot */
-	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
-	MyProc->statusFlags &= ~PROC_IN_LOGICAL_DECODING;
-	ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags;
-	LWLockRelease(ProcArrayLock);
-
 	if (am_walsender)
 	{
+		if (slot->data.persistency == RS_PERSISTENT)
+		{
+			SpinLockAcquire(&slot->mutex);
+			slot->data.inactive_at = GetCurrentTimestamp();
+
+			/*
+			 * XXX: Can inactive_count of type uint64 ever overflow? It takes
+			 * about a half-billion years for inactive_count to overflow even
+			 * if slot becomes inactive for every 1 millisecond. So, using
+			 * pg_add_u64_overflow might be an overkill.
+			 */
+			slot->data.inactive_count++;
+			SpinLockRelease(&slot->mutex);
+
+			/* Write this slot to disk */
+			ReplicationSlotMarkDirty();
+			ReplicationSlotSave();
+		}
+
 		ereport(log_replication_commands ? LOG : DEBUG1,
 				is_logical
 				? errmsg("released logical replication slot \"%s\"",
@@ -627,6 +651,14 @@ ReplicationSlotRelease(void)
 
 		pfree(slotname);
 	}
+
+	MyReplicationSlot = NULL;
+
+	/* might not have been set when we've been a plain slot */
+	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+	MyProc->statusFlags &= ~PROC_IN_LOGICAL_DECODING;
+	ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags;
+	LWLockRelease(ProcArrayLock);
 }
 
 /*
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index e53aeb37c9..3c53f4ac48 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -237,10 +237,11 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_get_replication_slots(PG_FUNCTION_ARGS)
 {
-#define PG_GET_REPLICATION_SLOTS_COLS 16
+#define PG_GET_REPLICATION_SLOTS_COLS 18
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 	XLogRecPtr	currlsn;
 	int			slotno;
+	char		buf[256];
 
 	/*
 	 * We don't require any special permission to see this function's data
@@ -428,6 +429,18 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 
 		values[i++] = BoolGetDatum(slot_contents.data.failover);
 
+		if (slot_contents.data.inactive_at > 0)
+			values[i++] = TimestampTzGetDatum(slot_contents.data.inactive_at);
+		else
+			nulls[i++] = true;
+
+		/* Convert to numeric. */
+		snprintf(buf, sizeof buf, UINT64_FORMAT, slot_contents.data.inactive_count);
+		values[i++] = DirectFunctionCall3(numeric_in,
+										  CStringGetDatum(buf),
+										  ObjectIdGetDatum(0),
+										  Int32GetDatum(-1));
+
 		Assert(i == PG_GET_REPLICATION_SLOTS_COLS);
 
 		tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index de1115baa0..52e9fc4971 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11127,9 +11127,9 @@
   proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', prorettype => 'record',
   proargtypes => '',
-  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,text,bool}',
-  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,invalidation_reason,failover}',
+  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,text,bool,timestamptz,numeric}',
+  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,invalidation_reason,failover,inactive_at,inactive_count}',
   prosrc => 'pg_get_replication_slots' },
 { oid => '3786', descr => 'set up a logical replication slot',
   proname => 'pg_create_logical_replication_slot', provolatile => 'v',
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index da4c776492..380dcc90ca 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -117,6 +117,12 @@ typedef struct ReplicationSlotPersistentData
 	 * for logical slots on the primary server.
 	 */
 	bool		failover;
+
+	/* When did this slot become inactive last time? */
+	TimestampTz inactive_at;
+
+	/* How many times the slot has been inactive? */
+	uint64		inactive_count;
 } ReplicationSlotPersistentData;
 
 /*
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 022f9bccb0..4a3cb182e6 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1474,8 +1474,10 @@ pg_replication_slots| SELECT l.slot_name,
     l.safe_wal_size,
     l.two_phase,
     l.invalidation_reason,
-    l.failover
-   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, invalidation_reason, failover)
+    l.failover,
+    l.inactive_at,
+    l.inactive_count
+   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, invalidation_reason, failover, inactive_at, inactive_count)
      LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
 pg_roles| SELECT pg_authid.rolname,
     pg_authid.rolsuper,
-- 
2.34.1


["v3-0001-Track-invalidation_reason-in-pg_replication_slots.patch" (application/x-patch)]

From 6c4760366ba9867a2baca9cedb3b58ef8924a1fe Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Wed, 31 Jan 2024 12:14:47 +0000
Subject: [PATCH v3 1/4] Track invalidation_reason in pg_replication_slots

Currently the reason for replication slot invalidation is not
tracked in pg_replication_slots. A recent commit 007693f2a added
conflict_reason to show the reasons for slot invalidation, but
only for logical slots. This commit renames conflict_reason to
invalidation_reason, and adds the support to show invalidation
reasons for both physical and logical slots.
---
 doc/src/sgml/system-views.sgml                | 11 +++---
 src/backend/catalog/system_views.sql          |  2 +-
 src/backend/replication/slotfuncs.c           | 37 ++++++++-----------
 src/bin/pg_upgrade/info.c                     |  4 +-
 src/include/catalog/pg_proc.dat               |  2 +-
 .../t/035_standby_logical_decoding.pl         | 32 ++++++++--------
 src/test/regress/expected/rules.out           |  4 +-
 7 files changed, 44 insertions(+), 48 deletions(-)

diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml
index dd468b31ea..c61312793c 100644
--- a/doc/src/sgml/system-views.sgml
+++ b/doc/src/sgml/system-views.sgml
@@ -2525,13 +2525,14 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
 
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
-       <structfield>conflict_reason</structfield> <type>text</type>
+       <structfield>invalidation_reason</structfield> <type>text</type>
       </para>
       <para>
-       The reason for the logical slot's conflict with recovery. It is always
-       NULL for physical slots, as well as for logical slots which are not
-       invalidated. The non-NULL values indicate that the slot is marked
-       as invalidated. Possible values are:
+       The reason for the slot's invalidation. <literal>NULL</literal> if the
+       slot is currently in use. The non-NULL values indicate that the
+       slot is marked as invalidated. For logical slots, it represents
+       the reason for the logical slot's conflict with recovery.
+       Possible values are:
        <itemizedlist spacing="compact">
         <listitem>
          <para>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 6791bff9dd..9d401003e8 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1023,7 +1023,7 @@ CREATE VIEW pg_replication_slots AS
             L.wal_status,
             L.safe_wal_size,
             L.two_phase,
-            L.conflict_reason,
+            L.invalidation_reason,
             L.failover
     FROM pg_get_replication_slots() AS L
             LEFT JOIN pg_database D ON (L.datoid = D.oid);
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index eb685089b3..e53aeb37c9 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -407,28 +407,23 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 
 		values[i++] = BoolGetDatum(slot_contents.data.two_phase);
 
-		if (slot_contents.data.database == InvalidOid)
-			nulls[i++] = true;
-		else
+		switch (slot_contents.data.invalidated)
 		{
-			switch (slot_contents.data.invalidated)
-			{
-				case RS_INVAL_NONE:
-					nulls[i++] = true;
-					break;
-
-				case RS_INVAL_WAL_REMOVED:
-					values[i++] = CStringGetTextDatum("wal_removed");
-					break;
-
-				case RS_INVAL_HORIZON:
-					values[i++] = CStringGetTextDatum("rows_removed");
-					break;
-
-				case RS_INVAL_WAL_LEVEL:
-					values[i++] = CStringGetTextDatum("wal_level_insufficient");
-					break;
-			}
+			case RS_INVAL_NONE:
+				nulls[i++] = true;
+				break;
+
+			case RS_INVAL_WAL_REMOVED:
+				values[i++] = CStringGetTextDatum("wal_removed");
+				break;
+
+			case RS_INVAL_HORIZON:
+				values[i++] = CStringGetTextDatum("rows_removed");
+				break;
+
+			case RS_INVAL_WAL_LEVEL:
+				values[i++] = CStringGetTextDatum("wal_level_insufficient");
+				break;
 		}
 
 		values[i++] = BoolGetDatum(slot_contents.data.failover);
diff --git a/src/bin/pg_upgrade/info.c b/src/bin/pg_upgrade/info.c
index 183c2f84eb..9683c91d4a 100644
--- a/src/bin/pg_upgrade/info.c
+++ b/src/bin/pg_upgrade/info.c
@@ -667,13 +667,13 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check)
 	 * removed.
 	 */
 	res = executeQueryOrDie(conn, "SELECT slot_name, plugin, two_phase, failover, "
-							"%s as caught_up, conflict_reason IS NOT NULL as invalid "
+							"%s as caught_up, invalidation_reason IS NOT NULL as invalid "
 							"FROM pg_catalog.pg_replication_slots "
 							"WHERE slot_type = 'logical' AND "
 							"database = current_database() AND "
 							"temporary IS FALSE;",
 							live_check ? "FALSE" :
-							"(CASE WHEN conflict_reason IS NOT NULL THEN FALSE "
+							"(CASE WHEN invalidation_reason IS NOT NULL THEN FALSE "
 							"ELSE (SELECT \
pg_catalog.binary_upgrade_logical_slot_has_caught_up(slot_name)) "  "END)");
 
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 29af4ce65d..de1115baa0 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11129,7 +11129,7 @@
   proargtypes => '',
   proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,text,bool}',
   proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflict_reason,failover}',
+  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,invalidation_reason,failover}',
   prosrc => 'pg_get_replication_slots' },
 { oid => '3786', descr => 'set up a logical replication slot',
   proname => 'pg_create_logical_replication_slot', provolatile => 'v',
diff --git a/src/test/recovery/t/035_standby_logical_decoding.pl b/src/test/recovery/t/035_standby_logical_decoding.pl
index cebfa52d0f..f2c58a8a06 100644
--- a/src/test/recovery/t/035_standby_logical_decoding.pl
+++ b/src/test/recovery/t/035_standby_logical_decoding.pl
@@ -168,7 +168,7 @@ sub change_hot_standby_feedback_and_wait_for_xmins
 	}
 }
 
-# Check conflict_reason in pg_replication_slots.
+# Check invalidation_reason in pg_replication_slots.
 sub check_slots_conflict_reason
 {
 	my ($slot_prefix, $reason) = @_;
@@ -178,15 +178,15 @@ sub check_slots_conflict_reason
 
 	$res = $node_standby->safe_psql(
 		'postgres', qq(
-			 select conflict_reason from pg_replication_slots where slot_name = '$active_slot';));
+			 select invalidation_reason from pg_replication_slots where slot_name = '$active_slot';));
 
-	is($res, "$reason", "$active_slot conflict_reason is $reason");
+	is($res, "$reason", "$active_slot invalidation_reason is $reason");
 
 	$res = $node_standby->safe_psql(
 		'postgres', qq(
-			 select conflict_reason from pg_replication_slots where slot_name = '$inactive_slot';));
+			 select invalidation_reason from pg_replication_slots where slot_name = '$inactive_slot';));
 
-	is($res, "$reason", "$inactive_slot conflict_reason is $reason");
+	is($res, "$reason", "$inactive_slot invalidation_reason is $reason");
 }
 
 # Drop the slots, re-create them, change hot_standby_feedback,
@@ -293,13 +293,13 @@ $node_primary->safe_psql('testdb',
 	qq[SELECT * FROM pg_create_physical_replication_slot('$primary_slotname');]
 );
 
-# Check conflict_reason is NULL for physical slot
+# Check invalidation_reason is NULL for physical slot
 $res = $node_primary->safe_psql(
 	'postgres', qq[
-		 SELECT conflict_reason is null FROM pg_replication_slots where slot_name = '$primary_slotname';]
+		 SELECT invalidation_reason is null FROM pg_replication_slots where slot_name = '$primary_slotname';]
 );
 
-is($res, 't', "Physical slot reports conflict_reason as NULL");
+is($res, 't', "Physical slot reports invalidation_reason as NULL");
 
 my $backup_name = 'b1';
 $node_primary->backup($backup_name);
@@ -512,7 +512,7 @@ $node_primary->wait_for_replay_catchup($node_standby);
 # Check invalidation in the logfile and in pg_stat_database_conflicts
 check_for_invalidation('vacuum_full_', 1, 'with vacuum FULL on pg_class');
 
-# Verify conflict_reason is 'rows_removed' in pg_replication_slots
+# Verify invalidation_reason is 'rows_removed' in pg_replication_slots
 check_slots_conflict_reason('vacuum_full_', 'rows_removed');
 
 $handle =
@@ -531,7 +531,7 @@ change_hot_standby_feedback_and_wait_for_xmins(1, 1);
 ##################################################
 $node_standby->restart;
 
-# Verify conflict_reason is retained across a restart.
+# Verify invalidation_reason is retained across a restart.
 check_slots_conflict_reason('vacuum_full_', 'rows_removed');
 
 ##################################################
@@ -540,7 +540,7 @@ check_slots_conflict_reason('vacuum_full_', 'rows_removed');
 
 # Get the restart_lsn from an invalidated slot
 my $restart_lsn = $node_standby->safe_psql('postgres',
-	"SELECT restart_lsn from pg_replication_slots WHERE slot_name = \
'vacuum_full_activeslot' and conflict_reason is not null;" +	"SELECT restart_lsn from \
pg_replication_slots WHERE slot_name = 'vacuum_full_activeslot' and \
invalidation_reason is not null;"  );
 
 chomp($restart_lsn);
@@ -591,7 +591,7 @@ $node_primary->wait_for_replay_catchup($node_standby);
 # Check invalidation in the logfile and in pg_stat_database_conflicts
 check_for_invalidation('row_removal_', $logstart, 'with vacuum on pg_class');
 
-# Verify conflict_reason is 'rows_removed' in pg_replication_slots
+# Verify invalidation_reason is 'rows_removed' in pg_replication_slots
 check_slots_conflict_reason('row_removal_', 'rows_removed');
 
 $handle =
@@ -627,7 +627,7 @@ $node_primary->wait_for_replay_catchup($node_standby);
 check_for_invalidation('shared_row_removal_', $logstart,
 	'with vacuum on pg_authid');
 
-# Verify conflict_reason is 'rows_removed' in pg_replication_slots
+# Verify invalidation_reason is 'rows_removed' in pg_replication_slots
 check_slots_conflict_reason('shared_row_removal_', 'rows_removed');
 
 $handle = make_slot_active($node_standby, 'shared_row_removal_', 0, \$stdout,
@@ -680,7 +680,7 @@ ok( $node_standby->poll_query_until(
 is( $node_standby->safe_psql(
 		'postgres',
 		q[select bool_or(conflicting) from
-		  (select conflict_reason is not NULL as conflicting
+		  (select invalidation_reason is not NULL as conflicting
 		   from pg_replication_slots WHERE slot_type = 'logical')]),
 	'f',
 	'Logical slots are reported as non conflicting');
@@ -719,7 +719,7 @@ $node_primary->wait_for_replay_catchup($node_standby);
 # Check invalidation in the logfile and in pg_stat_database_conflicts
 check_for_invalidation('pruning_', $logstart, 'with on-access pruning');
 
-# Verify conflict_reason is 'rows_removed' in pg_replication_slots
+# Verify invalidation_reason is 'rows_removed' in pg_replication_slots
 check_slots_conflict_reason('pruning_', 'rows_removed');
 
 $handle = make_slot_active($node_standby, 'pruning_', 0, \$stdout, \$stderr);
@@ -763,7 +763,7 @@ $node_primary->wait_for_replay_catchup($node_standby);
 # Check invalidation in the logfile and in pg_stat_database_conflicts
 check_for_invalidation('wal_level_', $logstart, 'due to wal_level');
 
-# Verify conflict_reason is 'wal_level_insufficient' in pg_replication_slots
+# Verify invalidation_reason is 'wal_level_insufficient' in pg_replication_slots
 check_slots_conflict_reason('wal_level_', 'wal_level_insufficient');
 
 $handle =
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index abc944e8b8..022f9bccb0 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1473,9 +1473,9 @@ pg_replication_slots| SELECT l.slot_name,
     l.wal_status,
     l.safe_wal_size,
     l.two_phase,
-    l.conflict_reason,
+    l.invalidation_reason,
     l.failover
-   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflict_reason, failover)
+   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, invalidation_reason, failover)
      LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
 pg_roles| SELECT pg_authid.rolname,
     pg_authid.rolsuper,
-- 
2.34.1


["v3-0003-Add-inactive_timeout-based-replication-slot-inval.patch" (application/x-patch)]

From 0850c7762bed95dee650e35433a8e9d2ab54d50e Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Wed, 31 Jan 2024 12:16:14 +0000
Subject: [PATCH v3 3/4] Add inactive_timeout based replication slot
 invalidation

Currently postgres has the ability to invalidate inactive
replication slots based on the amount of WAL (set via
max_slot_wal_keep_size GUC) that will be needed for the slots in
case they become active. However, choosing a default value for
max_slot_wal_keep_size is tricky, because the amount of WAL a
customer generates and their allocated storage vary greatly in
production, making it difficult to pin down a one-size-fits-all
value. It is often easier for developers to set a timeout of, say,
1, 2, or 3 days, after which the inactive slots get invalidated.

To achieve the above, postgres uses the replication slot metric
inactive_at (the time at which the slot became inactive), and a
new GUC inactive_replication_slot_timeout. The checkpointer then
looks at all replication slots, invalidating any that have been
inactive for longer than the configured timeout.
---
 doc/src/sgml/config.sgml                      | 18 ++++
 src/backend/access/transam/xlog.c             | 10 +++
 src/backend/replication/slot.c                | 24 ++++-
 src/backend/replication/slotfuncs.c           |  3 +
 src/backend/utils/misc/guc_tables.c           | 12 +++
 src/backend/utils/misc/postgresql.conf.sample |  1 +
 src/include/replication/slot.h                |  3 +
 src/test/recovery/meson.build                 |  1 +
 src/test/recovery/t/050_invalidate_slots.pl   | 87 +++++++++++++++++++
 9 files changed, 156 insertions(+), 3 deletions(-)
 create mode 100644 src/test/recovery/t/050_invalidate_slots.pl

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 61038472c5..099b3fc5cc 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -4405,6 +4405,24 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"'  # Windows
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-inactive-replication-slot-timeout" \
xreflabel="inactive_replication_slot_timeout"> +      \
<term><varname>inactive_replication_slot_timeout</varname> (<type>integer</type>) +   \
<indexterm> +       <primary><varname>inactive_replication_slot_timeout</varname> \
configuration parameter</primary> +      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Invalidate replication slots that are inactive for longer than this
+        amount of time at the next checkpoint. If this value is specified
+        without units, it is taken as seconds. A value of zero (the
+        default) disables the timeout mechanism. This parameter can only be
+        set in the <filename>postgresql.conf</filename> file or on the server
+        command line.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="guc-track-commit-timestamp" \
                xreflabel="track_commit_timestamp">
       <term><varname>track_commit_timestamp</varname> (<type>boolean</type>)
       <indexterm>
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 478377c4a2..f7ce2cbbb4 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -7051,6 +7051,11 @@ CreateCheckPoint(int flags)
 	if (PriorRedoPtr != InvalidXLogRecPtr)
 		UpdateCheckPointDistanceEstimate(RedoRecPtr - PriorRedoPtr);
 
+	/* Invalidate inactive replication slots based on timeout */
+	if (inactive_replication_slot_timeout > 0)
+		InvalidateObsoleteReplicationSlots(RS_INVAL_INACTIVE_TIMEOUT, 0,
+										   InvalidOid, InvalidTransactionId);
+
 	/*
 	 * Delete old log files, those no longer needed for last checkpoint to
 	 * prevent the disk holding the xlog from growing full.
@@ -7495,6 +7500,11 @@ CreateRestartPoint(int flags)
 	 */
 	XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
 
+	/* Invalidate inactive replication slots based on timeout */
+	if (inactive_replication_slot_timeout > 0)
+		InvalidateObsoleteReplicationSlots(RS_INVAL_INACTIVE_TIMEOUT, 0,
+										   InvalidOid, InvalidTransactionId);
+
 	/*
 	 * Retreat _logSegNo using the current end of xlog replayed or received,
 	 * whichever is later.
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 9662b7f70d..57bca68547 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -98,9 +98,9 @@ ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
 /* My backend's replication slot in the shared memory array */
 ReplicationSlot *MyReplicationSlot = NULL;
 
-/* GUC variable */
-int			max_replication_slots = 10; /* the maximum number of replication
-										 * slots */
+/* GUC variables */
+int			max_replication_slots = 10;
+int			inactive_replication_slot_timeout = 0;
 
 static void ReplicationSlotShmemExit(int code, Datum arg);
 static void ReplicationSlotDropAcquired(void);
@@ -1372,6 +1372,9 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
 		case RS_INVAL_WAL_LEVEL:
 			appendStringInfoString(&err_detail, _("Logical decoding on standby requires \
wal_level >= logical on the primary server."));  break;
+		case RS_INVAL_INACTIVE_TIMEOUT:
+			appendStringInfoString(&err_detail, _("The slot has been inactive for more than \
the time specified by inactive_replication_slot_timeout.")); +			break;
 		case RS_INVAL_NONE:
 			pg_unreachable();
 	}
@@ -1470,6 +1473,20 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 					if (SlotIsLogical(s))
 						conflict = cause;
 					break;
+				case RS_INVAL_INACTIVE_TIMEOUT:
+					if (s->data.inactive_at > 0)
+					{
+						TimestampTz now;
+
+						Assert(s->data.persistency == RS_PERSISTENT);
+						Assert(s->active_pid == 0);
+
+						now = GetCurrentTimestamp();
+						if (TimestampDifferenceExceeds(s->data.inactive_at, now,
+													   inactive_replication_slot_timeout * 1000))
+							conflict = cause;
+					}
+					break;
 				case RS_INVAL_NONE:
 					pg_unreachable();
 			}
@@ -1615,6 +1632,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
  * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given
  *   db; dboid may be InvalidOid for shared relations
  * - RS_INVAL_WAL_LEVEL: is logical
+ * - RS_INVAL_INACTIVE_TIMEOUT: inactive slot timeout occurs
  *
  * NB - this runs as part of checkpoint, so avoid raising errors if possible.
  */
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 3c53f4ac48..972c7b2baf 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -425,6 +425,9 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 			case RS_INVAL_WAL_LEVEL:
 				values[i++] = CStringGetTextDatum("wal_level_insufficient");
 				break;
+			case RS_INVAL_INACTIVE_TIMEOUT:
+				values[i++] = CStringGetTextDatum("inactive_timeout");
+				break;
 		}
 
 		values[i++] = BoolGetDatum(slot_contents.data.failover);
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 7fe58518d7..f08563479b 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -2902,6 +2902,18 @@ struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"inactive_replication_slot_timeout", PGC_SIGHUP, REPLICATION_SENDING,
+			gettext_noop("Sets the amount of time to wait before invalidating an "
+						 "inactive replication slot."),
+			NULL,
+			GUC_UNIT_S
+		},
+		&inactive_replication_slot_timeout,
+		0, 0, INT_MAX,
+		NULL, NULL, NULL
+	},
+
 	{
 		{"commit_delay", PGC_SUSET, WAL_SETTINGS,
 			gettext_noop("Sets the delay in microseconds between transaction commit and "
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index da10b43dac..9fc1f2faed 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -325,6 +325,7 @@
 #wal_sender_timeout = 60s	# in milliseconds; 0 disables
 #track_commit_timestamp = off	# collect timestamp of transaction commit
 				# (change requires restart)
+#inactive_replication_slot_timeout = 0	# in seconds; 0 disables
 
 # - Primary Server -
 
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 380dcc90ca..b6607aa97b 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -50,6 +50,8 @@ typedef enum ReplicationSlotInvalidationCause
 	RS_INVAL_HORIZON,
 	/* wal_level insufficient for slot */
 	RS_INVAL_WAL_LEVEL,
+	/* inactive slot timeout has occurred */
+	RS_INVAL_INACTIVE_TIMEOUT,
 } ReplicationSlotInvalidationCause;
 
 /*
@@ -222,6 +224,7 @@ extern PGDLLIMPORT ReplicationSlot *MyReplicationSlot;
 
 /* GUCs */
 extern PGDLLIMPORT int max_replication_slots;
+extern PGDLLIMPORT int inactive_replication_slot_timeout;
 
 /* shmem initialization functions */
 extern Size ReplicationSlotsShmemSize(void);
diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build
index bf087ac2a9..e07b941d73 100644
--- a/src/test/recovery/meson.build
+++ b/src/test/recovery/meson.build
@@ -46,6 +46,7 @@ tests += {
       't/038_save_logical_slots_shutdown.pl',
       't/039_end_of_wal.pl',
       't/040_standby_failover_slots_sync.pl',
+      't/050_invalidate_slots.pl',
     ],
   },
 }
diff --git a/src/test/recovery/t/050_invalidate_slots.pl b/src/test/recovery/t/050_invalidate_slots.pl
new file mode 100644
index 0000000000..bf1cd4bbcc
--- /dev/null
+++ b/src/test/recovery/t/050_invalidate_slots.pl
@@ -0,0 +1,87 @@
+
+# Copyright (c) 2024, PostgreSQL Global Development Group
+
+# Test for replication slots invalidation
+use strict;
+use warnings FATAL => 'all';
+
+use PostgreSQL::Test::Utils;
+use PostgreSQL::Test::Cluster;
+use Test::More;
+use Time::HiRes qw(usleep);
+
+# Initialize primary node, setting wal-segsize to 1MB
+my $primary = PostgreSQL::Test::Cluster->new('primary');
+$primary->init(allows_streaming => 1, extra => ['--wal-segsize=1']);
+$primary->append_conf('postgresql.conf', q{
+checkpoint_timeout = 1h
+});
+$primary->start;
+$primary->safe_psql('postgres', qq[
+    SELECT pg_create_physical_replication_slot('sb1_slot');
+]);
+
+# Take backup
+my $backup_name = 'my_backup';
+$primary->backup($backup_name);
+
+# Create a standby linking to the primary using the replication slot
+my $standby1 = PostgreSQL::Test::Cluster->new('standby1');
+$standby1->init_from_backup($primary, $backup_name,
+	has_streaming => 1);
+$standby1->append_conf('postgresql.conf', q{
+primary_slot_name = 'sb1_slot'
+});
+$standby1->start;
+
+# Wait until standby has replayed enough data
+$primary->wait_for_catchup($standby1);
+
+# The inactive replication slot info should be null when the slot is active
+my $result = $primary->safe_psql('postgres', qq[
+	SELECT inactive_at IS NULL, inactive_count = 0 AS OK
+		FROM pg_replication_slots WHERE slot_name = 'sb1_slot';
+]);
+is($result, "t|t", 'check the inactive replication slot info for an active slot');
+
+# Set timeout so that the next checkpoint will invalidate the inactive
+# replication slot.
+$primary->safe_psql('postgres', qq[
+    ALTER SYSTEM SET inactive_replication_slot_timeout TO '1s';
+]);
+$primary->reload;
+
+my $logstart = -s $primary->logfile;
+
+# Stop standby to make the replication slot on primary inactive
+$standby1->stop;
+
+# Wait for the inactive replication slot info to be updated
+$primary->poll_query_until('postgres', qq[
+	SELECT COUNT(slot_name) = 1 FROM pg_replication_slots
+		WHERE inactive_at IS NOT NULL AND
+		inactive_count = 1 AND slot_name = 'sb1_slot';
+]) or die "Timed out while waiting for inactive replication slot info to be \
updated"; +
+my $invalidated = 0;
+for (my $i = 0; $i < 10 * $PostgreSQL::Test::Utils::timeout_default; $i++)
+{
+	$primary->safe_psql('postgres', "CHECKPOINT");
+	if ($primary->log_contains(
+			'invalidating obsolete replication slot "sb1_slot"', $logstart))
+	{
+		$invalidated = 1;
+		last;
+	}
+	usleep(100_000);
+}
+ok($invalidated, 'check that slot sb1_slot invalidation has been logged');
+
+# Wait for the inactive replication slot to be invalidated.
+$primary->poll_query_until('postgres', qq[
+	SELECT COUNT(slot_name) = 1 FROM pg_replication_slots
+		WHERE slot_name = 'sb1_slot' AND
+		invalidation_reason = 'inactive_timeout';
+]) or die "Timed out while waiting for inactive replication slot sb1_slot to be \
invalidated"; +
+done_testing();
-- 
2.34.1


