[prev in list] [next in list] [prev in thread] [next in thread] 

List:       postgresql-hackers
Subject:    Re: PATCH: logical_work_mem and logical streaming of large in-progress transactions
From:       Ajin Cherian <itsajin () gmail ! com>
Date:       2020-07-31 9:52:27
Message-ID: CAFPTHDaMoJVosg6795xJMuFFOorK-3uFRmxqbzrx=LEyeFw5Qg () mail ! gmail ! com
[Download RAW message or body]

[Attachment #2 (multipart/alternative)]


> Attaching an updated patch for the stats for streaming based on v2 of
> Sawada-san's replication slot stats framework and v44 of this patch series.
> This is one patch that has both the stats framework from Sawada-san (1)
> as well as my update for streaming, so it can be applied easily on top of
> v44.


regards,
Ajin Cherian
Fujitsu Australia

[Attachment #5 (text/html)]

<div dir="ltr"><div dir="ltr"><br></div><div class="gmail_quote"><blockquote \
class="gmail_quote" style="margin:0px 0px 0px 0.8ex;border-left:1px solid \
rgb(204,204,204);padding-left:1ex">Attaching an updated patch for the stats for \
streaming based on v2 of Sawada-san&#39;s replication slot stats framework and v44 of \
this patch series. This is one patch that has both the stats framework from \
Sawada-san (1) as well as my update for streaming, so it can be applied easily on top \
of v44.</blockquote><div><br></div><div>regards,</div><div>Ajin \
Cherian</div><div>Fujitsu Australia  </div></div></div>

--00000000000028a19f05abb9c0a9--


["streaming_stats_update.patch" (application/octet-stream)]

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 7dcddf4..d77fbeb 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -315,6 +315,15 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   \
0:00 postgres: ser  </row>
 
      <row>
+      <entry><structname>pg_stat_replication_slots</structname><indexterm><primary>pg_stat_replication_slots</primary></indexterm></entry>
+      <entry>One row per replication slot, showing statistics about
+       replication slot usage.
+       See <link linkend="monitoring-pg-stat-replication-slots-view">
+       <structname>pg_stat_replication_slots</structname></link> for details.
+      </entry>
+     </row>
+
+     <row>
       <entry><structname>pg_stat_wal_receiver</structname><indexterm><primary>pg_stat_wal_receiver</primary></indexterm></entry>
  <entry>Only one row, showing statistics about the WAL receiver from
        that receiver's connected server.
@@ -2508,7 +2517,119 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity \
WHERE wait_event i  
  </sect2>
 
- <sect2 id="monitoring-pg-stat-wal-receiver-view">
+ <sect2 id="monitoring-pg-stat-replication-slots-view">
+  <title><structname>pg_stat_replication_slots</structname></title>
+
+  <indexterm>
+   <primary>pg_stat_replication_slots</primary>
+  </indexterm>
+
+   <para>
+    The <structname>pg_stat_replication_slots</structname> view will contain
+    one row per replication slot, showing statistics about replication
+    slot usage.
+   </para>
+
+   <table id="pg-stat-replication-slots-view" xreflabel="pg_stat_replication_slots">
+    <title><structname>pg_stat_replication_slots</structname> View</title>
+    <tgroup cols="1">
+     <thead>
+      <row>
+       <entry role="catalog_table_entry"><para role="column_definition">
+         Column Type
+        </para>
+        <para>
+         Description
+       </para></entry>
+      </row>
+     </thead>
+
+     <tbody>
+      <row>
+       <entry role="catalog_table_entry"><para role="column_definition">
+         <structfield>name</structfield> <type>text</type>
+        </para>
+        <para>
+         A unique, cluster-wide identifier for the replication slot
+       </para></entry>
+      </row>
+
+      <row>
+       <entry role="catalog_table_entry"><para role="column_definition">
+         <structfield>spill_txns</structfield> <type>bigint</type>
+        </para>
+        <para>
+         Number of transactions spilled to disk after the memory used by
+         logical decoding exceeds <literal>logical_decoding_work_mem</literal>. The
+         counter gets incremented both for toplevel transactions and
+         subtransactions.
+       </para></entry>
+      </row>
+
+      <row>
+       <entry role="catalog_table_entry"><para role="column_definition">
+         <structfield>spill_count</structfield> <type>bigint</type>
+        </para>
+        <para>
+         Number of times transactions were spilled to disk. Transactions
+         may get spilled repeatedly, and this counter gets incremented on every
+         such invocation.
+       </para></entry>
+      </row>
+
+      <row>
+       <entry role="catalog_table_entry"><para role="column_definition">
+         <structfield>spill_bytes</structfield> <type>bigint</type>
+        </para>
+        <para>
+         Amount of decoded transaction data spilled to disk.
+       </para></entry>
+      </row>
+
+      <row>
+       <entry role="catalog_table_entry"><para role="column_definition">
+         <structfield>stream_txns</structfield> <type>bigint</type>
+        </para>
+        <para>
+         Number of in-progress transactions streamed to the subscriber after
+         memory used by logical decoding exceeds <literal>logical_decoding_work_mem</literal>.
+         Streaming only works with toplevel transactions (subtransactions can't
+         be streamed independently), so the counter does not get incremented for
+         subtransactions.
+       </para></entry>
+      </row>
+
+      <row>
+       <entry role="catalog_table_entry"><para role="column_definition">
+         <structfield>stream_count</structfield> <type>bigint</type>
+        </para>
+        <para>
+         Number of times in-progress transactions were streamed to subscriber.
+         Transactions may get streamed repeatedly, and this counter gets incremented
+         on every such invocation.
+       </para></entry>
+      </row>
+
+      <row>
+       <entry role="catalog_table_entry"><para role="column_definition">
+         <structfield>stream_bytes</structfield> <type>bigint</type>
+        </para>
+        <para>
+         Amount of decoded in-progress transaction data streamed to subscriber.
+       </para></entry>
+      </row>
+     </tbody>
+    </tgroup>
+   </table>
+
+   <para>
+    Tracking of spilled and streamed transactions works only for logical
+    replication.  In physical replication, the tracking mechanism will
+    display 0 for these statistics.
+   </para>
+  </sect2>
+
+  <sect2 id="monitoring-pg-stat-wal-receiver-view">
   <title><structname>pg_stat_wal_receiver</structname></title>
 
   <indexterm>
@@ -4707,6 +4828,26 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity \
WHERE wait_event i  can be granted EXECUTE to run the function.
        </para></entry>
       </row>
+
+      <row>
+        <entry role="func_table_entry"><para role="func_signature">
+        <indexterm>
+          <primary>pg_stat_reset_replication_slot</primary>
+        </indexterm>
+        <function>pg_stat_reset_replication_slot</function> ( <type>text</type> )
+        <returnvalue>void</returnvalue>
+       </para>
+       <para>
+         Resets statistics to zero for a single replication slot, or for all
+         replication slots in the cluster.  If the argument is NULL, all counters
+         shown in the <structname>pg_stat_replication_slots</structname> view for
+         all replication slots are reset.
+       </para>
+       <para>
+         This function is restricted to superusers by default, but other users
+         can be granted EXECUTE to run the function.
+       </para></entry>
+      </row>
      </tbody>
     </tgroup>
    </table>
diff --git a/src/backend/catalog/system_views.sql \
b/src/backend/catalog/system_views.sql index 8625cbe..ceba837 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -790,6 +790,17 @@ CREATE VIEW pg_stat_replication AS
         JOIN pg_stat_get_wal_senders() AS W ON (S.pid = W.pid)
         LEFT JOIN pg_authid AS U ON (S.usesysid = U.oid);
 
+CREATE VIEW pg_stat_replication_slots AS  -- one row per slot returned by pg_stat_get_replication_slots()
+    SELECT
+            s.name,
+            s.spill_txns,    -- counts subtransactions too (see monitoring docs)
+            s.spill_count,
+            s.spill_bytes,
+            s.stream_txns,   -- toplevel transactions only
+            s.stream_count,
+            s.stream_bytes
+    FROM pg_stat_get_replication_slots() AS s;
+
 CREATE VIEW pg_stat_slru AS
     SELECT
             s.name,
@@ -1441,6 +1452,7 @@ REVOKE EXECUTE ON FUNCTION pg_stat_reset_shared(text) FROM \
public;  REVOKE EXECUTE ON FUNCTION pg_stat_reset_slru(text) FROM public;
 REVOKE EXECUTE ON FUNCTION pg_stat_reset_single_table_counters(oid) FROM public;
 REVOKE EXECUTE ON FUNCTION pg_stat_reset_single_function_counters(oid) FROM public;
+REVOKE EXECUTE ON FUNCTION pg_stat_reset_replication_slot(text) FROM public;
 
 REVOKE EXECUTE ON FUNCTION lo_import(text) FROM public;
 REVOKE EXECUTE ON FUNCTION lo_import(text, oid) FROM public;
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 479e3ca..7aba571 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -51,6 +51,7 @@
 #include "postmaster/fork_process.h"
 #include "postmaster/interrupt.h"
 #include "postmaster/postmaster.h"
+#include "replication/slot.h"
 #include "replication/walsender.h"
 #include "storage/backendid.h"
 #include "storage/dsm.h"
@@ -282,6 +283,8 @@ static int	localNumBackends = 0;
 static PgStat_ArchiverStats archiverStats;
 static PgStat_GlobalStats globalStats;
 static PgStat_SLRUStats slruStats[SLRU_NUM_ELEMENTS];
+static PgStat_ReplSlotStats	*replSlotStats;
+static int	nReplSlotStats;
 
 /*
  * List of OIDs of databases we need to write out.  If an entry is InvalidOid,
@@ -340,6 +343,8 @@ static const char *pgstat_get_wait_io(WaitEventIO w);
 static void pgstat_setheader(PgStat_MsgHdr *hdr, StatMsgType mtype);
 static void pgstat_send(void *msg, int len);
 
+static int pgstat_replslot_index(const char *name, bool create_it);
+
 static void pgstat_recv_inquiry(PgStat_MsgInquiry *msg, int len);
 static void pgstat_recv_tabstat(PgStat_MsgTabstat *msg, int len);
 static void pgstat_recv_tabpurge(PgStat_MsgTabpurge *msg, int len);
@@ -348,6 +353,7 @@ static void pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, \
int len);  static void pgstat_recv_resetsharedcounter(PgStat_MsgResetsharedcounter \
*msg, int len);  static void \
pgstat_recv_resetsinglecounter(PgStat_MsgResetsinglecounter *msg, int len);  static \
void pgstat_recv_resetslrucounter(PgStat_MsgResetslrucounter *msg, int len); +static \
void pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter *msg, int len);  \
static void pgstat_recv_autovac(PgStat_MsgAutovacStart *msg, int len);  static void \
pgstat_recv_vacuum(PgStat_MsgVacuum *msg, int len);  static void \
pgstat_recv_analyze(PgStat_MsgAnalyze *msg, int len); @@ -360,6 +366,7 @@ static void \
pgstat_recv_recoveryconflict(PgStat_MsgRecoveryConflict *msg, int le  static void \
pgstat_recv_deadlock(PgStat_MsgDeadlock *msg, int len);  static void \
pgstat_recv_checksum_failure(PgStat_MsgChecksumFailure *msg, int len);  static void \
pgstat_recv_tempfile(PgStat_MsgTempFile *msg, int len); +static void \
pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len);  
 /* ------------------------------------------------------------
  * Public functions called from postmaster follow
@@ -1430,6 +1437,36 @@ pgstat_reset_slru_counter(const char *name)
 }
 
 /* ----------
+ * pgstat_reset_replslot_counter() -
+ *
+ *	Tell the statistics collector to reset the counters of a single
+ *	replication slot, or of all replication slots (when name is NULL).
+ *	The name is copied with strlcpy(), so it may be of any length; names
+ *	longer than NAMEDATALEN - 1 bytes are truncated.  Permission checking
+ *	for this function is managed through the normal GRANT system.
+ * ----------
+ */
+void
+pgstat_reset_replslot_counter(const char *name)
+{
+	PgStat_MsgResetreplslotcounter msg;
+
+	if (pgStatSock == PGINVALID_SOCKET)
+		return;
+
+	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETREPLSLOTCOUNTER);
+	if (name)
+	{
+		strlcpy(msg.m_slotname, name, NAMEDATALEN);
+		msg.clearall = false;
+	}
+	else
+		msg.clearall = true;
+
+	pgstat_send(&msg, sizeof(msg));
+}
+
+/* ----------
  * pgstat_report_autovac() -
  *
  *	Called from autovacuum.c to report startup of an autovacuum process.
@@ -1629,6 +1666,49 @@ pgstat_report_tempfile(size_t filesize)
 	pgstat_send(&msg, sizeof(msg));
 }
 
+/* ----------
+ * pgstat_report_replslot() -
+ *
+ *	Send per-slot counter deltas to the collector (see pgstat_recv_replslot).
+ * ----------
+ */
+void
+pgstat_report_replslot(const char *slotname, int spilltxns, int spillcount,
+					   int spillbytes, int streamtxns, int streamcount, int  streambytes)
+{
+	PgStat_MsgReplSlot msg;
+
+	/*
+	 * Prepare and send an update (not a drop) message
+	 */
+	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_REPLSLOT);
+	memcpy(&msg.m_slotname, slotname, NAMEDATALEN);	/* reads NAMEDATALEN bytes */
+	msg.m_drop = false;
+	msg.m_spill_txns = spilltxns;
+	msg.m_spill_count = spillcount;
+	msg.m_spill_bytes = spillbytes;	/* NOTE(review): int args truncate values > INT_MAX; widen to PgStat_Counter here and in pgstat.h? */
+	msg.m_stream_txns = streamtxns;
+	msg.m_stream_count = streamcount;
+	msg.m_stream_bytes = streambytes;	/* NOTE(review): same int truncation concern */
+	pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
+}
+
+/* ----------
+ * pgstat_report_replslot_drop() -
+ *
+ *	Ask the collector to forget the statistics of the named replication slot.
+ * ----------
+ */
+void
+pgstat_report_replslot_drop(const char *slotname)
+{
+	PgStat_MsgReplSlot dropmsg;
+
+	pgstat_setheader(&dropmsg.m_hdr, PGSTAT_MTYPE_REPLSLOT);
+	dropmsg.m_drop = true;		/* the counters are not looked at for a drop */
+	memcpy(&dropmsg.m_slotname, slotname, NAMEDATALEN);
+	pgstat_send(&dropmsg, sizeof(dropmsg));
+}
 
 /* ----------
  * pgstat_ping() -
@@ -2691,6 +2771,23 @@ pgstat_fetch_slru(void)
 	return slruStats;
 }
 
+/* ----------
+ * pgstat_fetch_replslot() -
+ *
+ *	Support function for the SQL-callable pgstat* functions.  Returns
+ *	a pointer to the array of replication slot statistics and stores
+ *	the number of valid entries in *nslots_p.  The stats file is read
+ *	through backend_read_statsfile() first.
+ * ----------
+ */
+PgStat_ReplSlotStats *
+pgstat_fetch_replslot(int *nslots_p)
+{
+	backend_read_statsfile();
+
+	*nslots_p = nReplSlotStats;
+	return replSlotStats;
+}
 
 /* ------------------------------------------------------------
  * Functions for management of the shared-memory PgBackendStatus array
@@ -4630,6 +4727,11 @@ PgstatCollectorMain(int argc, char *argv[])
 												 len);
 					break;
 
+				case PGSTAT_MTYPE_RESETREPLSLOTCOUNTER:
+					pgstat_recv_resetreplslotcounter(&msg.msg_resetreplslotcounter,
+													 len);
+					break;
+
 				case PGSTAT_MTYPE_AUTOVAC_START:
 					pgstat_recv_autovac(&msg.msg_autovacuum_start, len);
 					break;
@@ -4680,6 +4782,10 @@ PgstatCollectorMain(int argc, char *argv[])
 												 len);
 					break;
 
+				case PGSTAT_MTYPE_REPLSLOT:
+					pgstat_recv_replslot(&msg.msg_replslot, len);
+					break;
+
 				default:
 					break;
 			}
@@ -4883,6 +4989,7 @@ pgstat_write_statsfiles(bool permanent, bool allDbs)
 	const char *tmpfile = permanent ? PGSTAT_STAT_PERMANENT_TMPFILE : \
pgstat_stat_tmpname;  const char *statfile = permanent ? \
PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename;  int			rc;
+	int			i;
 
 	elog(DEBUG2, "writing stats file \"%s\"", statfile);
 
@@ -4930,6 +5037,16 @@ pgstat_write_statsfiles(bool permanent, bool allDbs)
 	(void) rc;					/* we'll check for error with ferror */
 
 	/*
+	 * Write replication slot stats struct
+	 */
+	for (i = 0; i < nReplSlotStats; i++)
+	{
+		fputc('R', fpout);
+		rc = fwrite(&replSlotStats[i], sizeof(PgStat_ReplSlotStats), 1, fpout);
+		(void) rc;				/* we'll check for error with ferror */
+	}
+
+	/*
 	 * Walk through the database table.
 	 */
 	hash_seq_init(&hstat, pgStatDBHash);
@@ -5181,6 +5298,10 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
 	dbhash = hash_create("Databases hash", PGSTAT_DB_HASH_SIZE, &hash_ctl,
 						 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
 
+	/* Allocate the space for replication slot statistics */
+	replSlotStats = palloc0(max_replication_slots * sizeof(PgStat_ReplSlotStats));
+	nReplSlotStats = 0;
+
 	/*
 	 * Clear out global and archiver statistics so they start from zero in
 	 * case we can't load an existing statsfile.
@@ -5203,6 +5324,12 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
 		slruStats[i].stat_reset_timestamp = globalStats.stat_reset_timestamp;
 
 	/*
+	 * Set the same reset timestamp for all replication slots too.
+	 */
+	for (i = 0; i < max_replication_slots; i++)
+		replSlotStats[i].stat_reset_timestamp = globalStats.stat_reset_timestamp;
+
+	/*
 	 * Try to open the stats file. If it doesn't exist, the backends simply
 	 * return zero for anything and the collector simply starts from scratch
 	 * with empty counters.
@@ -5365,6 +5492,34 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
 
 				break;
 
+				/*
+				 * 'R'	A PgStat_ReplSlotStats struct describing a replication slot
+				 * follows.
+				 */
+			case 'R':
+				{
+					PgStat_ReplSlotStats slotstats;
+
+					if (fread(&slotstats, 1, sizeof(PgStat_ReplSlotStats), fpin)
+						!= sizeof(PgStat_ReplSlotStats))
+					{
+						ereport(pgStatRunningInCollector ? LOG : WARNING,
+								(errmsg("corrupted statistics file \"%s\"",
+										statfile)));
+						goto done;
+					}
+
+					/*
+					 * replSlotStats only has room for max_replication_slots
+					 * entries.  The file can hold more if that setting was
+					 * lowered since it was written; ignore the surplus rather
+					 * than overrun the array.
+					 */
+					if (nReplSlotStats < max_replication_slots)
+						replSlotStats[nReplSlotStats++] = slotstats;
+				}
+				break;
+
 			case 'E':
 				goto done;
 
@@ -5574,6 +5718,7 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool \
permanent,  PgStat_GlobalStats myGlobalStats;
 	PgStat_ArchiverStats myArchiverStats;
 	PgStat_SLRUStats mySLRUStats[SLRU_NUM_ELEMENTS];
+	PgStat_ReplSlotStats myReplSlotStats;
 	FILE	   *fpin;
 	int32		format_id;
 	const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : \
pgstat_stat_filename; @@ -5676,6 +5821,21 @@ pgstat_read_db_statsfile_timestamp(Oid \
databaseid, bool permanent,  
 				break;
 
+				/*
+				 * 'R'	A PgStat_ReplSlotStats struct describing a replication slot
+				 * follows.
+				 */
+			case 'R':
+				if (fread(&myReplSlotStats, 1, sizeof(PgStat_ReplSlotStats), fpin)
+					!= sizeof(PgStat_ReplSlotStats))
+				{
+					ereport(pgStatRunningInCollector ? LOG : WARNING,
+							(errmsg("corrupted statistics file \"%s\"",
+									statfile)));
+					goto done;
+				}
+				break;
+
 			case 'E':
 				goto done;
 
@@ -6263,6 +6423,48 @@ pgstat_recv_resetslrucounter(PgStat_MsgResetslrucounter *msg, \
int len)  }
 
 /* ----------
+ * pgstat_recv_resetreplslotcounter() -
+ *
+ *	Reset the counters of one replication slot, or of all (clearall).
+ * ----------
+ */
+static void
+pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter *msg,
+								 int len)
+{
+	int			i;
+	int			idx = -1;
+	TimestampTz ts;
+
+	if (!msg->clearall)
+	{
+		/* Get the index of replication slot statistics to reset */
+		idx = pgstat_replslot_index(msg->m_slotname, false);
+
+		if (idx < 0)
+			return;	/* not found */
+	}
+
+	ts = GetCurrentTimestamp();
+	for (i = 0; i < nReplSlotStats; i++)
+	{
+		/* reset entry idx, or all nReplSlotStats in-use entries (clearall) */
+		if (msg->clearall || idx == i)
+		{
+			/* reset only counters. Don't clear slot name */
+			replSlotStats[i].spill_txns = 0;
+			replSlotStats[i].spill_count = 0;
+			replSlotStats[i].spill_bytes = 0;
+			replSlotStats[i].stream_txns = 0;
+			replSlotStats[i].stream_count = 0;
+			replSlotStats[i].stream_bytes = 0;
+			replSlotStats[i].stat_reset_timestamp = ts;
+		}
+	}
+}
+
+
+/* ----------
  * pgstat_recv_autovac() -
  *
  *	Process an autovacuum signaling message.
@@ -6509,6 +6711,80 @@ pgstat_recv_checksum_failure(PgStat_MsgChecksumFailure *msg, \
int len)  dbentry->last_checksum_failure = msg->m_failure_time;
 }
 
+/*
+ * pgstat_replslot_index
+ *
+ * Look up the statistics entry of the replication slot called "name" and
+ * return its index in replSlotStats[], or -1 if there is none.  If create_it
+ * is true and the array has room, a new zeroed entry is appended instead.
+ */
+static int
+pgstat_replslot_index(const char *name, bool create_it)
+{
+	int		idx;
+
+	Assert(nReplSlotStats <= max_replication_slots);
+	for (idx = 0; idx < nReplSlotStats; idx++)
+	{
+		if (strcmp(replSlotStats[idx].slotname, name) == 0)
+			return idx;			/* existing entry */
+	}
+
+	/*
+	 * No entry yet.  Give up unless the caller asked for one to be created
+	 * and the array still has a free slot.
+	 */
+	if (!create_it || idx == max_replication_slots)
+		return -1;
+
+	/* Append a fresh, zeroed entry that carries the slot name */
+	memset(&replSlotStats[idx], 0, sizeof(PgStat_ReplSlotStats));
+	memcpy(&replSlotStats[idx].slotname, name, NAMEDATALEN);
+	return nReplSlotStats++;
+}
+
+/* ----------
+ * pgstat_recv_replslot() -
+ *
+ *	Process a REPLSLOT message: accumulate its counters or drop the entry.
+ * ----------
+ */
+static void
+pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len)
+{
+	int			idx;
+
+	/*
+	 * Get the index of replication slot statistics.  On dropping, we
+	 * don't create the new statistics.
+	 */
+	idx = pgstat_replslot_index(msg->m_slotname, !msg->m_drop);
+
+	/* no entry found, or no room left to create one */
+	if (idx < 0)
+		return;
+
+	Assert(idx >= 0 && idx < nReplSlotStats);
+	if (msg->m_drop)
+	{
+		/* Fill the hole with the last entry; memmove as it may be idx itself */
+		memmove(&replSlotStats[idx], &replSlotStats[nReplSlotStats - 1],
+				sizeof(PgStat_ReplSlotStats));
+		nReplSlotStats--;
+		Assert(nReplSlotStats >= 0);
+	}
+	else
+	{
+		/* Update the replication slot statistics */
+		replSlotStats[idx].spill_txns += msg->m_spill_txns;
+		replSlotStats[idx].spill_count += msg->m_spill_count;
+		replSlotStats[idx].spill_bytes += msg->m_spill_bytes;
+		replSlotStats[idx].stream_txns += msg->m_stream_txns;
+		replSlotStats[idx].stream_count += msg->m_stream_count;
+		replSlotStats[idx].stream_bytes += msg->m_stream_bytes;
+	}
+}
+
 /* ----------
  * pgstat_recv_tempfile() -
  *
diff --git a/src/backend/replication/logical/logical.c \
b/src/backend/replication/logical/logical.c index 42f284b..9cfc48c 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -32,6 +32,7 @@
 #include "access/xlog_internal.h"
 #include "fmgr.h"
 #include "miscadmin.h"
+#include "pgstat.h"
 #include "replication/decode.h"
 #include "replication/logical.h"
 #include "replication/origin.h"
@@ -83,6 +84,7 @@ static void stream_truncate_cb_wrapper(ReorderBuffer *cache, \
                ReorderBufferTXN *t
 									   int nrelations, Relation relations[], ReorderBufferChange *change);
 
 static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, char *plugin);
+static void UpdateSpillStats(LogicalDecodingContext *ctx);
 
 /*
  * Make sure the current settings & environment are capable of doing logical
@@ -740,6 +742,11 @@ begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
 
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
+
+	/*
+	 * Update statistics about transactions that spilled to disk.
+	 */
+	UpdateSpillStats(ctx);
 }
 
 static void
@@ -1452,3 +1459,37 @@ ResetLogicalStreamingState(void)
 	CheckXidAlive = InvalidTransactionId;
 	bsysscan = false;
 }
+
+/*
+ * UpdateSpillStats
+ *
+ * Send the spill and streaming counters accumulated in the reorder buffer
+ * to the statistics collector for this context's replication slot, then
+ * clear them so that the next report carries only new activity.
+ */
+static void
+UpdateSpillStats(LogicalDecodingContext *ctx)
+{
+	ReorderBuffer *rb = ctx->reorder;
+
+	elog(DEBUG2, "UpdateSpillStats: updating stats %p %lld %lld %lld %lld %lld %lld",
+		 rb,
+		 (long long) rb->spillTxns,
+		 (long long) rb->spillCount,
+		 (long long) rb->spillBytes,
+		 (long long) rb->streamTxns,
+		 (long long) rb->streamCount,
+		 (long long) rb->streamBytes);
+
+	pgstat_report_replslot(NameStr(ctx->slot->data.name),
+						   rb->spillTxns, rb->spillCount, rb->spillBytes,
+						   rb->streamTxns, rb->streamCount, rb->streamBytes);
+
+	/* Everything up to here has been reported; start counting afresh. */
+	rb->spillTxns = 0;
+	rb->spillCount = 0;
+	rb->spillBytes = 0;
+	rb->streamTxns = 0;
+	rb->streamCount = 0;
+	rb->streamBytes = 0;
+}
diff --git a/src/backend/replication/logical/reorderbuffer.c \
b/src/backend/replication/logical/reorderbuffer.c index c469536..ac4422b 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -344,6 +344,13 @@ ReorderBufferAllocate(void)
 	buffer->outbufsize = 0;
 	buffer->size = 0;
 
+	buffer->spillCount = 0;
+	buffer->spillTxns = 0;
+	buffer->spillBytes = 0;
+	buffer->streamCount = 0;
+	buffer->streamTxns = 0;
+	buffer->streamBytes = 0;
+
 	buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
 
 	dlist_init(&buffer->toplevel_by_lsn);
@@ -3098,6 +3105,7 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN \
*txn)  int			fd = -1;
 	XLogSegNo	curOpenSegNo = 0;
 	Size		spilled = 0;
+	Size		size = txn->size;
 
 	elog(DEBUG2, "spill %u changes in XID %u to disk",
 		 (uint32) txn->nentries_mem, txn->xid);
@@ -3156,6 +3164,13 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN \
*txn)  spilled++;
 	}
 
+	/* update the statistics */
+	rb->spillCount += 1;
+	rb->spillBytes += size;
+
+	/* Don't consider already serialized transactions. */
+	rb->spillTxns += rbtxn_is_serialized(txn) ? 0 : 1;
+
 	Assert(spilled == txn->nentries_mem);
 	Assert(dlist_is_empty(&txn->changes));
 	txn->nentries_mem = 0;
@@ -3484,10 +3499,18 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN \
*txn)  txn->snapshot_now = NULL;
 	}
 
+
+	rb->streamCount += 1;
+	rb->streamBytes += txn->total_size;
+
+	/* Don't consider already streamed transaction. */
+	rb->streamTxns += (rbtxn_is_streamed(txn)) ? 0 : 1;
+
 	/* Process and send the changes to output plugin. */
 	ReorderBufferProcessTXN(rb, txn, InvalidXLogRecPtr, snapshot_now,
 							command_id, true);
 
+
 	Assert(dlist_is_empty(&txn->changes));
 	Assert(txn->nentries == 0);
 	Assert(txn->nentries_mem == 0);
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 57bbb62..ba8a013 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -322,6 +322,9 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 
 	/* Let everybody know we've modified this slot */
 	ConditionVariableBroadcast(&slot->active_cv);
+
+	/* Create statistics entry for the new slot */
+	pgstat_report_replslot(NameStr(slot->data.name), 0, 0, 0, 0, 0, 0);
 }
 
 /*
@@ -683,6 +686,17 @@ ReplicationSlotDropPtr(ReplicationSlot *slot)
 				(errmsg("could not remove directory \"%s\"", tmppath)));
 
 	/*
+	 * Report the drop of the replication slot to the stats collector.  Since
+	 * there is no guarantee about the order of message arrival on a UDP
+	 * connection, a message for creating a new slot with the same name could
+	 * arrive before the message for removing the old one.  We send the drop
+	 * message while holding ReplicationSlotAllocationLock to reduce that
+	 * possibility.  If the messages arrived in reverse order, we would lose
+	 * one statistics update message.
+	 */
+	pgstat_report_replslot_drop(NameStr(slot->data.name));
+
+	/*
 	 * We release this at the very end, so that nobody starts trying to create
 	 * a slot while we're still cleaning up the detritus of the old one.
 	 */
diff --git a/src/backend/utils/adt/pgstatfuncs.c \
b/src/backend/utils/adt/pgstatfuncs.c index 95738a4..59fba37 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -2033,6 +2033,20 @@ pg_stat_reset_slru(PG_FUNCTION_ARGS)
 	PG_RETURN_VOID();
 }
 
+/*
+ * Reset the counters of one replication slot, or of all replication slots
+ * when the argument is NULL.
+ */
+Datum
+pg_stat_reset_replication_slot(PG_FUNCTION_ARGS)
+{
+	char	   *slotname;
+
+	slotname = PG_ARGISNULL(0) ? NULL : text_to_cstring(PG_GETARG_TEXT_PP(0));
+	pgstat_reset_replslot_counter(slotname);
+	PG_RETURN_VOID();
+}
+
 Datum
 pg_stat_get_archiver(PG_FUNCTION_ARGS)
 {
@@ -2098,3 +2112,72 @@ pg_stat_get_archiver(PG_FUNCTION_ARGS)
 	/* Returns the record as Datum */
 	PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
 }
+
+/*
+ * pg_stat_get_replication_slots
+ *
+ * Return one row per replication slot known to the statistics collector,
+ * holding its spill and streaming counters.
+ */
+Datum
+pg_stat_get_replication_slots(PG_FUNCTION_ARGS)
+{
+#define PG_STAT_GET_REPLICATION_SLOT_COLS 7
+	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+	TupleDesc	tupdesc;
+	Tuplestorestate *tupstore;
+	MemoryContext per_query_ctx;
+	MemoryContext oldcontext;
+	PgStat_ReplSlotStats *stats;
+	int			nstats;
+	int			i;
+
+	/* check to see if caller supports us returning a tuplestore */
+	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("set-valued function called in context that cannot accept a set")));
+	if (!(rsinfo->allowedModes & SFRM_Materialize))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("materialize mode required, but it is not allowed in this context")));
+
+	/* Build a tuple descriptor for our result type */
+	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+		elog(ERROR, "return type must be a row type");
+
+	per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
+	oldcontext = MemoryContextSwitchTo(per_query_ctx);
+
+	tupstore = tuplestore_begin_heap(true, false, work_mem);
+	rsinfo->returnMode = SFRM_Materialize;
+	rsinfo->setResult = tupstore;
+	rsinfo->setDesc = tupdesc;
+
+	MemoryContextSwitchTo(oldcontext);
+
+	stats = pgstat_fetch_replslot(&nstats);
+	for (i = 0; i < nstats; i++)
+	{
+		Datum		values[PG_STAT_GET_REPLICATION_SLOT_COLS];
+		bool		nulls[PG_STAT_GET_REPLICATION_SLOT_COLS];
+		PgStat_ReplSlotStats *stat = &stats[i];
+
+		MemSet(values, 0, sizeof(values));
+		MemSet(nulls, 0, sizeof(nulls));
+
+		values[0] = PointerGetDatum(cstring_to_text(stat->slotname));
+		values[1] = Int64GetDatum(stat->spill_txns);
+		values[2] = Int64GetDatum(stat->spill_count);
+		values[3] = Int64GetDatum(stat->spill_bytes);
+		values[4] = Int64GetDatum(stat->stream_txns);
+		values[5] = Int64GetDatum(stat->stream_count);
+		values[6] = Int64GetDatum(stat->stream_bytes);
+
+		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
+	}
+
+	tuplestore_donestoring(tupstore);
+
+	return (Datum) 0;
+}
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 082a11f..2a316b1 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5251,6 +5251,14 @@
   proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
   proargnames => '{pid,status,receive_start_lsn,receive_start_tli,written_lsn,flushed \
_lsn,received_tli,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,slot_name,sender_host,sender_port,conninfo}',
  prosrc => 'pg_stat_get_wal_receiver' },
+{ oid => '8595', descr => 'statistics: information about replication slots',
+  proname => 'pg_stat_get_replication_slots', prorows => '10', proisstrict => 'f',
+  proretset => 't', provolatile => 's', proparallel => 'r',
+  prorettype => 'record', proargtypes => '',
+  proallargtypes => '{text,int8,int8,int8,int8,int8,int8}',
+  proargmodes => '{o,o,o,o,o,o,o}',
+  proargnames => '{name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes}',
+  prosrc => 'pg_stat_get_replication_slots' },
 { oid => '6118', descr => 'statistics: information about subscription',
   proname => 'pg_stat_get_subscription', proisstrict => 'f', provolatile => 's',
   proparallel => 'r', prorettype => 'record', proargtypes => 'oid',
@@ -5592,6 +5600,10 @@
   descr => 'statistics: reset collected statistics for a single SLRU',
   proname => 'pg_stat_reset_slru', proisstrict => 'f', provolatile => 'v',
   prorettype => 'void', proargtypes => 'text', prosrc => 'pg_stat_reset_slru' },
+{ oid => '8596',
+  descr => 'statistics: reset collected statistics for a single replication slot',
+  proname => 'pg_stat_reset_replication_slot', proisstrict => 'f', provolatile => 'v',
+  prorettype => 'void', proargtypes => 'text', prosrc => 'pg_stat_reset_replication_slot' },
 
 { oid => '3163', descr => 'current trigger depth',
   proname => 'pg_trigger_depth', provolatile => 's', proparallel => 'r',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 0dfbac4..ce67740 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -56,6 +56,7 @@ typedef enum StatMsgType
 	PGSTAT_MTYPE_RESETSHAREDCOUNTER,
 	PGSTAT_MTYPE_RESETSINGLECOUNTER,
 	PGSTAT_MTYPE_RESETSLRUCOUNTER,
+	PGSTAT_MTYPE_RESETREPLSLOTCOUNTER,
 	PGSTAT_MTYPE_AUTOVAC_START,
 	PGSTAT_MTYPE_VACUUM,
 	PGSTAT_MTYPE_ANALYZE,
@@ -67,7 +68,8 @@ typedef enum StatMsgType
 	PGSTAT_MTYPE_RECOVERYCONFLICT,
 	PGSTAT_MTYPE_TEMPFILE,
 	PGSTAT_MTYPE_DEADLOCK,
-	PGSTAT_MTYPE_CHECKSUMFAILURE
+	PGSTAT_MTYPE_CHECKSUMFAILURE,
+	PGSTAT_MTYPE_REPLSLOT,
 } StatMsgType;
 
 /* ----------
@@ -357,6 +359,18 @@ typedef struct PgStat_MsgResetslrucounter
 } PgStat_MsgResetslrucounter;
 
 /* ----------
+ * PgStat_MsgResetreplslotcounter Sent by the backend to tell the collector
+ *								to reset replication slot counter(s)
+ * ----------
+ */
+typedef struct PgStat_MsgResetreplslotcounter
+{
+	PgStat_MsgHdr m_hdr;
+	char		m_slotname[NAMEDATALEN];	/* name of the slot whose counters are reset */
+	bool		clearall;	/* NOTE(review): presumably resets every slot and ignores m_slotname; confirm in pgstat.c */
+} PgStat_MsgResetreplslotcounter;
+
+/* ----------
  * PgStat_MsgAutovacStart		Sent by the autovacuum daemon to signal
  *								that a database is going to be processed
  * ----------
@@ -454,6 +468,25 @@ typedef struct PgStat_MsgSLRU
 } PgStat_MsgSLRU;
 
 /* ----------
+ * PgStat_MsgReplSlot	Sent by a backend or a wal sender to update replication
+ *						slot statistics.
+ * ----------
+ */
+typedef struct PgStat_MsgReplSlot
+{
+	PgStat_MsgHdr	m_hdr;
+	char			m_slotname[NAMEDATALEN];	/* slot these statistics belong to */
+	bool			m_drop;	/* presumably set by pgstat_report_replslot_drop(); confirm */
+	PgStat_Counter	m_spill_txns;	/* transactions spilled to disk */
+	PgStat_Counter	m_spill_count;	/* spill-to-disk invocations */
+	PgStat_Counter	m_spill_bytes;	/* amount of data spilled to disk */
+	PgStat_Counter	m_stream_txns;	/* transactions streamed */
+	PgStat_Counter	m_stream_count;	/* streaming invocations */
+	PgStat_Counter	m_stream_bytes;	/* amount of data streamed */
+} PgStat_MsgReplSlot;
+
+
+/* ----------
  * PgStat_MsgRecoveryConflict	Sent by the backend upon recovery conflict
  * ----------
  */
@@ -591,6 +624,7 @@ typedef union PgStat_Msg
 	PgStat_MsgResetsharedcounter msg_resetsharedcounter;
 	PgStat_MsgResetsinglecounter msg_resetsinglecounter;
 	PgStat_MsgResetslrucounter msg_resetslrucounter;
+	PgStat_MsgResetreplslotcounter msg_resetreplslotcounter;
 	PgStat_MsgAutovacStart msg_autovacuum_start;
 	PgStat_MsgVacuum msg_vacuum;
 	PgStat_MsgAnalyze msg_analyze;
@@ -603,6 +637,7 @@ typedef union PgStat_Msg
 	PgStat_MsgDeadlock msg_deadlock;
 	PgStat_MsgTempFile msg_tempfile;
 	PgStat_MsgChecksumFailure msg_checksumfailure;
+	PgStat_MsgReplSlot msg_replslot;
 } PgStat_Msg;
 
 
@@ -760,6 +795,20 @@ typedef struct PgStat_SLRUStats
 	TimestampTz stat_reset_timestamp;
 } PgStat_SLRUStats;
 
+/*
+ * Replication slot statistics kept in the stats collector
+ */
+typedef struct PgStat_ReplSlotStats
+{
+	char			slotname[NAMEDATALEN];	/* slot these counters belong to */
+	PgStat_Counter	spill_txns;	/* transactions spilled to disk */
+	PgStat_Counter	spill_count;	/* spill-to-disk invocations */
+	PgStat_Counter	spill_bytes;	/* amount of data spilled to disk */
+	PgStat_Counter  stream_txns;	/* transactions streamed */
+	PgStat_Counter  stream_count;	/* streaming invocations */
+	PgStat_Counter  stream_bytes;	/* amount of data streamed */
+	TimestampTz		stat_reset_timestamp;	/* presumably time of the last counter reset; confirm */
+} PgStat_ReplSlotStats;
 
 /* ----------
  * Backend states
@@ -1303,6 +1352,7 @@ extern void pgstat_reset_counters(void);
 extern void pgstat_reset_shared_counters(const char *);
 extern void pgstat_reset_single_counter(Oid objectid, PgStat_Single_Reset_Type \
type);  extern void pgstat_reset_slru_counter(const char *);
+extern void pgstat_reset_replslot_counter(const char *name);
 
 extern void pgstat_report_autovac(Oid dboid);
 extern void pgstat_report_vacuum(Oid tableoid, bool shared,
@@ -1315,6 +1365,9 @@ extern void pgstat_report_recovery_conflict(int reason);
 extern void pgstat_report_deadlock(void);
 extern void pgstat_report_checksum_failures_in_db(Oid dboid, int failurecount);
 extern void pgstat_report_checksum_failure(void);
+extern void pgstat_report_replslot(const char *slotname, int spilltxns, int \
spillcount, +								   int spillbytes, int streamtxns, int streamcount, int \
streambytes); +extern void pgstat_report_replslot_drop(const char *slotname);
 
 extern void pgstat_initialize(void);
 extern void pgstat_bestart(void);
@@ -1479,6 +1532,7 @@ extern int	pgstat_fetch_stat_numbackends(void);
 extern PgStat_ArchiverStats *pgstat_fetch_stat_archiver(void);
 extern PgStat_GlobalStats *pgstat_fetch_global(void);
 extern PgStat_SLRUStats *pgstat_fetch_slru(void);
+extern PgStat_ReplSlotStats *pgstat_fetch_replslot(int *nslots_p);
 
 extern void pgstat_count_slru_page_zeroed(int slru_idx);
 extern void pgstat_count_slru_page_hit(int slru_idx);
diff --git a/src/include/replication/reorderbuffer.h \
b/src/include/replication/reorderbuffer.h index 1ae17d5..edc51b1 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -525,6 +525,20 @@ struct ReorderBuffer
 
 	/* memory accounting */
 	Size		size;
+
+	/*
+	 * Statistics about transactions spilled to disk or streamed.
+	 *
+	 * A single transaction may be spilled or streamed repeatedly, which is
+	 * why we keep separate transaction and invocation counters. For spilling,
+	 * the transaction counter includes both toplevel and subtransactions.
+	 */
+	int64		spillCount;		/* spill-to-disk invocation counter */
+	int64		spillTxns;		/* number of transactions spilled to disk  */
+	int64		spillBytes;		/* amount of data spilled to disk */
+	int64		streamCount;	/* streaming invocation counter */
+	int64		streamTxns;		/* number of transactions streamed */
+	int64		streamBytes;	/* amount of data streamed to subscriber */
 };
 
 
diff --git a/src/test/regress/expected/rules.out \
b/src/test/regress/expected/rules.out index 601734a..197a86c 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2008,6 +2008,14 @@ pg_stat_replication| SELECT s.pid,
    FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, \
application_name, state, query, wait_event_type, wait_event, xact_start, query_start, \
backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, \
backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, \
ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, \
leader_pid)  JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, \
flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, \
reply_time) ON ((s.pid = w.pid)))  LEFT JOIN pg_authid u ON ((s.usesysid = u.oid)));
+pg_stat_replication_slots| SELECT s.name,
+    s.spill_txns,
+    s.spill_count,
+    s.spill_bytes,
+    s.stream_txns,
+    s.stream_count,
+    s.stream_bytes
+   FROM pg_stat_get_replication_slots() s(name, spill_txns, spill_count, \
spill_bytes, stream_txns, stream_count, stream_bytes);  pg_stat_slru| SELECT s.name,
     s.blks_zeroed,
     s.blks_hit,



[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic