
List:       pgsql-hackers
Subject:    Re: [HACKERS] LISTEN denial of service with aborted transaction
From:       Matt Newell <newellm () blur ! com>
Date:       2015-09-30 19:59:22
Message-ID: 3873524.8xdXel8qyP () obsidian

> 
> > After further thought, I wonder whether instead of having an incoming
> > listener initialize its "pos" to QUEUE_TAIL, we couldn't have it
> > initialize to the maximum "pos" among existing listeners (with a floor of
> > QUEUE_TAIL of course).  If any other listener has advanced over a given
> > message X, then X was committed (or aborted) earlier and the new listener
> > is not required to return it; so it should be all right to take that
> > listener's "pos" as a starting point.
> 
> That's a great idea and I will try it.  It does need to be the maximum
> position of existing listeners *with matching db oid*, since
> asyncQueueReadAllNotifications doesn't check transaction status if the db
> oid doesn't match its own.  Another advantage of this approach is that a
> queued notify from an open transaction in one database won't incur
> additional cost on listens coming from other databases, whereas with my
> patch such a situation will prevent firstUncommitted from advancing.
> 


The attached patch works great.  I added the dboid to the QueueBackendStatus
struct, but that might not be needed if there is an easy and fast way to get a
database OID from a backend PID.
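
One way that might work is to go through the shared PGPROC array.  A rough,
untested sketch, assuming BackendPidGetProc() and PGPROC's databaseId field
(the helper name is purely illustrative):

#include "storage/proc.h"		/* PGPROC */
#include "storage/procarray.h"	/* BackendPidGetProc() */

/*
 * Illustrative only: map a backend PID to its database OID by looking the
 * PID up in the shared PGPROC array.  BackendPidGetProc() acquires
 * ProcArrayLock in shared mode and scans the proc array, so each call adds
 * another lock acquisition and a linear scan.
 */
static Oid
dboid_from_backend_pid(int pid)
{
	PGPROC	   *proc = BackendPidGetProc(pid);

	return proc ? proc->databaseId : InvalidOid;
}

Since Exec_ListenPreCommit would have to do that for every candidate listener
while already holding AsyncQueueLock, storing the dboid in QueueBackendStatus
still looks like the cheaper option.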

I also skip the max position calculation loop if QUEUE_HEAD is on the same
page as QUEUE_TAIL, since it's not desirable to increase the time that
AsyncQueueLock is held when the queue is small and
asyncQueueReadAllNotifications will execute quickly anyway.

I then noticed that we take the same lock twice in a row to read the same
values, first in Exec_ListenPreCommit and then in
asyncQueueReadAllNotifications, and much of the time the latter will simply
return because pos will already be at QUEUE_HEAD.  I prepared a second patch
that splits asyncQueueReadAllNotifications; Exec_ListenPreCommit then calls
the worker version only when needed.  This avoids the duplicate lock
acquisition.

Thanks,
Matt Newell
["fix_slow_listen.patch" (fix_slow_listen.patch)]

diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 91baede..58682dc 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -202,6 +202,12 @@ typedef struct QueuePosition
 	 (x).page != (y).page ? (y) : \
 	 (x).offset < (y).offset ? (x) : (y))
 
+/* choose logically larger QueuePosition */
+#define QUEUE_POS_MAX(x,y) \
+	(asyncQueuePagePrecedes((x).page, (y).page) ? (y) : \
+	 (x).page != (y).page ? (x) : \
+	 (x).offset < (y).offset ? (y) : (x))
+
 /*
  * Struct describing a listening backend's status
  */
@@ -209,6 +215,7 @@ typedef struct QueueBackendStatus
 {
 	int32		pid;			/* either a PID or InvalidPid */
 	QueuePosition pos;			/* backend has read queue up to here */
+	Oid 		dboid;			/* backend's database OID */
 } QueueBackendStatus;
 
 /*
@@ -248,6 +255,7 @@ static AsyncQueueControl *asyncQueueControl;
 #define QUEUE_TAIL					(asyncQueueControl->tail)
 #define QUEUE_BACKEND_PID(i)		(asyncQueueControl->backend[i].pid)
 #define QUEUE_BACKEND_POS(i)		(asyncQueueControl->backend[i].pos)
+#define QUEUE_BACKEND_DBOID(i)		(asyncQueueControl->backend[i].dboid)
 
 /*
  * The SLRU buffer area through which we access the notification queue
@@ -461,6 +469,7 @@ AsyncShmemInit(void)
 		for (i = 0; i <= MaxBackends; i++)
 		{
 			QUEUE_BACKEND_PID(i) = InvalidPid;
+			QUEUE_BACKEND_DBOID(i) = InvalidOid;
 			SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
 		}
 	}
@@ -907,6 +916,9 @@ AtCommit_Notify(void)
 static void
 Exec_ListenPreCommit(void)
 {
+	QueuePosition max;
+	int i;
+
 	/*
 	 * Nothing to do if we are already listening to something, nor if we
 	 * already ran this routine in this transaction.
@@ -936,7 +948,25 @@ Exec_ListenPreCommit(void)
 	 * doesn't hurt.
 	 */
 	LWLockAcquire(AsyncQueueLock, LW_SHARED);
-	QUEUE_BACKEND_POS(MyBackendId) = QUEUE_TAIL;
+	max = QUEUE_TAIL;
+	/*
+	 * If there is over a page of notifications queued, then we should find
+	 * the maximum position of all backends connected to our database, to
+	 * avoid having to process all of the notifications that belong to already
+	 * committed transactions that we will disregard anyway.  This solves
+	 * a significant performance problem with listen when there is a long
+	 * running transaction
+	 */
+	if( QUEUE_POS_PAGE(max) < QUEUE_POS_PAGE(QUEUE_HEAD) )
+	{
+		for (i = 1; i <= MaxBackends; i++)
+		{
+			if (QUEUE_BACKEND_PID(i) != InvalidPid && QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
+				max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i));
+		}
+	}
+	QUEUE_BACKEND_POS(MyBackendId) = max;
+	QUEUE_BACKEND_DBOID(MyBackendId) = MyDatabaseId;
 	QUEUE_BACKEND_PID(MyBackendId) = MyProcPid;
 	LWLockRelease(AsyncQueueLock);
 
@@ -1156,6 +1186,7 @@ asyncQueueUnregister(void)
 		QUEUE_POS_EQUAL(QUEUE_BACKEND_POS(MyBackendId), QUEUE_TAIL);
 	/* ... then mark it invalid */
 	QUEUE_BACKEND_PID(MyBackendId) = InvalidPid;
+	QUEUE_BACKEND_DBOID(MyBackendId) = InvalidOid;
 	LWLockRelease(AsyncQueueLock);
 
 	/* mark ourselves as no longer listed in the global array */

["avoid_redundant_lock.patch" (avoid_redundant_lock.patch)]

diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 58682dc..e2f942f 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -384,6 +384,8 @@ static double asyncQueueUsage(void);
 static void asyncQueueFillWarning(void);
 static bool SignalBackends(void);
 static void asyncQueueReadAllNotifications(void);
+static void asyncQueueReadAllNotificationsWorker(volatile QueuePosition pos,
+									 QueuePosition head);
 static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
 							 QueuePosition stop,
 							 char *page_buffer);
@@ -917,6 +919,7 @@ static void
 Exec_ListenPreCommit(void)
 {
 	QueuePosition max;
+	QueuePosition head;
 	int i;
 
 	/*
@@ -949,6 +952,7 @@ Exec_ListenPreCommit(void)
 	 */
 	LWLockAcquire(AsyncQueueLock, LW_SHARED);
 	max = QUEUE_TAIL;
+	head = QUEUE_HEAD;
 	/*
 	 * If there is over a page of notifications queued, then we should find
 	 * the maximum position of all backends connected to our database, to
@@ -957,7 +961,7 @@ Exec_ListenPreCommit(void)
 	 * a significant performance problem with listen when there is a long
 	 * running transaction
 	 */
-	if( QUEUE_POS_PAGE(max) < QUEUE_POS_PAGE(QUEUE_HEAD) )
+	if( QUEUE_POS_PAGE(max) < QUEUE_POS_PAGE(head) )
 	{
 		for (i = 1; i <= MaxBackends; i++)
 		{
@@ -983,7 +987,10 @@ Exec_ListenPreCommit(void)
 	 *
 	 * This will also advance the global tail pointer if possible.
 	 */
-	asyncQueueReadAllNotifications();
+	if (!QUEUE_POS_EQUAL(max,head))
+	{
+		asyncQueueReadAllNotificationsWorker(max,head);
+	}
 }
 
 /*
@@ -1732,23 +1739,14 @@ ProcessNotifyInterrupt(void)
 static void
 asyncQueueReadAllNotifications(void)
 {
-	volatile QueuePosition pos;
-	QueuePosition oldpos;
+	QueuePosition pos;
 	QueuePosition head;
-	bool		advanceTail;
-
-	/* page_buffer must be adequately aligned, so use a union */
-	union
-	{
-		char		buf[QUEUE_PAGESIZE];
-		AsyncQueueEntry align;
-	}			page_buffer;
 
 	/* Fetch current state */
 	LWLockAcquire(AsyncQueueLock, LW_SHARED);
 	/* Assert checks that we have a valid state entry */
 	Assert(MyProcPid == QUEUE_BACKEND_PID(MyBackendId));
-	pos = oldpos = QUEUE_BACKEND_POS(MyBackendId);
+	pos = QUEUE_BACKEND_POS(MyBackendId);
 	head = QUEUE_HEAD;
 	LWLockRelease(AsyncQueueLock);
 
@@ -1757,6 +1755,24 @@ asyncQueueReadAllNotifications(void)
 		/* Nothing to do, we have read all notifications already. */
 		return;
 	}
+	asyncQueueReadAllNotificationsWorker(pos, head);
+}
+
+static void
+asyncQueueReadAllNotificationsWorker(volatile QueuePosition pos,
+									 QueuePosition head)
+{
+	QueuePosition oldpos;
+	bool 		advanceTail;
+
+	/* page_buffer must be adequately aligned, so use a union */
+	union
+	{
+		char		buf[QUEUE_PAGESIZE];
+		AsyncQueueEntry align;
+	}			page_buffer;
+
+	oldpos = pos;
 
 	/*----------
 	 * Note that we deliver everything that we see in the queue and that


-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

