List: postgresql-hackers
Subject: Re: PATCH: logical_work_mem and logical streaming of large in-progress transactions
From: Ajin Cherian <itsajin () gmail ! com>
Date: 2020-07-31 9:52:27
Message-ID: CAFPTHDaMoJVosg6795xJMuFFOorK-3uFRmxqbzrx=LEyeFw5Qg () mail ! gmail ! com
> Attaching an updated patch for the streaming stats, based on v2 of
> Sawada-san's replication slot stats framework and v44 of this patch
> series. This is a single patch that has both the stats framework from
> Sawada-san (1) and my update for streaming, so it can be applied easily
> on top of v44.
regards,
Ajin Cherian
Fujitsu Australia
["streaming_stats_update.patch" (application/octet-stream)]
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 7dcddf4..d77fbeb 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -315,6 +315,15 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
</row>
<row>
+ <entry><structname>pg_stat_replication_slots</structname><indexterm><primary>pg_stat_replication_slots</primary></indexterm></entry>
+ <entry>One row per replication slot, showing statistics about
+ replication slot usage.
+ See <link linkend="monitoring-pg-stat-replication-slots-view">
+ <structname>pg_stat_replication_slots</structname></link> for details.
+ </entry>
+ </row>
+
+ <row>
<entry><structname>pg_stat_wal_receiver</structname><indexterm><primary>pg_stat_wal_receiver</primary></indexterm></entry>
<entry>Only one row, showing statistics about the WAL receiver from
that receiver's connected server.
@@ -2508,7 +2517,119 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
</sect2>
- <sect2 id="monitoring-pg-stat-wal-receiver-view">
+ <sect2 id="monitoring-pg-stat-replication-slots-view">
+ <title><structname>pg_stat_replication_slots</structname></title>
+
+ <indexterm>
+ <primary>pg_stat_replication_slots</primary>
+ </indexterm>
+
+ <para>
+ The <structname>pg_stat_replication_slots</structname> view contains
+ one row per replication slot, showing statistics about replication
+ slot usage.
+ </para>
+
+ <table id="pg-stat-replication-slots-view" xreflabel="pg_stat_replication_slots">
+ <title><structname>pg_stat_replication_slots</structname> View</title>
+ <tgroup cols="1">
+ <thead>
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ Column Type
+ </para>
+ <para>
+ Description
+ </para></entry>
+ </row>
+ </thead>
+
+ <tbody>
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>name</structfield> <type>text</type>
+ </para>
+ <para>
+ A unique, cluster-wide identifier for the replication slot
+ </para></entry>
+ </row>
+
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>spill_txns</structfield> <type>bigint</type>
+ </para>
+ <para>
+ Number of transactions spilled to disk after the memory used by
+ logical decoding exceeds <literal>logical_decoding_work_mem</literal>. The
+ counter gets incremented both for toplevel transactions and
+ subtransactions.
+ </para></entry>
+ </row>
+
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>spill_count</structfield> <type>bigint</type>
+ </para>
+ <para>
+ Number of times transactions were spilled to disk. Transactions
+ may get spilled repeatedly, and this counter gets incremented on every
+ such invocation.
+ </para></entry>
+ </row>
+
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>spill_bytes</structfield> <type>bigint</type>
+ </para>
+ <para>
+ Amount of decoded transaction data spilled to disk.
+ </para></entry>
+ </row>
+
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>stream_txns</structfield> <type>bigint</type>
+ </para>
+ <para>
+ Number of in-progress transactions streamed to the subscriber after
+ memory used by logical decoding exceeds <literal>logical_decoding_work_mem</literal>.
+ Streaming only works with toplevel transactions (subtransactions can't
+ be streamed independently), so the counter does not get incremented for
+ subtransactions.
+ </para></entry>
+ </row>
+
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>stream_count</structfield> <type>bigint</type>
+ </para>
+ <para>
+ Number of times in-progress transactions were streamed to the subscriber.
+ Transactions may get streamed repeatedly, and this counter gets incremented
+ on every such invocation.
+ </para></entry>
+ </row>
+
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>stream_bytes</structfield> <type>bigint</type>
+ </para>
+ <para>
+ Amount of decoded in-progress transaction data streamed to the subscriber.
+ </para></entry>
+ </row>
+ </tbody>
+ </tgroup>
+ </table>
+
+ <para>
+ Tracking of spilled transactions works only for logical replication. In
+ physical replication, the tracking mechanism will display 0 for spilled
+ statistics.
+ </para>
+ </sect2>
+
+ <sect2 id="monitoring-pg-stat-wal-receiver-view">
<title><structname>pg_stat_wal_receiver</structname></title>
<indexterm>
@@ -4707,6 +4828,26 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
can be granted EXECUTE to run the function.
</para></entry>
</row>
+
+ <row>
+ <entry role="func_table_entry"><para role="func_signature">
+ <indexterm>
+ <primary>pg_stat_reset_replication_slot</primary>
+ </indexterm>
+ <function>pg_stat_reset_replication_slot</function> ( <type>text</type> )
+ <returnvalue>void</returnvalue>
+ </para>
+ <para>
+ Resets statistics to zero for a single replication slot, or for all
+ replication slots in the cluster. If the argument is NULL, all counters
+ shown in the <structname>pg_stat_replication_slots</structname> view for
+ all replication slots are reset.
+ </para>
+ <para>
+ This function is restricted to superusers by default, but other users
+ can be granted EXECUTE to run the function.
+ </para></entry>
+ </row>
</tbody>
</tgroup>
</table>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 8625cbe..ceba837 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -790,6 +790,17 @@ CREATE VIEW pg_stat_replication AS
JOIN pg_stat_get_wal_senders() AS W ON (S.pid = W.pid)
LEFT JOIN pg_authid AS U ON (S.usesysid = U.oid);
+CREATE VIEW pg_stat_replication_slots AS
+ SELECT
+ s.name,
+ s.spill_txns,
+ s.spill_count,
+ s.spill_bytes,
+ s.stream_txns,
+ s.stream_count,
+ s.stream_bytes
+ FROM pg_stat_get_replication_slots() AS s;
+
CREATE VIEW pg_stat_slru AS
SELECT
s.name,
@@ -1441,6 +1452,7 @@ REVOKE EXECUTE ON FUNCTION pg_stat_reset_shared(text) FROM public;
REVOKE EXECUTE ON FUNCTION pg_stat_reset_slru(text) FROM public;
REVOKE EXECUTE ON FUNCTION pg_stat_reset_single_table_counters(oid) FROM public;
REVOKE EXECUTE ON FUNCTION pg_stat_reset_single_function_counters(oid) FROM public;
+REVOKE EXECUTE ON FUNCTION pg_stat_reset_replication_slot(text) FROM public;
REVOKE EXECUTE ON FUNCTION lo_import(text) FROM public;
REVOKE EXECUTE ON FUNCTION lo_import(text, oid) FROM public;
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 479e3ca..7aba571 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -51,6 +51,7 @@
#include "postmaster/fork_process.h"
#include "postmaster/interrupt.h"
#include "postmaster/postmaster.h"
+#include "replication/slot.h"
#include "replication/walsender.h"
#include "storage/backendid.h"
#include "storage/dsm.h"
@@ -282,6 +283,8 @@ static int localNumBackends = 0;
static PgStat_ArchiverStats archiverStats;
static PgStat_GlobalStats globalStats;
static PgStat_SLRUStats slruStats[SLRU_NUM_ELEMENTS];
+static PgStat_ReplSlotStats *replSlotStats;
+static int nReplSlotStats;
/*
* List of OIDs of databases we need to write out. If an entry is InvalidOid,
@@ -340,6 +343,8 @@ static const char *pgstat_get_wait_io(WaitEventIO w);
static void pgstat_setheader(PgStat_MsgHdr *hdr, StatMsgType mtype);
static void pgstat_send(void *msg, int len);
+static int pgstat_replslot_index(const char *name, bool create_it);
+
static void pgstat_recv_inquiry(PgStat_MsgInquiry *msg, int len);
static void pgstat_recv_tabstat(PgStat_MsgTabstat *msg, int len);
static void pgstat_recv_tabpurge(PgStat_MsgTabpurge *msg, int len);
@@ -348,6 +353,7 @@ static void pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len);
static void pgstat_recv_resetsharedcounter(PgStat_MsgResetsharedcounter *msg, int len);
static void pgstat_recv_resetsinglecounter(PgStat_MsgResetsinglecounter *msg, int len);
static void pgstat_recv_resetslrucounter(PgStat_MsgResetslrucounter *msg, int len);
+static void pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter *msg, int len);
static void pgstat_recv_autovac(PgStat_MsgAutovacStart *msg, int len);
static void pgstat_recv_vacuum(PgStat_MsgVacuum *msg, int len);
static void pgstat_recv_analyze(PgStat_MsgAnalyze *msg, int len);
@@ -360,6 +366,7 @@ static void pgstat_recv_recoveryconflict(PgStat_MsgRecoveryConflict *msg, int le
static void pgstat_recv_deadlock(PgStat_MsgDeadlock *msg, int len);
static void pgstat_recv_checksum_failure(PgStat_MsgChecksumFailure *msg, int len);
static void pgstat_recv_tempfile(PgStat_MsgTempFile *msg, int len);
+static void pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len);
/* ------------------------------------------------------------
* Public functions called from postmaster follow
@@ -1430,6 +1437,36 @@ pgstat_reset_slru_counter(const char *name)
}
/* ----------
+ * pgstat_reset_replslot_counter() -
+ *
+ * Tell the statistics collector to reset the counters of a single
+ * replication slot, or of all replication slots (when name is NULL).
+ *
+ * Permission checking for this function is managed through the normal
+ * GRANT system.
+ * ----------
+ */
+void
+pgstat_reset_replslot_counter(const char *name)
+{
+ PgStat_MsgResetreplslotcounter msg;
+
+ if (pgStatSock == PGINVALID_SOCKET)
+ return;
+
+ pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETREPLSLOTCOUNTER);
+ if (name)
+ {
+ strlcpy(msg.m_slotname, name, NAMEDATALEN);
+ msg.clearall = false;
+ }
+ else
+ msg.clearall = true;
+
+ pgstat_send(&msg, sizeof(msg));
+}
+
+/* ----------
* pgstat_report_autovac() -
*
* Called from autovacuum.c to report startup of an autovacuum process.
@@ -1629,6 +1666,49 @@ pgstat_report_tempfile(size_t filesize)
pgstat_send(&msg, sizeof(msg));
}
+/* ----------
+ * pgstat_report_replslot() -
+ *
+ * Tell the collector about replication slot statistics.
+ * ----------
+ */
+void
+pgstat_report_replslot(const char *slotname, int spilltxns, int spillcount,
+ int spillbytes, int streamtxns, int streamcount, int streambytes)
+{
+ PgStat_MsgReplSlot msg;
+
+ /*
+ * Prepare and send the message
+ */
+ pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_REPLSLOT);
+ memcpy(&msg.m_slotname, slotname, NAMEDATALEN);
+ msg.m_drop = false;
+ msg.m_spill_txns = spilltxns;
+ msg.m_spill_count = spillcount;
+ msg.m_spill_bytes = spillbytes;
+ msg.m_stream_txns = streamtxns;
+ msg.m_stream_count = streamcount;
+ msg.m_stream_bytes = streambytes;
+ pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
+}
+
+/* ----------
+ * pgstat_report_replslot_drop() -
+ *
+ * Tell the collector about dropping the replication slot.
+ * ----------
+ */
+void
+pgstat_report_replslot_drop(const char *slotname)
+{
+ PgStat_MsgReplSlot msg;
+
+ pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_REPLSLOT);
+ memcpy(&msg.m_slotname, slotname, NAMEDATALEN);
+ msg.m_drop = true;
+ pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
+}
/* ----------
* pgstat_ping() -
@@ -2691,6 +2771,23 @@ pgstat_fetch_slru(void)
return slruStats;
}
+/*
+ * ---------
+ * pgstat_fetch_replslot() -
+ *
+ * Support function for the SQL-callable pgstat* functions. Returns
+ * a pointer to the replication slot statistics array and sets
+ * *nslots_p to the number of entries.
+ * ---------
+ */
+PgStat_ReplSlotStats *
+pgstat_fetch_replslot(int *nslots_p)
+{
+ backend_read_statsfile();
+
+ *nslots_p = nReplSlotStats;
+ return replSlotStats;
+}
/* ------------------------------------------------------------
* Functions for management of the shared-memory PgBackendStatus array
@@ -4630,6 +4727,11 @@ PgstatCollectorMain(int argc, char *argv[])
len);
break;
+ case PGSTAT_MTYPE_RESETREPLSLOTCOUNTER:
+ pgstat_recv_resetreplslotcounter(&msg.msg_resetreplslotcounter,
+ len);
+ break;
+
case PGSTAT_MTYPE_AUTOVAC_START:
pgstat_recv_autovac(&msg.msg_autovacuum_start, len);
break;
@@ -4680,6 +4782,10 @@ PgstatCollectorMain(int argc, char *argv[])
len);
break;
+ case PGSTAT_MTYPE_REPLSLOT:
+ pgstat_recv_replslot(&msg.msg_replslot, len);
+ break;
+
default:
break;
}
@@ -4883,6 +4989,7 @@ pgstat_write_statsfiles(bool permanent, bool allDbs)
const char *tmpfile = permanent ? PGSTAT_STAT_PERMANENT_TMPFILE : pgstat_stat_tmpname;
const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename;
int rc;
+ int i;
elog(DEBUG2, "writing stats file \"%s\"", statfile);
@@ -4930,6 +5037,16 @@ pgstat_write_statsfiles(bool permanent, bool allDbs)
(void) rc; /* we'll check for error with ferror */
/*
+ * Write replication slot stats struct
+ */
+ for (i = 0; i < nReplSlotStats; i++)
+ {
+ fputc('R', fpout);
+ rc = fwrite(&replSlotStats[i], sizeof(PgStat_ReplSlotStats), 1, fpout);
+ (void) rc; /* we'll check for error with ferror */
+ }
+
+ /*
* Walk through the database table.
*/
hash_seq_init(&hstat, pgStatDBHash);
@@ -5181,6 +5298,10 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
dbhash = hash_create("Databases hash", PGSTAT_DB_HASH_SIZE, &hash_ctl,
HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+ /* Allocate the space for replication slot statistics */
+ replSlotStats = palloc0(max_replication_slots * sizeof(PgStat_ReplSlotStats));
+ nReplSlotStats = 0;
+
/*
* Clear out global and archiver statistics so they start from zero in
* case we can't load an existing statsfile.
@@ -5203,6 +5324,12 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
slruStats[i].stat_reset_timestamp = globalStats.stat_reset_timestamp;
/*
+ * Set the same reset timestamp for all replication slots too.
+ */
+ for (i = 0; i < max_replication_slots; i++)
+ replSlotStats[i].stat_reset_timestamp = globalStats.stat_reset_timestamp;
+
+ /*
* Try to open the stats file. If it doesn't exist, the backends simply
* return zero for anything and the collector simply starts from scratch
* with empty counters.
@@ -5365,6 +5492,23 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
break;
+ /*
+ * 'R' A PgStat_ReplSlotStats struct describing a replication slot
+ * follows.
+ */
+ case 'R':
+ if (fread(&replSlotStats[nReplSlotStats], 1, sizeof(PgStat_ReplSlotStats), fpin)
+ != sizeof(PgStat_ReplSlotStats))
+ {
+ ereport(pgStatRunningInCollector ? LOG : WARNING,
+ (errmsg("corrupted statistics file \"%s\"",
+ statfile)));
+ memset(&replSlotStats[nReplSlotStats], 0, sizeof(PgStat_ReplSlotStats));
+ goto done;
+ }
+ nReplSlotStats++;
+ break;
+
case 'E':
goto done;
@@ -5574,6 +5718,7 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent,
PgStat_GlobalStats myGlobalStats;
PgStat_ArchiverStats myArchiverStats;
PgStat_SLRUStats mySLRUStats[SLRU_NUM_ELEMENTS];
+ PgStat_ReplSlotStats myReplSlotStats;
FILE *fpin;
int32 format_id;
const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename;
@@ -5676,6 +5821,21 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent,
break;
+ /*
+ * 'R' A PgStat_ReplSlotStats struct describing a replication slot
+ * follows.
+ */
+ case 'R':
+ if (fread(&myReplSlotStats, 1, sizeof(PgStat_ReplSlotStats), fpin)
+ != sizeof(PgStat_ReplSlotStats))
+ {
+ ereport(pgStatRunningInCollector ? LOG : WARNING,
+ (errmsg("corrupted statistics file \"%s\"",
+ statfile)));
+ goto done;
+ }
+ break;
+
case 'E':
goto done;
@@ -6263,6 +6423,48 @@ pgstat_recv_resetslrucounter(PgStat_MsgResetslrucounter *msg, int len)
}
/* ----------
+ * pgstat_recv_resetreplslotcounter() -
+ *
+ * Reset some replication slot statistics of the cluster.
+ * ----------
+ */
+static void
+pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter *msg,
+ int len)
+{
+ int i;
+ int idx = -1;
+ TimestampTz ts;
+
+ if (!msg->clearall)
+ {
+ /* Get the index of replication slot statistics to reset */
+ idx = pgstat_replslot_index(msg->m_slotname, false);
+
+ if (idx < 0)
+ return; /* not found */
+ }
+
+ ts = GetCurrentTimestamp();
+ for (i = 0; i < nReplSlotStats; i++)
+ {
+ /* reset entry with the given index, or all entries (index is -1) */
+ if (msg->clearall || idx == i)
+ {
+ /* reset only counters. Don't clear slot name */
+ replSlotStats[i].spill_txns = 0;
+ replSlotStats[i].spill_count = 0;
+ replSlotStats[i].spill_bytes = 0;
+ replSlotStats[i].stream_txns = 0;
+ replSlotStats[i].stream_count = 0;
+ replSlotStats[i].stream_bytes = 0;
+ replSlotStats[i].stat_reset_timestamp = ts;
+ }
+ }
+}
+
+
+/* ----------
* pgstat_recv_autovac() -
*
* Process an autovacuum signaling message.
@@ -6509,6 +6711,80 @@ pgstat_recv_checksum_failure(PgStat_MsgChecksumFailure *msg, int len)
dbentry->last_checksum_failure = msg->m_failure_time;
}
+/*
+ * pgstat_replslot_index
+ *
+ * Return the index of the entry for the replication slot with the given name,
+ * or -1 if the slot is not found. If create_it is true, this function creates
+ * the statistics entry for the replication slot if it does not exist.
+ */
+static int
+pgstat_replslot_index(const char *name, bool create_it)
+{
+ int i;
+
+ Assert(nReplSlotStats <= max_replication_slots);
+ for (i = 0; i < nReplSlotStats; i++)
+ {
+ if (strcmp(replSlotStats[i].slotname, name) == 0)
+ return i; /* found */
+ }
+
+ /*
+ * The slot is not found. We don't want to register new statistics
+ * if the array is already full or the caller didn't request it.
+ */
+ if (i == max_replication_slots || !create_it)
+ return -1;
+
+ /* Register new slot */
+ memset(&replSlotStats[nReplSlotStats], 0, sizeof(PgStat_ReplSlotStats));
+ memcpy(&replSlotStats[nReplSlotStats].slotname, name, NAMEDATALEN);
+ return nReplSlotStats++;
+}
+
+/* ----------
+ * pgstat_recv_replslot() -
+ *
+ * Process a REPLSLOT message.
+ * ----------
+ */
+static void
+pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len)
+{
+ int idx;
+
+ /*
+ * Get the index of replication slot statistics. On dropping, we
+ * don't create the new statistics.
+ */
+ idx = pgstat_replslot_index(msg->m_slotname, !msg->m_drop);
+
+ /* the entry was not found, or the array is already full */
+ if (idx < 0)
+ return;
+
+ Assert(idx >= 0 && idx < nReplSlotStats);
+ if (msg->m_drop)
+ {
+ /* Remove the replication slot statistics with the given name */
+ memcpy(&replSlotStats[idx], &replSlotStats[nReplSlotStats - 1],
+ sizeof(PgStat_ReplSlotStats));
+ nReplSlotStats--;
+ Assert(nReplSlotStats >= 0);
+ }
+ else
+ {
+ /* Update the replication slot statistics */
+ replSlotStats[idx].spill_txns += msg->m_spill_txns;
+ replSlotStats[idx].spill_count += msg->m_spill_count;
+ replSlotStats[idx].spill_bytes += msg->m_spill_bytes;
+ replSlotStats[idx].stream_txns += msg->m_stream_txns;
+ replSlotStats[idx].stream_count += msg->m_stream_count;
+ replSlotStats[idx].stream_bytes += msg->m_stream_bytes;
+ }
+}
+
/* ----------
* pgstat_recv_tempfile() -
*
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 42f284b..9cfc48c 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -32,6 +32,7 @@
#include "access/xlog_internal.h"
#include "fmgr.h"
#include "miscadmin.h"
+#include "pgstat.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/origin.h"
@@ -83,6 +84,7 @@ static void stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *t
int nrelations, Relation relations[], ReorderBufferChange *change);
static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, char *plugin);
+static void UpdateSpillStats(LogicalDecodingContext *ctx);
/*
* Make sure the current settings & environment are capable of doing logical
@@ -740,6 +742,11 @@ begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
/* Pop the error context stack */
error_context_stack = errcallback.previous;
+
+ /*
+ * Update statistics about transactions that were spilled to disk or streamed.
+ */
+ UpdateSpillStats(ctx);
}
static void
@@ -1452,3 +1459,28 @@ ResetLogicalStreamingState(void)
CheckXidAlive = InvalidTransactionId;
bsysscan = false;
}
+
+static void
+UpdateSpillStats(LogicalDecodingContext *ctx)
+{
+ ReorderBuffer *rb = ctx->reorder;
+
+ elog(DEBUG2, "UpdateSpillStats: updating stats %p %lld %lld %lld %lld %lld %lld",
+ rb,
+ (long long) rb->spillTxns,
+ (long long) rb->spillCount,
+ (long long) rb->spillBytes,
+ (long long) rb->streamTxns,
+ (long long) rb->streamCount,
+ (long long) rb->streamBytes);
+
+ pgstat_report_replslot(NameStr(ctx->slot->data.name),
+ rb->spillTxns, rb->spillCount, rb->spillBytes,
+ rb->streamTxns, rb->streamCount, rb->streamBytes);
+ rb->spillTxns = 0;
+ rb->spillCount = 0;
+ rb->spillBytes = 0;
+ rb->streamTxns = 0;
+ rb->streamCount = 0;
+ rb->streamBytes = 0;
+}
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index c469536..ac4422b 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -344,6 +344,13 @@ ReorderBufferAllocate(void)
buffer->outbufsize = 0;
buffer->size = 0;
+ buffer->spillCount = 0;
+ buffer->spillTxns = 0;
+ buffer->spillBytes = 0;
+ buffer->streamCount = 0;
+ buffer->streamTxns = 0;
+ buffer->streamBytes = 0;
+
buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
dlist_init(&buffer->toplevel_by_lsn);
@@ -3098,6 +3105,7 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
int fd = -1;
XLogSegNo curOpenSegNo = 0;
Size spilled = 0;
+ Size size = txn->size;
elog(DEBUG2, "spill %u changes in XID %u to disk",
(uint32) txn->nentries_mem, txn->xid);
@@ -3156,6 +3164,13 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
spilled++;
}
+ /* update the statistics */
+ rb->spillCount += 1;
+ rb->spillBytes += size;
+
+ /* Don't consider already serialized transactions. */
+ rb->spillTxns += rbtxn_is_serialized(txn) ? 0 : 1;
+
Assert(spilled == txn->nentries_mem);
Assert(dlist_is_empty(&txn->changes));
txn->nentries_mem = 0;
@@ -3484,10 +3499,18 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
txn->snapshot_now = NULL;
}
+
+ rb->streamCount += 1;
+ rb->streamBytes += txn->total_size;
+
+ /* Don't consider already streamed transactions. */
+ rb->streamTxns += (rbtxn_is_streamed(txn)) ? 0 : 1;
+
/* Process and send the changes to output plugin. */
ReorderBufferProcessTXN(rb, txn, InvalidXLogRecPtr, snapshot_now,
command_id, true);
+
Assert(dlist_is_empty(&txn->changes));
Assert(txn->nentries == 0);
Assert(txn->nentries_mem == 0);
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 57bbb62..ba8a013 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -322,6 +322,9 @@ ReplicationSlotCreate(const char *name, bool db_specific,
/* Let everybody know we've modified this slot */
ConditionVariableBroadcast(&slot->active_cv);
+
+ /* Create statistics entry for the new slot */
+ pgstat_report_replslot(NameStr(slot->data.name), 0, 0, 0, 0, 0, 0);
}
/*
@@ -683,6 +686,17 @@ ReplicationSlotDropPtr(ReplicationSlot *slot)
(errmsg("could not remove directory \"%s\"", tmppath)));
/*
+ * Report the drop of the replication slot to the stats collector. Since
+ * there is no guarantee of message arrival order on a UDP connection,
+ * it's possible that a message for creating a new slot arrives before a
+ * message for removing the old slot. We send the drop message while
+ * holding ReplicationSlotAllocationLock to reduce that possibility.
+ * If the messages arrived in reverse order, we would lose one statistics
+ * update message.
+ */
+ pgstat_report_replslot_drop(NameStr(slot->data.name));
+
+ /*
* We release this at the very end, so that nobody starts trying to create
* a slot while we're still cleaning up the detritus of the old one.
*/
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 95738a4..59fba37 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -2033,6 +2033,20 @@ pg_stat_reset_slru(PG_FUNCTION_ARGS)
PG_RETURN_VOID();
}
+/* Reset replication slots counters (a specific one or all of them). */
+Datum
+pg_stat_reset_replication_slot(PG_FUNCTION_ARGS)
+{
+ char *target = NULL;
+
+ if (!PG_ARGISNULL(0))
+ target = text_to_cstring(PG_GETARG_TEXT_PP(0));
+
+ pgstat_reset_replslot_counter(target);
+
+ PG_RETURN_VOID();
+}
+
Datum
pg_stat_get_archiver(PG_FUNCTION_ARGS)
{
@@ -2098,3 +2112,66 @@ pg_stat_get_archiver(PG_FUNCTION_ARGS)
/* Returns the record as Datum */
PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
}
+
+Datum
+pg_stat_get_replication_slots(PG_FUNCTION_ARGS)
+{
+#define PG_STAT_GET_REPLICATION_SLOT_CLOS 7
+ ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+ TupleDesc tupdesc;
+ Tuplestorestate *tupstore;
+ MemoryContext per_query_ctx;
+ MemoryContext oldcontext;
+ PgStat_ReplSlotStats *stats;
+ int nstats;
+ int i;
+
+ /* check to see if caller supports us returning a tuplestore */
+ if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("set-valued function called in context that cannot accept a set")));
+ if (!(rsinfo->allowedModes & SFRM_Materialize))
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("materialize mode required, but it is not allowed in this context")));
+
+ /* Build a tuple descriptor for our result type */
+ if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+ elog(ERROR, "return type must be a row type");
+
+ per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
+ oldcontext = MemoryContextSwitchTo(per_query_ctx);
+
+ tupstore = tuplestore_begin_heap(true, false, work_mem);
+ rsinfo->returnMode = SFRM_Materialize;
+ rsinfo->setResult = tupstore;
+ rsinfo->setDesc = tupdesc;
+
+ MemoryContextSwitchTo(oldcontext);
+
+ stats = pgstat_fetch_replslot(&nstats);
+ for (i = 0; i < nstats; i++)
+ {
+ Datum values[PG_STAT_GET_REPLICATION_SLOT_CLOS];
+ bool nulls[PG_STAT_GET_REPLICATION_SLOT_CLOS];
+ PgStat_ReplSlotStats stat = stats[i];
+
+ MemSet(values, 0, sizeof(values));
+ MemSet(nulls, 0, sizeof(nulls));
+
+ values[0] = PointerGetDatum(cstring_to_text(stat.slotname));
+ values[1] = Int64GetDatum(stat.spill_txns);
+ values[2] = Int64GetDatum(stat.spill_count);
+ values[3] = Int64GetDatum(stat.spill_bytes);
+ values[4] = Int64GetDatum(stat.stream_txns);
+ values[5] = Int64GetDatum(stat.stream_count);
+ values[6] = Int64GetDatum(stat.stream_bytes);
+
+ tuplestore_putvalues(tupstore, tupdesc, values, nulls);
+ }
+
+ tuplestore_donestoring(tupstore);
+
+ return (Datum) 0;
+}
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 082a11f..2a316b1 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5251,6 +5251,14 @@
proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
proargnames => '{pid,status,receive_start_lsn,receive_start_tli,written_lsn,flushed_lsn,received_tli,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,slot_name,sender_host,sender_port,conninfo}',
prosrc => 'pg_stat_get_wal_receiver' },
+{ oid => '8595', descr => 'statistics: information about replication slots',
+ proname => 'pg_stat_get_replication_slots', prorows => '10', proisstrict => 'f',
+ proretset => 't', provolatile => 's', proparallel => 'r',
+ prorettype => 'record', proargtypes => '',
+ proallargtypes => '{text,int8,int8,int8,int8,int8,int8}',
+ proargmodes => '{o,o,o,o,o,o,o}',
+ proargnames => '{name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes}',
+ prosrc => 'pg_stat_get_replication_slots' },
{ oid => '6118', descr => 'statistics: information about subscription',
proname => 'pg_stat_get_subscription', proisstrict => 'f', provolatile => 's',
proparallel => 'r', prorettype => 'record', proargtypes => 'oid',
@@ -5592,6 +5600,10 @@
descr => 'statistics: reset collected statistics for a single SLRU',
proname => 'pg_stat_reset_slru', proisstrict => 'f', provolatile => 'v',
prorettype => 'void', proargtypes => 'text', prosrc => 'pg_stat_reset_slru' },
+{ oid => '8596',
+ descr => 'statistics: reset collected statistics for a single replication slot',
+ proname => 'pg_stat_reset_replication_slot', proisstrict => 'f', provolatile => 'v',
+ prorettype => 'void', proargtypes => 'text', prosrc => 'pg_stat_reset_replication_slot' },
{ oid => '3163', descr => 'current trigger depth',
proname => 'pg_trigger_depth', provolatile => 's', proparallel => 'r',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 0dfbac4..ce67740 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -56,6 +56,7 @@ typedef enum StatMsgType
PGSTAT_MTYPE_RESETSHAREDCOUNTER,
PGSTAT_MTYPE_RESETSINGLECOUNTER,
PGSTAT_MTYPE_RESETSLRUCOUNTER,
+ PGSTAT_MTYPE_RESETREPLSLOTCOUNTER,
PGSTAT_MTYPE_AUTOVAC_START,
PGSTAT_MTYPE_VACUUM,
PGSTAT_MTYPE_ANALYZE,
@@ -67,7 +68,8 @@ typedef enum StatMsgType
PGSTAT_MTYPE_RECOVERYCONFLICT,
PGSTAT_MTYPE_TEMPFILE,
PGSTAT_MTYPE_DEADLOCK,
- PGSTAT_MTYPE_CHECKSUMFAILURE
+ PGSTAT_MTYPE_CHECKSUMFAILURE,
+ PGSTAT_MTYPE_REPLSLOT
} StatMsgType;
/* ----------
@@ -357,6 +359,18 @@ typedef struct PgStat_MsgResetslrucounter
} PgStat_MsgResetslrucounter;
/* ----------
+ * PgStat_MsgResetreplslotcounter Sent by the backend to tell the collector
+ * to reset replication slot counter(s)
+ * ----------
+ */
+typedef struct PgStat_MsgResetreplslotcounter
+{
+ PgStat_MsgHdr m_hdr;
+ char m_slotname[NAMEDATALEN];
+ bool clearall;
+} PgStat_MsgResetreplslotcounter;
+
+/* ----------
* PgStat_MsgAutovacStart Sent by the autovacuum daemon to signal
* that a database is going to be processed
* ----------
@@ -454,6 +468,25 @@ typedef struct PgStat_MsgSLRU
} PgStat_MsgSLRU;
/* ----------
+ * PgStat_MsgReplSlot Sent by a backend or a wal sender to update replication
+ * slot statistics.
+ * ----------
+ */
+typedef struct PgStat_MsgReplSlot
+{
+ PgStat_MsgHdr m_hdr;
+ char m_slotname[NAMEDATALEN];
+ bool m_drop;
+ PgStat_Counter m_spill_txns;
+ PgStat_Counter m_spill_count;
+ PgStat_Counter m_spill_bytes;
+ PgStat_Counter m_stream_txns;
+ PgStat_Counter m_stream_count;
+ PgStat_Counter m_stream_bytes;
+} PgStat_MsgReplSlot;
+
+
+/* ----------
* PgStat_MsgRecoveryConflict Sent by the backend upon recovery conflict
* ----------
*/
@@ -591,6 +624,7 @@ typedef union PgStat_Msg
PgStat_MsgResetsharedcounter msg_resetsharedcounter;
PgStat_MsgResetsinglecounter msg_resetsinglecounter;
PgStat_MsgResetslrucounter msg_resetslrucounter;
+ PgStat_MsgResetreplslotcounter msg_resetreplslotcounter;
PgStat_MsgAutovacStart msg_autovacuum_start;
PgStat_MsgVacuum msg_vacuum;
PgStat_MsgAnalyze msg_analyze;
@@ -603,6 +637,7 @@ typedef union PgStat_Msg
PgStat_MsgDeadlock msg_deadlock;
PgStat_MsgTempFile msg_tempfile;
PgStat_MsgChecksumFailure msg_checksumfailure;
+ PgStat_MsgReplSlot msg_replslot;
} PgStat_Msg;
@@ -760,6 +795,20 @@ typedef struct PgStat_SLRUStats
TimestampTz stat_reset_timestamp;
} PgStat_SLRUStats;
+/*
+ * Replication slot statistics kept in the stats collector
+ */
+typedef struct PgStat_ReplSlotStats
+{
+ char slotname[NAMEDATALEN];
+ PgStat_Counter spill_txns;
+ PgStat_Counter spill_count;
+ PgStat_Counter spill_bytes;
+ PgStat_Counter stream_txns;
+ PgStat_Counter stream_count;
+ PgStat_Counter stream_bytes;
+ TimestampTz stat_reset_timestamp;
+} PgStat_ReplSlotStats;
/* ----------
* Backend states
@@ -1303,6 +1352,7 @@ extern void pgstat_reset_counters(void);
extern void pgstat_reset_shared_counters(const char *);
extern void pgstat_reset_single_counter(Oid objectid, PgStat_Single_Reset_Type type);
extern void pgstat_reset_slru_counter(const char *);
+extern void pgstat_reset_replslot_counter(const char *name);
extern void pgstat_report_autovac(Oid dboid);
extern void pgstat_report_vacuum(Oid tableoid, bool shared,
@@ -1315,6 +1365,9 @@ extern void pgstat_report_recovery_conflict(int reason);
extern void pgstat_report_deadlock(void);
extern void pgstat_report_checksum_failures_in_db(Oid dboid, int failurecount);
extern void pgstat_report_checksum_failure(void);
+extern void pgstat_report_replslot(const char *slotname, int spilltxns, int spillcount,
+ int spillbytes, int streamtxns, int streamcount, int streambytes);
+extern void pgstat_report_replslot_drop(const char *slotname);
extern void pgstat_initialize(void);
extern void pgstat_bestart(void);
@@ -1479,6 +1532,7 @@ extern int pgstat_fetch_stat_numbackends(void);
extern PgStat_ArchiverStats *pgstat_fetch_stat_archiver(void);
extern PgStat_GlobalStats *pgstat_fetch_global(void);
extern PgStat_SLRUStats *pgstat_fetch_slru(void);
+extern PgStat_ReplSlotStats *pgstat_fetch_replslot(int *nslots_p);
extern void pgstat_count_slru_page_zeroed(int slru_idx);
extern void pgstat_count_slru_page_hit(int slru_idx);
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 1ae17d5..edc51b1 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -525,6 +525,20 @@ struct ReorderBuffer
/* memory accounting */
Size size;
+
+ /*
+ * Statistics about transactions spilled to disk or streamed.
+ *
+ * A single transaction may be spilled or streamed repeatedly, which is why
+ * we keep both invocation and transaction counters. For spilling, the
+ * transaction counter includes both toplevel transactions and subtransactions.
+ */
+ int64 spillCount; /* spill-to-disk invocation counter */
+ int64 spillTxns; /* number of transactions spilled to disk */
+ int64 spillBytes; /* amount of data spilled to disk */
+ int64 streamCount; /* streaming invocation counter */
+ int64 streamTxns; /* number of transactions streamed to subscriber */
+ int64 streamBytes; /* amount of data streamed to subscriber */
};
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 601734a..197a86c 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2008,6 +2008,14 @@ pg_stat_replication| SELECT s.pid,
FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid) JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time) ON ((s.pid = w.pid))) LEFT JOIN pg_authid u ON ((s.usesysid = u.oid)));
+pg_stat_replication_slots| SELECT s.name,
+ s.spill_txns,
+ s.spill_count,
+ s.spill_bytes,
+ s.stream_txns,
+ s.stream_count,
+ s.stream_bytes
+ FROM pg_stat_get_replication_slots() s(name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes);
pg_stat_slru| SELECT s.name,
s.blks_zeroed,
s.blks_hit,