List:       postgresql-hackers
Subject:    [HACKERS] shared memory message queues
From:       Robert Haas <robertmhaas@gmail.com>
Date:       2013-10-31 16:21:31
Message-ID: CA+TgmobUe28JR3zRUDH7s0jkCcdxsw6dP4sLw57x9NnMf01wgg@mail.gmail.com

Right now, it's pretty hard to write code that does anything useful
with dynamic shared memory.  Sure, you can allocate a dynamic shared
memory segment, and that's nice, but you won't get any help at all
figuring out what to store in it, or how to use it to communicate
effectively, which is not so nice.  And some of the services we offer
around the main shared memory segment are conspicuously missing for
dynamic shared memory.  The attached patches attempt to rectify some
of these problems.  If you're not the patient type who wants to read
the whole email, patch #3 is the cool part.

Patch #1, on-dsm-detach-v1.patch, adds the concept of on_dsm_detach
hooks.  These are basically like on_shmem_exit hooks, except that
detaching from a dsm can happen at any time, not just at backend exit.
But they're needed for the same reasons: when we detach from the main
shared memory segment, we need to make sure that we've released all
relevant locks, returned our PGPROC to the pool, etc.  Dynamic shared
memory segments require the same sorts of cleanup when they contain
similarly complex data structures.  The part of this patch which I
suppose will elicit some controversy is that I've had to rearrange
on_shmem_exit a bit.  It turns out that during shmem_exit, we do
"user-level" cleanup, like aborting the transaction, first.  We expect
that will probably release all of our shared-memory resources.  Then,
just to make doubly sure, we do "low-level" cleanup, where individual
modules release session-lifetime resources and verify that no
lwlocks, etc. have been leaked.  on_dsm_detach callbacks properly happen
in the middle, after we've tried to abort the transaction but before
the main shared memory segment is finally shut down.  I'm not sure
that the solution I've adopted here is optimal; see within for
details.
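
To make the intended usage concrete, here is a minimal sketch of how a
module might use the new hook; my_module_release_slot and the slot
number are hypothetical, but on_dsm_detach, dsm_attach, and
dsm_segment_address are the real functions from the patch:

	static void
	my_module_detach_hook(dsm_segment *seg, Datum arg)
	{
		/* Give back whatever per-backend state we hold in the segment. */
		my_module_release_slot(dsm_segment_address(seg), DatumGetInt32(arg));
	}

	/* ...and at attach time... */
	seg = dsm_attach(handle);
	on_dsm_detach(seg, my_module_detach_hook, Int32GetDatum(my_slot));

Since dsm_backend_shutdown() detaches any still-mapped segments, the
hook fires both on explicit dsm_detach() and during shmem_exit, so the
cleanup happens no matter how we let go of the segment.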

Patch #2, shm-toc-v1.patch, provides a facility for sizing a dynamic
shared memory segment before creation, and for dividing it up into
chunks after it's been created.  It therefore serves a function quite
similar to RequestAddinShmemSpace, except of course that there is only
one main shared memory segment created at postmaster startup time,
whereas new dynamic shared memory segments can come into existence on
the fly; and, even more conspicuously, it serves the function of
ShmemIndex, which enables backends to locate particular data
structures within the shared memory segment.  It is however quite a
bit simpler than the ShmemIndex mechanism: we don't need the same
level of extensibility here that we do for the main shared memory
segment, because a new extension need not piggyback on an existing
dynamic shared memory segment, but can create a whole segment of its
own.
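
In outline, the lifecycle looks like this (MY_MAGIC and my_state are
hypothetical placeholders; the functions and estimator macros are the
ones the patch provides):

	shm_toc_estimator	e;
	Size		segsize;
	dsm_segment *seg;
	shm_toc	   *toc;
	my_state   *state;

	/* Size the segment before creating it... */
	shm_toc_initialize_estimator(&e);
	shm_toc_estimate_chunk(&e, sizeof(my_state));
	shm_toc_estimate_keys(&e, 1);
	segsize = shm_toc_estimate(&e);
	seg = dsm_create(segsize);

	/* ...then carve it up, advertising the chunk under a known key. */
	toc = shm_toc_create(MY_MAGIC, dsm_segment_address(seg), segsize);
	state = shm_toc_allocate(toc, sizeof(my_state));
	shm_toc_insert(toc, 0, state);

	/* Another backend, after dsm_attach(), finds it again: */
	toc = shm_toc_attach(MY_MAGIC, dsm_segment_address(seg));
	state = shm_toc_lookup(toc, 0);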

Patch #3, shm-mq-v1.patch, is the heart of this series.  It creates an
infrastructure for sending and receiving messages of arbitrary length
using ring buffers stored in shared memory (presumably dynamic shared
memory, but hypothetically the main shared memory segment could be
used).  Queues are single-reader and single-writer; they use process
latches to implement waiting for the queue to fill (in the case of the
reader) or drain (in the case of the writer).  A non-blocking mode is
also available for situations where other options might lead to
deadlock.  Even without this patch, backends can write messages to a
dynamic shared memory segment and wait for some other backend to read
them, but unless you know exactly how much data you want to send
before you create the shared memory segment, and unless you don't mind
storing all of it for the lifetime of the segment, you'll quickly run
into non-trivial problems around memory reuse and synchronization.  So
this is an effort to create a higher-level infrastructure where one
process can simply declare that it wishes to send a series of messages
to a particular queue and another process can declare that it wishes
to read them out of that queue, and so it happens.
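
In the blocking case, the pattern is about this simple (queue placement
within the segment and error handling are elided, and msg/msglen are
just the payload; the signatures are as in the patch):

	shm_mq		   *mq;
	shm_mq_handle  *mqh;
	shm_mq_result	res;
	uint64			nbytes;
	void		   *data;

	/* In the process that sets up the segment: */
	mq = shm_mq_create(shm_toc_allocate(toc, 8192), 8192);
	shm_mq_set_receiver(mq, MyProc);
	mqh = shm_mq_attach(mq, seg, NULL);
	res = shm_mq_receive(mqh, &nbytes, &data, false);	/* blocks */

	/* In the counterparty, after finding the queue via the TOC: */
	shm_mq_set_sender(mq, MyProc);
	mqh = shm_mq_attach(mq, seg, NULL);
	res = shm_mq_send(mqh, msglen, msg, false);

On success, nbytes and data describe one complete message, valid until
the next receive on the same queue.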

As far as parallelism is concerned, I anticipate that this code will
be useful for at least two purposes: (1) propagating errors that occur
inside a worker process back to the user backend that initiated the
parallel operation; and (2) streaming tuples from a worker performing
one part of the query (a scan or join, say) back to the user backend
or another worker performing a different part of the same query.  I
suspect that this code will find applications outside parallelism as
well.

Patch #4, test-shm-mq-v1.patch, is a demonstration of how to use the
various background worker and dynamic shared memory facilities
introduced over the course of the 9.4 release cycle, and the
facilities introduced by patches #1-#3 of this series, to actually do
something interesting.  Specifically, it sets up a ring of processes
connected by shared message queues and relays a user-specified message
around the ring repeatedly, then checks that it has the same message
at the end.  This is obviously just a demonstration, but I find it
pretty cool, because the code here demonstrates that, with all of
these facilities in place, setting up a bunch of workers and having
them talk to each other can be done using what is really a pretty
modest amount of code.  Importantly, this patch shows how to make the
start-up and shut-down sequences reliable, so that you don't end up
with the user backend hanging forever waiting for a worker that has
already died or will never start, or a worker backend waiting for a
user backend that has already aborted.  Review of this logic is
particularly appreciated, as it's proven to be pretty complex: I think
the solutions I've worked out here are generally good, but there may
still be holes to plug.  My hope is that people will take this test
code and use it as a basis for real applications.  Including this
patch in our distribution will also serve as a useful regression test
of dynamic background workers and dynamic shared memory, which has so
far been lacking.
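
To give a flavor of the start-up handshake: the trick is to register
the worker with bgw_notify_pid = MyProcPid and hand the resulting
BackgroundWorkerHandle to shm_mq_attach(), so that any wait inside the
queue code can notice if the worker dies before attaching.  A rough
sketch, not the test module's exact code:

	worker.bgw_notify_pid = MyProcPid;
	if (!RegisterDynamicBackgroundWorker(&worker, &handle))
		ereport(ERROR,
				(errmsg("could not register background worker")));

	mqh = shm_mq_attach(mq, seg, handle);
	res = shm_mq_receive(mqh, &nbytes, &data, false);
	if (res == SHM_MQ_DETACHED)
		ereport(ERROR,
				(errmsg("worker exited without sending a message")));

In the other direction, shm_mq_attach() registers an on_dsm_detach
callback, so a process that exits or errors out marks the queue
detached and its counterpart gets SHM_MQ_DETACHED instead of sleeping
forever.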

Particular thanks are due to Noah Misch for serving as my constant
sounding board during the development of this patch series.

Thanks,

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company

["on-dsm-detach-v1.patch" (text/x-patch)]

commit 5f688508a3dd576c008cfd6c8c0363d7a8d7ecae
Author: Robert Haas <rhaas@postgresql.org>
Date:   Tue Oct 1 11:39:52 2013 -0400

    Allow on-detach callbacks for shared memory segments.
    
    Just as backends must clean up their shared memory state (releasing
    lwlocks, buffer pins, etc.) before exiting, they must also perform
    any similar cleanups related to dynamic shared memory segments they
    have mapped.  So add a mechanism to ensure that.
    
    Existing on_shmem_exit hooks include both "user level" cleanup such
    as transaction abort and removal of leftover temporary relations and
    also "low level" cleanup that forcibly released leftover shared
    memory resources.  On-detach callbacks should run after the first
    group but before the second group, so split on_shmem_exit into early
    and late callbacks for that purpose.

diff --git a/contrib/pg_stat_statements/pg_stat_statements.c b/contrib/pg_stat_statements/pg_stat_statements.c
index ea930af..770f499 100644
--- a/contrib/pg_stat_statements/pg_stat_statements.c
+++ b/contrib/pg_stat_statements/pg_stat_statements.c
@@ -439,7 +439,7 @@ pgss_shmem_startup(void)
 	 * exit hook to dump the statistics to disk.
 	 */
 	if (!IsUnderPostmaster)
-		on_shmem_exit(pgss_shmem_shutdown, (Datum) 0);
+		on_shmem_exit(pgss_shmem_shutdown, (Datum) 0, SHMEM_EXIT_LATE);
 
 	/*
 	 * Attempt to load old statistics from the dump file, if this is the first
diff --git a/src/backend/bootstrap/bootstrap.c b/src/backend/bootstrap/bootstrap.c
index 2a1af63..515a420 100644
--- a/src/backend/bootstrap/bootstrap.c
+++ b/src/backend/bootstrap/bootstrap.c
@@ -403,7 +403,7 @@ AuxiliaryProcessMain(int argc, char *argv[])
 		InitBufferPoolBackend();
 
 		/* register a shutdown callback for LWLock cleanup */
-		on_shmem_exit(ShutdownAuxiliaryProcess, 0);
+		on_shmem_exit(ShutdownAuxiliaryProcess, 0, SHMEM_EXIT_EARLY);
 	}
 
 	/*
diff --git a/src/backend/catalog/namespace.c b/src/backend/catalog/namespace.c
index 4434dd6..3b4e0ca 100644
--- a/src/backend/catalog/namespace.c
+++ b/src/backend/catalog/namespace.c
@@ -3681,7 +3681,7 @@ AtEOXact_Namespace(bool isCommit)
 	if (myTempNamespaceSubID != InvalidSubTransactionId)
 	{
 		if (isCommit)
-			on_shmem_exit(RemoveTempRelationsCallback, 0);
+			on_shmem_exit(RemoveTempRelationsCallback, 0, SHMEM_EXIT_EARLY);
 		else
 		{
 			myTempNamespace = InvalidOid;
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 6414291..f273c0f 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -921,7 +921,7 @@ Exec_ListenPreCommit(void)
 	 */
 	if (!unlistenExitRegistered)
 	{
-		on_shmem_exit(Async_UnlistenOnExit, 0);
+		on_shmem_exit(Async_UnlistenOnExit, 0, SHMEM_EXIT_EARLY);
 		unlistenExitRegistered = true;
 	}
 
diff --git a/src/backend/port/ipc_test.c b/src/backend/port/ipc_test.c
index 6020ef5..2e067ea 100644
--- a/src/backend/port/ipc_test.c
+++ b/src/backend/port/ipc_test.c
@@ -56,14 +56,19 @@ char	   *DataDir = ".";
 
 #define MAX_ON_EXITS 20
 
-static struct ONEXIT
+struct ONEXIT
 {
 	pg_on_exit_callback function;
 	Datum		arg;
-}	on_proc_exit_list[MAX_ON_EXITS], on_shmem_exit_list[MAX_ON_EXITS];
+};
+
+static struct ONEXIT on_proc_exit_list[MAX_ON_EXITS];
+static struct ONEXIT on_shmem_exit_early_list[MAX_ON_EXITS];
+static struct ONEXIT on_shmem_exit_late_list[MAX_ON_EXITS];
 
 static int	on_proc_exit_index,
-			on_shmem_exit_index;
+			on_shmem_exit_early_index,
+			on_shmem_exit_late_index;
 
 void
 proc_exit(int code)
@@ -78,28 +83,51 @@ proc_exit(int code)
 void
 shmem_exit(int code)
 {
-	while (--on_shmem_exit_index >= 0)
-		(*on_shmem_exit_list[on_shmem_exit_index].function) (code,
-								on_shmem_exit_list[on_shmem_exit_index].arg);
-	on_shmem_exit_index = 0;
+	while (--on_shmem_exit_early_index >= 0)
+		(*on_shmem_exit_early_list[on_shmem_exit_early_index].function) (code,
+					on_shmem_exit_early_list[on_shmem_exit_early_index].arg);
+	on_shmem_exit_early_index = 0;
+
+	while (--on_shmem_exit_late_index >= 0)
+		(*on_shmem_exit_late_list[on_shmem_exit_late_index].function) (code,
+					on_shmem_exit_late_list[on_shmem_exit_late_index].arg);
+	on_shmem_exit_late_index = 0;
 }
 
 void
-on_shmem_exit(pg_on_exit_callback function, Datum arg)
+on_shmem_exit(pg_on_exit_callback function, Datum arg, shmem_exit_phase phase)
 {
-	if (on_shmem_exit_index >= MAX_ON_EXITS)
-		elog(FATAL, "out of on_shmem_exit slots");
+	int	   *index;
+	struct ONEXIT *list;
+
+	if (phase == SHMEM_EXIT_EARLY)
+	{
+		index = &on_shmem_exit_early_index;
+		list = on_shmem_exit_early_list;
+	}
+	else
+	{
+		Assert(phase == SHMEM_EXIT_LATE);
+		index = &on_shmem_exit_late_index;
+		list = on_shmem_exit_late_list;
+	}
+
+	if (*index >= MAX_ON_EXITS)
+		ereport(FATAL,
+				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
+				 errmsg_internal("out of on_shmem_exit slots")));
 
-	on_shmem_exit_list[on_shmem_exit_index].function = function;
-	on_shmem_exit_list[on_shmem_exit_index].arg = arg;
+	list[*index].function = function;
+	list[*index].arg = arg;
 
-	++on_shmem_exit_index;
+	++*index;
 }
 
 void
 on_exit_reset(void)
 {
-	on_shmem_exit_index = 0;
+	on_shmem_exit_early_index = 0;
+	on_shmem_exit_late_index = 0;
 	on_proc_exit_index = 0;
 }
 
diff --git a/src/backend/port/posix_sema.c b/src/backend/port/posix_sema.c
index 061fd2d..ff0bb64 100644
--- a/src/backend/port/posix_sema.c
+++ b/src/backend/port/posix_sema.c
@@ -160,7 +160,7 @@ PGReserveSemaphores(int maxSemas, int port)
 	maxSems = maxSemas;
 	nextSemKey = port * 1000;
 
-	on_shmem_exit(ReleaseSemaphores, 0);
+	on_shmem_exit(ReleaseSemaphores, 0, SHMEM_EXIT_LATE);
 }
 
 /*
diff --git a/src/backend/port/sysv_sema.c b/src/backend/port/sysv_sema.c
index 2988561..2e10934 100644
--- a/src/backend/port/sysv_sema.c
+++ b/src/backend/port/sysv_sema.c
@@ -296,7 +296,7 @@ PGReserveSemaphores(int maxSemas, int port)
 	nextSemaKey = port * 1000;
 	nextSemaNumber = SEMAS_PER_SET;		/* force sema set alloc on 1st call */
 
-	on_shmem_exit(ReleaseSemaphores, 0);
+	on_shmem_exit(ReleaseSemaphores, 0, SHMEM_EXIT_LATE);
 }
 
 /*
diff --git a/src/backend/port/sysv_shmem.c b/src/backend/port/sysv_shmem.c
index b604407..aa80765 100644
--- a/src/backend/port/sysv_shmem.c
+++ b/src/backend/port/sysv_shmem.c
@@ -165,7 +165,7 @@ InternalIpcMemoryCreate(IpcMemoryKey memKey, Size size)
 	}
 
 	/* Register on-exit routine to delete the new segment */
-	on_shmem_exit(IpcMemoryDelete, Int32GetDatum(shmid));
+	on_shmem_exit(IpcMemoryDelete, Int32GetDatum(shmid), SHMEM_EXIT_LATE);
 
 	/* OK, should be able to attach to the segment */
 	memAddress = shmat(shmid, NULL, PG_SHMAT_FLAGS);
@@ -174,7 +174,8 @@ InternalIpcMemoryCreate(IpcMemoryKey memKey, Size size)
 		elog(FATAL, "shmat(id=%d) failed: %m", shmid);
 
 	/* Register on-exit routine to detach new segment before deleting */
-	on_shmem_exit(IpcMemoryDetach, PointerGetDatum(memAddress));
+	on_shmem_exit(IpcMemoryDetach, PointerGetDatum(memAddress),
+				  SHMEM_EXIT_LATE);
 
 	/*
 	 * Store shmem key and ID in data directory lockfile.  Format to try to
diff --git a/src/backend/port/win32_sema.c b/src/backend/port/win32_sema.c
index dc5054b..3f02fea 100644
--- a/src/backend/port/win32_sema.c
+++ b/src/backend/port/win32_sema.c
@@ -41,7 +41,7 @@ PGReserveSemaphores(int maxSemas, int port)
 	numSems = 0;
 	maxSems = maxSemas;
 
-	on_shmem_exit(ReleaseSemaphores, 0);
+	on_shmem_exit(ReleaseSemaphores, 0, SHMEM_EXIT_LATE);
 }
 
 /*
diff --git a/src/backend/port/win32_shmem.c b/src/backend/port/win32_shmem.c
index 0db8e8f..28cb739 100644
--- a/src/backend/port/win32_shmem.c
+++ b/src/backend/port/win32_shmem.c
@@ -212,7 +212,8 @@ PGSharedMemoryCreate(Size size, bool makePrivate, int port)
 
 
 	/* Register on-exit routine to delete the new segment */
-	on_shmem_exit(pgwin32_SharedMemoryDelete, PointerGetDatum(hmap2));
+	on_shmem_exit(pgwin32_SharedMemoryDelete, PointerGetDatum(hmap2),
+				  SHMEM_EXIT_LATE);
 
 	/*
 	 * Get a pointer to the new shared memory segment. Map the whole segment
diff --git a/src/backend/postmaster/autovacuum.c b/src/backend/postmaster/autovacuum.c
index 8c14d0f..6596fd3 100644
--- a/src/backend/postmaster/autovacuum.c
+++ b/src/backend/postmaster/autovacuum.c
@@ -1624,7 +1624,7 @@ AutoVacWorkerMain(int argc, char *argv[])
 		AutoVacuumShmem->av_startingWorker = NULL;
 		LWLockRelease(AutovacuumLock);
 
-		on_shmem_exit(FreeWorkerInfo, 0);
+		on_shmem_exit(FreeWorkerInfo, 0, SHMEM_EXIT_LATE);
 
 		/* wake up the launcher */
 		if (AutoVacuumShmem->av_launcherpid != 0)
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index b5ce2f6..2e2351e 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -2459,7 +2459,7 @@ pgstat_initialize(void)
 	MyBEEntry = &BackendStatusArray[MyBackendId - 1];
 
 	/* Set up a process-exit hook to clean up */
-	on_shmem_exit(pgstat_beshutdown_hook, 0);
+	on_shmem_exit(pgstat_beshutdown_hook, 0, SHMEM_EXIT_LATE);
 }
 
 /* ----------
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 413f0b9..ea1773f 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -250,7 +250,7 @@ WalReceiverMain(void)
 	SpinLockRelease(&walrcv->mutex);
 
 	/* Arrange to clean up at walreceiver exit */
-	on_shmem_exit(WalRcvDie, 0);
+	on_shmem_exit(WalRcvDie, 0, SHMEM_EXIT_LATE);
 
 	OwnLatch(&walrcv->latch);
 
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index afd559d..a4505c2 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1207,7 +1207,7 @@ InitWalSenderSlot(void)
 						max_wal_senders)));
 
 	/* Arrange to clean up at walsender exit */
-	on_shmem_exit(WalSndKill, 0);
+	on_shmem_exit(WalSndKill, 0, SHMEM_EXIT_LATE);
 }
 
 /* Destroy the per-walsender data structure for this walsender process */
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index f848391..276e29e 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -1741,7 +1741,7 @@ AtEOXact_Buffers(bool isCommit)
 void
 InitBufferPoolBackend(void)
 {
-	on_shmem_exit(AtProcExit_Buffers, 0);
+	on_shmem_exit(AtProcExit_Buffers, 0, SHMEM_EXIT_LATE);
 }
 
 /*
diff --git a/src/backend/storage/ipc/dsm.c b/src/backend/storage/ipc/dsm.c
index 4d7fb08..ce46795 100644
--- a/src/backend/storage/ipc/dsm.c
+++ b/src/backend/storage/ipc/dsm.c
@@ -58,6 +58,14 @@
 
 #define INVALID_CONTROL_SLOT		((uint32) -1)
 
+/* Backend-local tracking for on-detach callbacks. */
+typedef struct dsm_segment_detach_callback
+{
+	on_dsm_detach_callback	function;
+	Datum					arg;
+	slist_node				node;
+} dsm_segment_detach_callback;
+
 /* Backend-local state for a dynamic shared memory segment. */
 struct dsm_segment
 {
@@ -68,6 +76,7 @@ struct dsm_segment
 	void       *impl_private;		/* Implementation-specific private data. */
 	void	   *mapped_address;		/* Mapping address, or NULL if unmapped. */
 	Size		mapped_size;		/* Size of our mapping. */
+	slist_head	on_detach;			/* On-detach callbacks. */
 };
 
 /* Shared-memory state for a dynamic shared memory segment. */
@@ -91,7 +100,6 @@ static void dsm_cleanup_for_mmap(void);
 static bool dsm_read_state_file(dsm_handle *h);
 static void dsm_write_state_file(dsm_handle h);
 static void dsm_postmaster_shutdown(int code, Datum arg);
-static void dsm_backend_shutdown(int code, Datum arg);
 static dsm_segment *dsm_create_descriptor(void);
 static bool dsm_control_segment_sane(dsm_control_header *control,
 						 Size mapped_size);
@@ -191,7 +199,7 @@ dsm_postmaster_startup(void)
 			break;
 	}
 	dsm_control = dsm_control_address;
-	on_shmem_exit(dsm_postmaster_shutdown, 0);
+	on_shmem_exit(dsm_postmaster_shutdown, 0, SHMEM_EXIT_LATE);
 	elog(DEBUG2, "created dynamic shared memory control segment %u ("
 		UINT64_FORMAT " bytes)", dsm_control_handle, segsize);
 	dsm_write_state_file(dsm_control_handle);
@@ -552,9 +560,6 @@ dsm_backend_startup(void)
 	}
 #endif
 
-	/* Arrange to detach segments on exit. */
-	on_shmem_exit(dsm_backend_shutdown, 0);
-
 	dsm_init_done = true;
 }
 
@@ -714,8 +719,8 @@ dsm_attach(dsm_handle h)
 /*
  * At backend shutdown time, detach any segments that are still attached.
  */
-static void
-dsm_backend_shutdown(int code, Datum arg)
+void
+dsm_backend_shutdown(void)
 {
 	while (!dlist_is_empty(&dsm_segment_list))
 	{
@@ -771,6 +776,27 @@ void
 dsm_detach(dsm_segment *seg)
 {
 	/*
+	 * Invoke registered callbacks.  Just in case one of those callbacks
+	 * throws a further error that brings us back here, pop the callback
+	 * before invoking it, to avoid infinite error recursion.
+	 */
+	while (!slist_is_empty(&seg->on_detach))
+	{
+		slist_node *node;
+		dsm_segment_detach_callback *cb;
+		on_dsm_detach_callback	function;
+		Datum		arg;
+
+		node = slist_pop_head_node(&seg->on_detach);
+		cb = slist_container(dsm_segment_detach_callback, node, node);
+		function = cb->function;
+		arg = cb->arg;
+		pfree(cb);
+
+		function(seg, arg);
+	}
+
+	/*
 	 * Try to remove the mapping, if one exists.  Normally, there will be,
 	 * but maybe not, if we failed partway through a create or attach
 	 * operation.  We remove the mapping before decrementing the reference
@@ -912,6 +938,44 @@ dsm_segment_handle(dsm_segment *seg)
 }
 
 /*
+ * Register an on-detach callback for a dynamic shared memory segment.
+ */
+void
+on_dsm_detach(dsm_segment *seg, on_dsm_detach_callback function, Datum arg)
+{
+	dsm_segment_detach_callback *cb;
+
+	cb = MemoryContextAlloc(TopMemoryContext,
+							sizeof(dsm_segment_detach_callback));
+	cb->function = function;
+	cb->arg = arg;
+	slist_push_head(&seg->on_detach, &cb->node);
+}
+
+/*
+ * Unregister an on-detach callback for a dynamic shared memory segment.
+ */
+void
+on_dsm_detach_cancel(dsm_segment *seg, on_dsm_detach_callback function,
+					 Datum arg)
+{
+	slist_mutable_iter	iter;
+
+	slist_foreach_modify(iter, &seg->on_detach)
+	{
+		dsm_segment_detach_callback *cb;
+
+		cb = slist_container(dsm_segment_detach_callback, node, iter.cur);
+		if (cb->function == function && cb->arg == arg)
+		{
+			slist_delete_current(&iter);
+			pfree(cb);
+			break;
+		}
+	}
+}
+
+/*
  * Create a segment descriptor.
  */
 static dsm_segment *
@@ -933,6 +997,8 @@ dsm_create_descriptor(void)
 	seg->resowner = CurrentResourceOwner;
 	ResourceOwnerRememberDSM(CurrentResourceOwner, seg);
 
+	slist_init(&seg->on_detach);
+
 	return seg;
 }
 
diff --git a/src/backend/storage/ipc/ipc.c b/src/backend/storage/ipc/ipc.c
index c339e9c..ae42366 100644
--- a/src/backend/storage/ipc/ipc.c
+++ b/src/backend/storage/ipc/ipc.c
@@ -27,6 +27,7 @@
 #ifdef PROFILE_PID_DIR
 #include "postmaster/autovacuum.h"
 #endif
+#include "storage/dsm.h"
 #include "storage/ipc.h"
 #include "tcop/tcopprot.h"
 
@@ -64,14 +65,19 @@ static void proc_exit_prepare(int code);
 
 #define MAX_ON_EXITS 20
 
-static struct ONEXIT
+struct ONEXIT
 {
 	pg_on_exit_callback function;
 	Datum		arg;
-}	on_proc_exit_list[MAX_ON_EXITS], on_shmem_exit_list[MAX_ON_EXITS];
+};
+
+static struct ONEXIT on_proc_exit_list[MAX_ON_EXITS];
+static struct ONEXIT on_shmem_exit_early_list[MAX_ON_EXITS];
+static struct ONEXIT on_shmem_exit_late_list[MAX_ON_EXITS];
 
 static int	on_proc_exit_index,
-			on_shmem_exit_index;
+			on_shmem_exit_early_index,
+			on_shmem_exit_late_index;
 
 
 /* ----------------------------------------------------------------
@@ -202,26 +208,61 @@ proc_exit_prepare(int code)
 /* ------------------
  * Run all of the on_shmem_exit routines --- but don't actually exit.
  * This is used by the postmaster to re-initialize shared memory and
- * semaphores after a backend dies horribly.
+ * semaphores after a backend dies horribly.  As with proc_exit(), we
+ * remove each callback from the list before calling it, to avoid
+ * infinite loop in case of error.
  * ------------------
  */
 void
 shmem_exit(int code)
 {
-	elog(DEBUG3, "shmem_exit(%d): %d callbacks to make",
-		 code, on_shmem_exit_index);
+	/*
+	 * Call early callbacks.
+	 *
+	 * These are generally things that need most of the system to still be
+	 * up and working, such as cleanup of temp relations, which requires
+	 * catalog access; or things that need to be completed because later
+	 * cleanup steps depend on them, such as releasing lwlocks.
+	 */
+	elog(DEBUG3, "shmem_exit(%d): %d early callbacks to make",
+		 code, on_shmem_exit_early_index);
+	while (--on_shmem_exit_early_index >= 0)
+		(*on_shmem_exit_early_list[on_shmem_exit_early_index].function) (code,
+					on_shmem_exit_early_list[on_shmem_exit_early_index].arg);
+	on_shmem_exit_early_index = 0;
 
 	/*
-	 * call all the registered callbacks.
+	 * Call dynamic shared memory callbacks.
+	 *
+	 * These serve the same purpose as late callbacks, but for dynamic shared
+	 * memory segments rather than the main shared memory segment.
+	 * dsm_backend_shutdown() has the same kind of progressive logic we use
+	 * for the main shared memory segment; namely, it unregisters each
+	 * callback before invoking it, so that we don't get stuck in an infinite
+	 * loop if one of those callbacks itself throws an ERROR or FATAL.
 	 *
-	 * As with proc_exit(), we remove each callback from the list before
-	 * calling it, to avoid infinite loop in case of error.
+	 * Note that explicitly calling this function here is quite different
+	 * from registering it as an on_shmem_exit callback for precisely this
+	 * reason: if one dynamic shared memory callback errors out, the remaining
+	 * callbacks will still be invoked.  Thus, hard-coding this call puts it
+	 * on equal footing with callbacks for the main shared memory segment.
 	 */
-	while (--on_shmem_exit_index >= 0)
-		(*on_shmem_exit_list[on_shmem_exit_index].function) (code,
-								on_shmem_exit_list[on_shmem_exit_index].arg);
+	dsm_backend_shutdown();
 
-	on_shmem_exit_index = 0;
+	/*
+	 * Call late callbacks.
+	 *
+	 * These are generally releasing low-level shared memory resources.  In
+	 * some cases, this is a backstop against the possibility that the early
+	 * callbacks might themselves fail, leading to re-entry to this routine;
+	 * in other cases, it's cleanup that only happens at process exit.
+	 */
+	elog(DEBUG3, "shmem_exit(%d): %d late callbacks to make",
+		 code, on_shmem_exit_late_index);
+	while (--on_shmem_exit_late_index >= 0)
+		(*on_shmem_exit_late_list[on_shmem_exit_late_index].function) (code,
+					on_shmem_exit_late_list[on_shmem_exit_late_index].arg);
+	on_shmem_exit_late_index = 0;
 }
 
 /* ----------------------------------------------------------------
@@ -277,17 +318,32 @@ on_proc_exit(pg_on_exit_callback function, Datum arg)
  * ----------------------------------------------------------------
  */
 void
-on_shmem_exit(pg_on_exit_callback function, Datum arg)
+on_shmem_exit(pg_on_exit_callback function, Datum arg, shmem_exit_phase phase)
 {
-	if (on_shmem_exit_index >= MAX_ON_EXITS)
+	int	   *index;
+	struct ONEXIT *list;
+
+	if (phase == SHMEM_EXIT_EARLY)
+	{
+		index = &on_shmem_exit_early_index;
+		list = on_shmem_exit_early_list;
+	}
+	else
+	{
+		Assert(phase == SHMEM_EXIT_LATE);
+		index = &on_shmem_exit_late_index;
+		list = on_shmem_exit_late_list;
+	}
+
+	if (*index >= MAX_ON_EXITS)
 		ereport(FATAL,
 				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
 				 errmsg_internal("out of on_shmem_exit slots")));
 
-	on_shmem_exit_list[on_shmem_exit_index].function = function;
-	on_shmem_exit_list[on_shmem_exit_index].arg = arg;
+	list[*index].function = function;
+	list[*index].arg = arg;
 
-	++on_shmem_exit_index;
+	++*index;
 
 	if (!atexit_callback_setup)
 	{
@@ -306,12 +362,28 @@ on_shmem_exit(pg_on_exit_callback function, Datum arg)
  * ----------------------------------------------------------------
  */
 void
-cancel_shmem_exit(pg_on_exit_callback function, Datum arg)
+cancel_shmem_exit(pg_on_exit_callback function, Datum arg,
+				  shmem_exit_phase phase)
 {
-	if (on_shmem_exit_index > 0 &&
-		on_shmem_exit_list[on_shmem_exit_index - 1].function == function &&
-		on_shmem_exit_list[on_shmem_exit_index - 1].arg == arg)
-		--on_shmem_exit_index;
+	int	   *index;
+	struct ONEXIT *list;
+
+	if (phase == SHMEM_EXIT_EARLY)
+	{
+		index = &on_shmem_exit_early_index;
+		list = on_shmem_exit_early_list;
+	}
+	else
+	{
+		Assert(phase == SHMEM_EXIT_LATE);
+		index = &on_shmem_exit_late_index;
+		list = on_shmem_exit_late_list;
+	}
+
+	if (*index > 0 &&
+		list[*index - 1].function == function &&
+		list[*index - 1].arg == arg)
+		--*index;
 }
 
 /* ----------------------------------------------------------------
@@ -326,6 +398,7 @@ cancel_shmem_exit(pg_on_exit_callback function, Datum arg)
 void
 on_exit_reset(void)
 {
-	on_shmem_exit_index = 0;
+	on_shmem_exit_early_index = 0;
+	on_shmem_exit_late_index = 0;
 	on_proc_exit_index = 0;
 }
diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c
index c4b5d01..2533bf1 100644
--- a/src/backend/storage/ipc/procsignal.c
+++ b/src/backend/storage/ipc/procsignal.c
@@ -131,7 +131,8 @@ ProcSignalInit(int pss_idx)
 	MyProcSignalSlot = slot;
 
 	/* Set up to release the slot on process exit */
-	on_shmem_exit(CleanupProcSignalState, Int32GetDatum(pss_idx));
+	on_shmem_exit(CleanupProcSignalState, Int32GetDatum(pss_idx),
+				  SHMEM_EXIT_LATE);
 }
 
 /*
diff --git a/src/backend/storage/ipc/sinvaladt.c b/src/backend/storage/ipc/sinvaladt.c
index 44d02c5..aaf958a 100644
--- a/src/backend/storage/ipc/sinvaladt.c
+++ b/src/backend/storage/ipc/sinvaladt.c
@@ -325,7 +325,8 @@ SharedInvalBackendInit(bool sendOnly)
 	LWLockRelease(SInvalWriteLock);
 
 	/* register exit routine to mark my entry inactive at exit */
-	on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP));
+	on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP),
+				  SHMEM_EXIT_LATE);
 
 	elog(DEBUG4, "my backend ID is %d", MyBackendId);
 }
diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index 4f88d3f..b1af5c6 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -138,7 +138,7 @@ init_lwlock_stats(void)
 	spin_delay_counts = calloc(numLocks, sizeof(int));
 	block_counts = calloc(numLocks, sizeof(int));
 	counts_for_pid = MyProcPid;
-	on_shmem_exit(print_lwlock_stats, 0);
+	on_shmem_exit(print_lwlock_stats, 0, SHMEM_EXIT_LATE);
 }
 
 static void
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 222251d..8dffea9 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -406,7 +406,7 @@ InitProcess(void)
 	/*
 	 * Arrange to clean up at backend exit.
 	 */
-	on_shmem_exit(ProcKill, 0);
+	on_shmem_exit(ProcKill, 0, SHMEM_EXIT_LATE);
 
 	/*
 	 * Now that we have a PGPROC, we could try to acquire locks, so initialize
@@ -435,7 +435,7 @@ InitProcessPhase2(void)
 	/*
 	 * Arrange to clean that up at backend exit.
 	 */
-	on_shmem_exit(RemoveProcFromArray, 0);
+	on_shmem_exit(RemoveProcFromArray, 0, SHMEM_EXIT_LATE);
 }
 
 /*
@@ -563,7 +563,8 @@ InitAuxiliaryProcess(void)
 	/*
 	 * Arrange to clean up at process exit.
 	 */
-	on_shmem_exit(AuxiliaryProcKill, Int32GetDatum(proctype));
+	on_shmem_exit(AuxiliaryProcKill, Int32GetDatum(proctype),
+				  SHMEM_EXIT_LATE);
 }
 
 /*
diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c
index 2c7f0f1..97a7d49 100644
--- a/src/backend/utils/init/postinit.c
+++ b/src/backend/utils/init/postinit.c
@@ -560,7 +560,7 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username,
 		 * down at exit.
 		 */
 		StartupXLOG();
-		on_shmem_exit(ShutdownXLOG, 0);
+		on_shmem_exit(ShutdownXLOG, 0, SHMEM_EXIT_LATE);
 	}
 
 	/*
@@ -587,15 +587,14 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username,
 	RelationCacheInitializePhase2();
 
 	/*
-	 * Set up process-exit callback to do pre-shutdown cleanup.  This has to
-	 * be after we've initialized all the low-level modules like the buffer
-	 * manager, because during shutdown this has to run before the low-level
-	 * modules start to close down.  On the other hand, we want it in place
-	 * before we begin our first transaction --- if we fail during the
-	 * initialization transaction, as is entirely possible, we need the
-	 * AbortTransaction call to clean up.
+	 * Set up process-exit callback to do pre-shutdown cleanup.  This is the
+	 * first callback we register as SHMEM_EXIT_EARLY; thus, this will be the
+	 * last thing we do before low-level modules like the buffer manager begin
+	 * to close down.  We need to have this in place before we begin our first
+	 * transaction --- if we fail during the initialization transaction, as is
+	 * entirely possible, we need the AbortTransaction call to clean up.
 	 */
-	on_shmem_exit(ShutdownPostgres, 0);
+	on_shmem_exit(ShutdownPostgres, 0, SHMEM_EXIT_EARLY);
 
 	/* The autovacuum launcher is done here */
 	if (IsAutoVacuumLauncherProcess())
diff --git a/src/include/storage/dsm.h b/src/include/storage/dsm.h
index d906eba..5c7c981 100644
--- a/src/include/storage/dsm.h
+++ b/src/include/storage/dsm.h
@@ -17,8 +17,9 @@
 
 typedef struct dsm_segment dsm_segment;
 
-/* Initialization function. */
+/* Startup and shutdown functions. */
 extern void dsm_postmaster_startup(void);
+extern void dsm_backend_shutdown(void);
 
 /* Functions that create, update, or remove mappings. */
 extern dsm_segment *dsm_create(Size size);
@@ -36,4 +37,11 @@ extern void *dsm_segment_address(dsm_segment *seg);
 extern Size dsm_segment_map_length(dsm_segment *seg);
 extern dsm_handle dsm_segment_handle(dsm_segment *seg);
 
+/* Cleanup hooks. */
+typedef void (*on_dsm_detach_callback) (dsm_segment *, Datum arg);
+extern void on_dsm_detach(dsm_segment *seg,
+			  on_dsm_detach_callback function, Datum arg);
+extern void on_dsm_detach_cancel(dsm_segment *seg,
+			  on_dsm_detach_callback function, Datum arg);
+
 #endif   /* DSM_H */
diff --git a/src/include/storage/ipc.h b/src/include/storage/ipc.h
index ac4ac66..4989abb 100644
--- a/src/include/storage/ipc.h
+++ b/src/include/storage/ipc.h
@@ -21,6 +21,12 @@
 typedef void (*pg_on_exit_callback) (int code, Datum arg);
 typedef void (*shmem_startup_hook_type) (void);
 
+typedef enum
+{
+	SHMEM_EXIT_EARLY,		/* user-level cleanup and transaction abort */
+	SHMEM_EXIT_LATE			/* low-level subsystem shutdown */
+} shmem_exit_phase;
+
 /*----------
  * API for handling cleanup that must occur during either ereport(ERROR)
  * or ereport(FATAL) exits from a block of code.  (Typical examples are
@@ -46,14 +52,14 @@ typedef void (*shmem_startup_hook_type) (void);
  */
 #define PG_ENSURE_ERROR_CLEANUP(cleanup_function, arg)	\
 	do { \
-		on_shmem_exit(cleanup_function, arg); \
+		on_shmem_exit(cleanup_function, arg, SHMEM_EXIT_EARLY); \
 		PG_TRY()
 
 #define PG_END_ENSURE_ERROR_CLEANUP(cleanup_function, arg)	\
-		cancel_shmem_exit(cleanup_function, arg); \
+		cancel_shmem_exit(cleanup_function, arg, SHMEM_EXIT_EARLY); \
 		PG_CATCH(); \
 		{ \
-			cancel_shmem_exit(cleanup_function, arg); \
+			cancel_shmem_exit(cleanup_function, arg, SHMEM_EXIT_EARLY); \
 			cleanup_function (0, arg); \
 			PG_RE_THROW(); \
 		} \
@@ -67,8 +73,10 @@ extern bool proc_exit_inprogress;
 extern void proc_exit(int code) __attribute__((noreturn));
 extern void shmem_exit(int code);
 extern void on_proc_exit(pg_on_exit_callback function, Datum arg);
-extern void on_shmem_exit(pg_on_exit_callback function, Datum arg);
-extern void cancel_shmem_exit(pg_on_exit_callback function, Datum arg);
+extern void on_shmem_exit(pg_on_exit_callback function, Datum arg,
+			  shmem_exit_phase phase);
+extern void cancel_shmem_exit(pg_on_exit_callback function, Datum arg,
+				  shmem_exit_phase phase);
 extern void on_exit_reset(void);
 
 /* ipci.c */


["shm-toc-v1.patch" (text/x-patch)]

commit c5a68681270d6e705e1e46ddc88c7e368188b86b
Author: Robert Haas <rhaas@postgresql.org>
Date:   Sat Aug 17 22:04:13 2013 -0400

    Simple table of contents for a shared memory segment.
    
    This interface is intended to make it simple to divide a dynamic shared
    memory segment into different regions with distinct purposes.  It
    therefore serves much the same purpose that ShmemIndex accomplishes for
    the main shared memory segment, but it is intended to be more
    lightweight.

diff --git a/src/backend/storage/ipc/Makefile b/src/backend/storage/ipc/Makefile
index 873dd60..df0a49e 100644
--- a/src/backend/storage/ipc/Makefile
+++ b/src/backend/storage/ipc/Makefile
@@ -16,6 +16,6 @@ endif
 endif
 
 OBJS = dsm_impl.o dsm.o ipc.o ipci.o pmsignal.o procarray.o procsignal.o \
-	shmem.o shmqueue.o sinval.o sinvaladt.o standby.o
+	shmem.o shmqueue.o shm_toc.o sinval.o sinvaladt.o standby.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/storage/ipc/shm_toc.c b/src/backend/storage/ipc/shm_toc.c
new file mode 100644
index 0000000..7c615d8
--- /dev/null
+++ b/src/backend/storage/ipc/shm_toc.c
@@ -0,0 +1,239 @@
+/*-------------------------------------------------------------------------
+ *
+ * shm_toc.c
+ *	  shared memory segment table of contents
+ *
+ * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/storage/ipc/shm_toc.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "storage/barrier.h"
+#include "storage/shm_toc.h"
+#include "storage/spin.h"
+
+typedef struct shm_toc_entry
+{
+	uint64		key;					/* Arbitrary identifier */
+	uint64		offset;					/* Bytes offset */
+} shm_toc_entry;
+
+struct shm_toc
+{
+	uint64		toc_magic;				/* Magic number for this TOC */
+	slock_t		toc_mutex;				/* Spinlock for mutual exclusion */
+	Size		toc_total_bytes;		/* Bytes managed by this TOC */
+	Size		toc_allocated_bytes;	/* Bytes allocated of those managed */
+	Size		toc_nentry;				/* Number of entries in TOC */
+	shm_toc_entry toc_entry[FLEXIBLE_ARRAY_MEMBER];
+};
+
+/*
+ * Initialize a region of shared memory with a table of contents.
+ */
+shm_toc *
+shm_toc_create(uint64 magic, void *address, Size nbytes)
+{
+	shm_toc	   *toc = (shm_toc *) address;
+
+	Assert(nbytes > offsetof(shm_toc, toc_entry));
+	toc->toc_magic = magic;
+	SpinLockInit(&toc->toc_mutex);
+	toc->toc_total_bytes = nbytes;
+	toc->toc_allocated_bytes = 0;
+	toc->toc_nentry = 0;
+
+	return toc;
+}
+
+/*
+ * Attach to an existing table of contents.  If the magic number found at
+ * the target address doesn't match our expectations, returns NULL.
+ */
+extern shm_toc *
+shm_toc_attach(uint64 magic, void *address)
+{
+	shm_toc	   *toc = (shm_toc *) address;
+
+	if (toc->toc_magic != magic)
+		return NULL;
+
+	Assert(toc->toc_total_bytes >= toc->toc_allocated_bytes);
+	Assert(toc->toc_total_bytes >= offsetof(shm_toc, toc_entry));
+
+	return toc;
+}
+
+/*
+ * Allocate shared memory from a segment managed by a table of contents.
+ *
+ * This is not a full-blown allocator; there's no way to free memory.  It's
+ * just a way of dividing a single physical shared memory segment into logical
+ * chunks that may be used for different purposes.
+ *
+ * We allocate backwards from the end of the segment, so that the TOC entries
+ * can grow forward from the start of the segment.
+ */
+extern void *
+shm_toc_allocate(shm_toc *toc, Size nbytes)
+{
+	volatile shm_toc *vtoc = toc;
+	Size		total_bytes;
+	Size		allocated_bytes;
+	Size		nentry;
+	Size		toc_bytes;
+
+	/* Make sure request is well-aligned. */
+	nbytes = BUFFERALIGN(nbytes);
+
+	SpinLockAcquire(&toc->toc_mutex);
+
+	total_bytes = vtoc->toc_total_bytes;
+	allocated_bytes = vtoc->toc_allocated_bytes;
+	nentry = vtoc->toc_nentry;
+	toc_bytes = offsetof(shm_toc, toc_entry) + nentry * sizeof(shm_toc_entry)
+		+ allocated_bytes;
+
+	/* Check for memory exhaustion and overflow. */
+	if (toc_bytes + nbytes > total_bytes || toc_bytes + nbytes < toc_bytes)
+	{
+		SpinLockRelease(&toc->toc_mutex);
+		ereport(ERROR,
+				(errcode(ERRCODE_OUT_OF_MEMORY),
+				 errmsg("out of shared memory")));
+	}
+	vtoc->toc_allocated_bytes += nbytes;
+
+	SpinLockRelease(&toc->toc_mutex);
+
+	return ((char *) toc) + (total_bytes - allocated_bytes - nbytes);
+}
+
+/*
+ * Return the number of bytes that can still be allocated.
+ */
+extern Size
+shm_toc_freespace(shm_toc *toc)
+{
+	volatile shm_toc *vtoc = toc;
+	Size		total_bytes;
+	Size		allocated_bytes;
+	Size		nentry;
+	Size		toc_bytes;
+
+	SpinLockAcquire(&toc->toc_mutex);
+	total_bytes = vtoc->toc_total_bytes;
+	allocated_bytes = vtoc->toc_allocated_bytes;
+	nentry = vtoc->toc_nentry;
+	SpinLockRelease(&toc->toc_mutex);
+
+	toc_bytes = offsetof(shm_toc, toc_entry) + nentry * sizeof(shm_toc_entry);
+	Assert(allocated_bytes + BUFFERALIGN(toc_bytes) <= total_bytes);
+	return total_bytes - (allocated_bytes + BUFFERALIGN(toc_bytes));
+}
+
+/*
+ * Insert a TOC entry.
+ *
+ * The idea here is that the process setting up the shared memory segment will
+ * register the addresses of data structures within the segment using this
+ * function.  Each data structure will be identified using a 64-bit key, which
+ * is assumed to be a well-known or discoverable integer.  Other processes
+ * accessing the shared memory segment can pass the same key to
+ * shm_toc_lookup() to discover the addresses of those data structures.
+ *
+ * Since the shared memory segment may be mapped at different addresses within
+ * different backends, we store relative rather than absolute pointers.
+ */
+void
+shm_toc_insert(shm_toc *toc, uint64 key, void *address)
+{
+	volatile shm_toc *vtoc = toc;
+	uint64		total_bytes;
+	uint64		allocated_bytes;
+	uint64		nentry;
+	uint64		toc_bytes;
+	uint64		offset;
+
+	/* Relativize pointer. */
+	Assert(address > (void *) toc);
+	offset = ((char *) address) - (char *) toc;
+
+	SpinLockAcquire(&toc->toc_mutex);
+
+	total_bytes = vtoc->toc_total_bytes;
+	allocated_bytes = vtoc->toc_allocated_bytes;
+	nentry = vtoc->toc_nentry;
+	toc_bytes = offsetof(shm_toc, toc_entry) + nentry * sizeof(shm_toc_entry)
+		+ allocated_bytes;
+
+	/* Check for memory exhaustion and overflow. */
+	if (toc_bytes + sizeof(shm_toc_entry) > total_bytes ||
+		toc_bytes + sizeof(shm_toc_entry) < toc_bytes)
+	{
+		SpinLockRelease(&toc->toc_mutex);
+		ereport(ERROR,
+				(errcode(ERRCODE_OUT_OF_MEMORY),
+				 errmsg("out of shared memory")));
+	}
+
+	Assert(offset < total_bytes);
+	vtoc->toc_entry[nentry].key = key;
+	vtoc->toc_entry[nentry].offset = offset;
+
+	/*
+	 * By placing a write barrier after filling in the entry and before
+	 * updating the number of entries, we make it safe to read the TOC
+	 * unlocked.
+	 */
+	pg_write_barrier();
+
+	vtoc->toc_nentry++;
+
+	SpinLockRelease(&toc->toc_mutex);
+}
+
+/*
+ * Look up a TOC entry.
+ *
+ * Unlike the other functions in this file, this operation acquires no lock;
+ * it uses only barriers.  It probably wouldn't hurt concurrency very much even
+ * if it did get a lock, but since it's reasonably likely that a group of
+ * worker processes could each read a series of entries from the same TOC
+ * right around the same time, there seems to be some value in avoiding it.
+ */
+void *
+shm_toc_lookup(shm_toc *toc, uint64 key)
+{
+	uint64		nentry;
+	uint64		i;
+
+	/* Read the number of entries before we examine any entry. */
+	nentry = toc->toc_nentry;
+	pg_read_barrier();
+
+	/* Now search for a matching entry. */
+	for (i = 0; i < nentry; ++i)
+		if (toc->toc_entry[i].key == key)
+			return ((char *) toc) + toc->toc_entry[i].offset;
+
+	/* No matching entry was found. */
+	return NULL;
+}
+
+/*
+ * Estimate how much shared memory will be required to store a TOC and its
+ * dependent data structures.
+ */
+Size
+shm_toc_estimate(shm_toc_estimator *e)
+{
+	return add_size(offsetof(shm_toc, toc_entry),
+			   add_size(mul_size(e->number_of_keys, sizeof(shm_toc_entry)),
+						e->space_for_chunks));
+}
diff --git a/src/include/storage/shm_toc.h b/src/include/storage/shm_toc.h
new file mode 100644
index 0000000..49259ec
--- /dev/null
+++ b/src/include/storage/shm_toc.h
@@ -0,0 +1,48 @@
+/*-------------------------------------------------------------------------
+ *
+ * shm_toc.h
+ *	  shared memory segment table of contents
+ *
+ * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/storage/shm_toc.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef SHM_TOC_H
+#define SHM_TOC_H
+
+#include "storage/shmem.h"
+
+struct shm_toc;
+typedef struct shm_toc shm_toc;
+
+extern shm_toc *shm_toc_create(uint64 magic, void *address, Size nbytes);
+extern shm_toc *shm_toc_attach(uint64 magic, void *address);
+extern void *shm_toc_allocate(shm_toc *toc, Size nbytes);
+extern Size shm_toc_freespace(shm_toc *toc);
+extern void shm_toc_insert(shm_toc *toc, uint64 key, void *address);
+extern void *shm_toc_lookup(shm_toc *toc, uint64 key);
+
+/*
+ * Tools for estimating how large a chunk of shared memory will be needed
+ * to store a TOC and its dependent objects.
+ */
+typedef struct
+{
+	Size	space_for_chunks;
+	Size	number_of_keys;
+} shm_toc_estimator;
+
+#define shm_toc_initialize_estimator(e) \
+	((e)->space_for_chunks = 0, (e)->number_of_keys = 0)
+#define shm_toc_estimate_chunk(e, sz) \
+	((e)->space_for_chunks = add_size((e)->space_for_chunks, \
+		BUFFERALIGN((sz))))
+#define shm_toc_estimate_keys(e, cnt) \
+	((e)->number_of_keys = add_size((e)->number_of_keys, (cnt)))
+
+extern Size shm_toc_estimate(shm_toc_estimator *);
+
+#endif   /* SHM_TOC_H */

["shm-mq-v1.patch" (text/x-patch)]

commit 558529ce2bb6d964fbbf64676548d67450b8bcba
Author: Robert Haas <rhaas@postgresql.org>
Date:   Tue Aug 27 14:23:30 2013 -0400

    Single-reader, single-writer, lightweight shared message queue.
    
    This code provides infrastructure for user backends to communicate
    relatively easily with background workers.  The message queue is
    structured as a ring buffer and allows messages of arbitrary length
    to be sent and received.

diff --git a/src/backend/storage/ipc/Makefile b/src/backend/storage/ipc/Makefile
index df0a49e..850347c 100644
--- a/src/backend/storage/ipc/Makefile
+++ b/src/backend/storage/ipc/Makefile
@@ -16,6 +16,6 @@ endif
 endif
 
 OBJS = dsm_impl.o dsm.o ipc.o ipci.o pmsignal.o procarray.o procsignal.o \
-	shmem.o shmqueue.o shm_toc.o sinval.o sinvaladt.o standby.o
+	shmem.o shmqueue.o shm_mq.o shm_toc.o sinval.o sinvaladt.o standby.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/storage/ipc/shm_mq.c b/src/backend/storage/ipc/shm_mq.c
new file mode 100644
index 0000000..28b6d6e
--- /dev/null
+++ b/src/backend/storage/ipc/shm_mq.c
@@ -0,0 +1,855 @@
+/*-------------------------------------------------------------------------
+ *
+ * shm_mq.c
+ *	  single-reader, single-writer shared memory message queue
+ *
+ * Both the sender and the receiver must have a PGPROC; their respective
+ * process latches are used for synchronization.  Only the sender may send,
+ * and only the receiver may receive.  This is intended to allow a user
+ * backend to communicate with worker backends that it has registered.
+ *
+ * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/storage/ipc/shm_mq.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "miscadmin.h"
+#include "postmaster/bgworker.h"
+#include "storage/procsignal.h"
+#include "storage/shm_mq.h"
+#include "storage/spin.h"
+
+struct shm_mq
+{
+	slock_t		mq_mutex;
+	PGPROC	   *mq_receiver;
+	PGPROC	   *mq_sender;
+	uint64		mq_bytes_read;
+	uint64		mq_bytes_written;
+	uint64		mq_ring_size;
+	bool		mq_detached;
+	uint8		mq_ring_offset;
+	char		mq_ring[FLEXIBLE_ARRAY_MEMBER];
+};
+
+struct shm_mq_handle
+{
+	shm_mq	   *mqh_queue;
+	dsm_segment *mqh_segment;
+	char	   *mqh_buffer;
+	BackgroundWorkerHandle *mqh_handle;
+	uint64		mqh_buflen;
+	uint64		mqh_consume_pending;
+	uint64		mqh_partial_message_bytes;
+	uint64		mqh_expected_bytes;
+	bool		mqh_did_length_word;
+	bool		mqh_counterparty_attached;
+	MemoryContext mqh_context;
+};
+
+static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mq, uint64 nbytes,
+				  void *data, bool nowait, uint64 *bytes_written);
+static shm_mq_result shm_mq_receive_bytes(shm_mq *mq, uint64 bytes_needed,
+					 bool nowait, uint64 *nbytesp, void **datap);
+static bool shm_mq_wait_internal(volatile shm_mq *mq, PGPROC * volatile *ptr,
+					 BackgroundWorkerHandle *handle);
+static uint64 shm_mq_get_bytes_read(volatile shm_mq *mq, bool *detached);
+static void shm_mq_inc_bytes_read(volatile shm_mq *mq, uint64 n);
+static uint64 shm_mq_get_bytes_written(volatile shm_mq *mq, bool *detached);
+static void shm_mq_inc_bytes_written(volatile shm_mq *mq, uint64 n);
+static shm_mq_result shm_mq_notify_receiver(volatile shm_mq *mq);
+static void shm_mq_detach_callback(dsm_segment *seg, Datum arg);
+
+/* Minimum queue size is enough for header and at least one chunk of data. */
+Size shm_mq_minimum_size =
+	MAXALIGN(offsetof(shm_mq, mq_ring)) + MAXIMUM_ALIGNOF;
+
+#define MQH_BUFSIZE				8192
+
+/*
+ * Initialize a new shared message queue.
+ */
+shm_mq *
+shm_mq_create(void *address, Size size)
+{
+	shm_mq	   *mq = address;
+	uint64		data_offset = MAXALIGN(offsetof(shm_mq, mq_ring));
+
+	/* If the size isn't MAXALIGN'd, just discard the odd bytes. */
+	size = MAXALIGN_DOWN(size);
+
+	/* Queue size must be large enough to hold some data. */
+	Assert(size > data_offset);
+
+	/* Initialize queue header. */
+	mq->mq_receiver = NULL;
+	mq->mq_sender = NULL;
+	mq->mq_bytes_read = 0;
+	mq->mq_bytes_written = 0;
+	mq->mq_ring_size = size - data_offset;
+	mq->mq_detached = false;
+	mq->mq_ring_offset = data_offset - offsetof(shm_mq, mq_ring);
+
+	return mq;
+}
+
+/*
+ * Set the identity of the process that will receive from a shared message
+ * queue.
+ */
+void
+shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)
+{
+	volatile shm_mq *vmq = mq;
+	PGPROC   *sender;
+
+	SpinLockAcquire(&mq->mq_mutex);
+	Assert(vmq->mq_receiver == NULL);
+	vmq->mq_receiver = proc;
+	sender = vmq->mq_sender;
+	SpinLockRelease(&mq->mq_mutex);
+
+	if (sender != NULL)
+		SetLatch(&sender->procLatch);
+}
+
+/*
+ * Set the identity of the process that will send to a shared message queue.
+ */
+void
+shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
+{
+	volatile shm_mq *vmq = mq;
+	PGPROC   *receiver;
+
+	SpinLockAcquire(&mq->mq_mutex);
+	Assert(vmq->mq_sender == NULL);
+	vmq->mq_sender = proc;
+	receiver = vmq->mq_receiver;
+	SpinLockRelease(&mq->mq_mutex);
+
+	if (receiver != NULL)
+		SetLatch(&receiver->procLatch);
+}
+
+/*
+ * Get the configured receiver.
+ */
+PGPROC *
+shm_mq_get_receiver(shm_mq *mq)
+{
+	volatile shm_mq *vmq = mq;
+	PGPROC   *receiver;
+
+	SpinLockAcquire(&mq->mq_mutex);
+	receiver = vmq->mq_receiver;
+	SpinLockRelease(&mq->mq_mutex);
+
+	return receiver;
+}
+
+/*
+ * Get the configured sender.
+ */
+PGPROC *
+shm_mq_get_sender(shm_mq *mq)
+{
+	volatile shm_mq *vmq = mq;
+	PGPROC   *sender;
+
+	SpinLockAcquire(&mq->mq_mutex);
+	sender = vmq->mq_sender;
+	SpinLockRelease(&mq->mq_mutex);
+
+	return sender;
+}
+
+/*
+ * Attach to a shared message queue so we can send or receive messages.
+ *
+ * The memory context in effect at the time this function is called should
+ * be one which will last for at least as long as the message queue itself.
+ *
+ * If seg != NULL, the queue will be automatically detached when that dynamic
+ * shared memory segment is detached.
+ *
+ * If handle != NULL, the queue can be read or written even before the
+ * other process has attached.  We'll wait for it to do so if needed.  The
+ * handle must be for a background worker initialized with bgw_notify_pid
+ * equal to our PID.
+ *
+ * shm_mq_detach() should be called when done.  This will free the
+ * shm_mq_handle and mark the queue itself as detached, so that our
+ * counterpart won't get stuck waiting for us to fill or drain the queue
+ * after we've already lost interest.
+ */
+shm_mq_handle *
+shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
+{
+	shm_mq_handle	   *mqh = palloc(sizeof(shm_mq_handle));
+
+	Assert(mq->mq_receiver == MyProc || mq->mq_sender == MyProc);
+	mqh->mqh_queue = mq;
+	mqh->mqh_segment = seg;
+	mqh->mqh_buffer = NULL;
+	mqh->mqh_handle = handle;
+	mqh->mqh_buflen = 0;
+	mqh->mqh_consume_pending = 0;
+	mqh->mqh_context = CurrentMemoryContext;
+	mqh->mqh_partial_message_bytes = 0;
+	mqh->mqh_did_length_word = false;
+	mqh->mqh_counterparty_attached = false;
+
+	if (seg != NULL)
+		on_dsm_detach(seg, shm_mq_detach_callback, PointerGetDatum(mq));
+
+	return mqh;
+}
+
+/*
+ * Write a message into a shared message queue.
+ *
+ * When nowait = false, we'll wait on our process latch when the ring buffer
+ * fills up, and then continue writing once the receiver has drained some data.
+ * The process latch is reset after each wait.
+ *
+ * When nowait = true, we do not manipulate the state of the process latch;
+ * instead, if the buffer becomes full, we return SHM_MQ_WOULD_BLOCK.  In
+ * this case, the caller should call this function again, with the same
+ * arguments, each time the process latch is set.
+ */
+shm_mq_result
+shm_mq_send(shm_mq_handle *mqh, uint64 nbytes, void *data, bool nowait)
+{
+	shm_mq_result	res;
+	shm_mq		   *mq = mqh->mqh_queue;
+	uint64			bytes_written;
+
+	Assert(mq->mq_sender == MyProc);
+
+	/* Write the message length into the buffer. */
+	if (!mqh->mqh_did_length_word)
+	{
+		res = shm_mq_send_bytes(mqh, sizeof(uint64), &nbytes, nowait,
+								&bytes_written);
+		if (res != SHM_MQ_SUCCESS)
+			return res;
+
+		/*
+		 * We're sure to have sent the length in full, since we always
+		 * write a MAXALIGN'd chunk.
+		 */
+		Assert(bytes_written == MAXALIGN64(sizeof(uint64)));
+		mqh->mqh_did_length_word = true;
+	}
+
+	/* Write the actual data bytes into the buffer. */
+	Assert(mqh->mqh_partial_message_bytes <= nbytes);
+	res = shm_mq_send_bytes(mqh, nbytes - mqh->mqh_partial_message_bytes,
+							((char *) data) + mqh->mqh_partial_message_bytes,
+							nowait, &bytes_written);
+	if (res == SHM_MQ_WOULD_BLOCK)
+		mqh->mqh_partial_message_bytes += bytes_written;
+	else
+	{
+		mqh->mqh_partial_message_bytes = 0;
+		mqh->mqh_did_length_word = false;
+	}
+	if (res != SHM_MQ_SUCCESS)
+		return res;
+
+	/* Notify receiver of the newly-written data, and return. */
+	return shm_mq_notify_receiver(mq);
+}
+
+/*
+ * Receive a message from a shared message queue.
+ *
+ * We set *nbytes to the message length and *data to point to the message
+ * payload.  If the entire message exists in the queue as a single,
+ * contiguous chunk, *data will point directly into shared memory; otherwise,
+ * it will point to a temporary buffer.  This mostly avoids data copying in
+ * the hoped-for case where messages are short compared to the buffer size,
+ * while still allowing longer messages.  In either case, the return value
+ * remains valid until the next receive operation is performed on the queue.
+ *
+ * When nowait = false, we'll wait on our process latch when the ring buffer
+ * is empty and we have not yet received a full message.  The sender will
+ * set our process latch after more data has been written, and we'll resume
+ * processing.  Each call will therefore return a complete message
+ * (unless the sender detaches the queue).
+ *
+ * When nowait = true, we do not manipulate the state of the process latch;
+ * instead, whenever the buffer is empty and we need to read from it, we
+ * return SHM_MQ_WOULD_BLOCK.  In this case, the caller should call this
+ * function again after the process latch has been set.
+ */
+shm_mq_result
+shm_mq_receive(shm_mq_handle *mqh, uint64 *nbytesp, void **datap, bool nowait)
+{
+	shm_mq		   *mq = mqh->mqh_queue;
+	shm_mq_result	res;
+	uint64			rb = 0;
+	uint64			nbytes;
+	uint64			needed;
+	void		   *rawdata;
+
+	Assert(mq->mq_receiver == MyProc);
+
+	/* We can't receive data until the sender has attached. */
+	if (!mqh->mqh_counterparty_attached)
+	{
+		if (nowait)
+		{
+			if (shm_mq_get_sender(mq) == NULL)
+				return SHM_MQ_WOULD_BLOCK;
+		}
+		else if (!shm_mq_wait_internal(mq, &mq->mq_sender, mqh->mqh_handle))
+		{
+			mq->mq_detached = true;
+			return SHM_MQ_DETACHED;
+		}
+		mqh->mqh_counterparty_attached = true;
+	}
+
+	/* Consume any zero-copy data from previous receive operation. */
+	if (mqh->mqh_consume_pending > 0)
+	{
+		shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
+		mqh->mqh_consume_pending = 0;
+	}
+
+	/* Determine the message length. */
+	if (mqh->mqh_did_length_word)
+	{
+		/* We've partially received a message; recall expected length. */
+		nbytes = mqh->mqh_expected_bytes;
+	}
+	else
+	{
+		/* Try to receive the message length word. */
+		res = shm_mq_receive_bytes(mq, sizeof(uint64), nowait, &rb, &rawdata);
+		if (res != SHM_MQ_SUCCESS)
+			return res;
+		Assert(rb >= sizeof(uint64));
+		memcpy(&nbytes, rawdata, sizeof(uint64));
+		mqh->mqh_expected_bytes = nbytes;
+
+		/* If we've already got the whole message, we're done. */
+		needed = MAXALIGN64(sizeof(uint64)) + MAXALIGN64(nbytes);
+		if (rb >= needed)
+		{
+			/*
+			 * Technically, we could consume the message length information at
+			 * this point, but the extra write to shared memory wouldn't be
+			 * free and in most cases we would reap no benefit.
+			 */
+			mqh->mqh_consume_pending = needed;
+			*nbytesp = nbytes;
+			*datap = ((char *) rawdata) + MAXALIGN64(sizeof(uint64));
+			return SHM_MQ_SUCCESS;
+		}
+
+		/* Consume the length word. */
+		shm_mq_inc_bytes_read(mq, MAXALIGN64(sizeof(uint64)));
+		mqh->mqh_did_length_word = true;
+		rb -= MAXALIGN64(sizeof(uint64));
+	}
+
+	if (mqh->mqh_partial_message_bytes == 0)
+	{
+		/*
+		 * Try to obtain the whole message in a single chunk.  If this works,
+		 * we need not copy the data and can return a pointer directly into
+		 * shared memory.
+		 */
+		res = shm_mq_receive_bytes(mq, nbytes, nowait, &rb, &rawdata);
+		if (res != SHM_MQ_SUCCESS)
+			return res;
+		if (rb >= nbytes)
+		{
+			mqh->mqh_did_length_word = false;
+			mqh->mqh_consume_pending = MAXALIGN64(nbytes);
+			*nbytesp = nbytes;
+			*datap = rawdata;
+			return SHM_MQ_SUCCESS;
+		}
+
+		/*
+		 * The message has wrapped the buffer.  We'll need to copy it in order
+		 * to return it to the client in one chunk.  First, make sure we have a
+		 * large enough buffer available.
+		 */
+		if (mqh->mqh_buflen < nbytes)
+		{
+			uint64		newbuflen = Max(mqh->mqh_buflen, MQH_BUFSIZE);
+
+			while (newbuflen < nbytes)
+				newbuflen *= 2;
+
+			if (mqh->mqh_buffer != NULL)
+			{
+				pfree(mqh->mqh_buffer);
+				mqh->mqh_buffer = NULL;
+				mqh->mqh_buflen = 0;
+			}
+			mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context, newbuflen);
+			mqh->mqh_buflen = newbuflen;
+		}
+	}
+
+	/* Loop until we've copied the entire message. */
+	for (;;)
+	{
+		uint64	still_needed;
+
+		/* Copy as much as we can. */
+		Assert(mqh->mqh_partial_message_bytes + rb <= nbytes);
+		memcpy(&mqh->mqh_buffer[mqh->mqh_partial_message_bytes], rawdata, rb);
+		mqh->mqh_partial_message_bytes += rb;
+
+		/*
+		 * Update count of bytes read, with alignment padding.  Note
+		 * that this will never actually insert any padding except at the
+		 * end of a message, because the buffer size is a multiple of
+		 * MAXIMUM_ALIGNOF, and each read and write is as well.
+		 */
+		Assert(mqh->mqh_partial_message_bytes == nbytes ||
+				rb == MAXALIGN64(rb));
+		shm_mq_inc_bytes_read(mq, MAXALIGN64(rb));
+
+		/* If we got all the data, exit the loop. */
+		if (mqh->mqh_partial_message_bytes >= nbytes)
+			break;
+
+		/* Wait for some more data. */
+		still_needed = nbytes - mqh->mqh_partial_message_bytes;
+		res = shm_mq_receive_bytes(mq, still_needed, nowait, &rb, &rawdata);
+		if (res != SHM_MQ_SUCCESS)
+			return res;
+		if (rb > still_needed)
+			rb = still_needed;
+	}
+
+	/* Return the complete message, and reset for next message. */
+	*nbytesp = nbytes;
+	*datap = mqh->mqh_buffer;
+	mqh->mqh_did_length_word = false;
+	mqh->mqh_partial_message_bytes = 0;
+	return SHM_MQ_SUCCESS;
+}
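+
+/*
+ * Usage sketch (hypothetical caller, not part of this file's API): drain a
+ * queue in nonblocking mode.  "mqh" is assumed to come from shm_mq_attach(),
+ * and process_message() is a made-up consumer.  On SHM_MQ_WOULD_BLOCK we
+ * wait for the sender to set our latch, as the comments above describe.
+ *
+ *	for (;;)
+ *	{
+ *		shm_mq_result	res;
+ *		uint64			len;
+ *		void		   *data;
+ *
+ *		res = shm_mq_receive(mqh, &len, &data, true);
+ *		if (res == SHM_MQ_SUCCESS)
+ *		{
+ *			process_message(data, len);
+ *			continue;
+ *		}
+ *		if (res == SHM_MQ_DETACHED)
+ *			break;
+ *		Assert(res == SHM_MQ_WOULD_BLOCK);
+ *		WaitLatch(&MyProc->procLatch, WL_LATCH_SET, 0);
+ *		CHECK_FOR_INTERRUPTS();
+ *		ResetLatch(&MyProc->procLatch);
+ *	}
+ */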
+
+/*
+ * Wait for the other process that's supposed to use this queue to attach
+ * to it.
+ *
+ * The return value is SHM_MQ_DETACHED if the worker has already detached or
+ * if it dies; it is SHM_MQ_SUCCESS if we detect that the worker has attached.
+ * Note that we will only be able to detect that the worker has died before
+ * attaching if a background worker handle was passed to shm_mq_attach().
+ */
+shm_mq_result
+shm_mq_wait_for_attach(shm_mq_handle *mqh)
+{
+	shm_mq	   *mq = mqh->mqh_queue;
+	PGPROC	   **victim;
+
+	if (shm_mq_get_receiver(mq) == MyProc)
+		victim = &mq->mq_sender;
+	else
+	{
+		Assert(shm_mq_get_sender(mq) == MyProc);
+		victim = &mq->mq_receiver;
+	}
+
+	if (shm_mq_wait_internal(mq, victim, mqh->mqh_handle))
+		return SHM_MQ_SUCCESS;
+	else
+		return SHM_MQ_DETACHED;
+}
+
+/*
+ * Detach a shared message queue.
+ *
+ * The purpose of this function is to make sure that the process
+ * with which we're communicating doesn't block forever waiting for us to
+ * fill or drain the queue once we've lost interest.  When the sender
+ * detaches, the receiver can read any messages remaining in the queue;
+ * further reads will return SHM_MQ_DETACHED.  If the receiver detaches,
+ * further attempts to send messages will likewise return SHM_MQ_DETACHED.
+ */
+void
+shm_mq_detach(shm_mq *mq)
+{
+	volatile shm_mq *vmq = mq;
+	PGPROC	   *victim;
+
+	SpinLockAcquire(&mq->mq_mutex);
+	if (vmq->mq_sender == MyProc)
+		victim = vmq->mq_receiver;
+	else
+	{
+		Assert(vmq->mq_receiver == MyProc);
+		victim = vmq->mq_sender;
+	}
+	vmq->mq_detached = true;
+	SpinLockRelease(&mq->mq_mutex);
+
+	if (victim != NULL)
+		SetLatch(&victim->procLatch);
+}
+
+/*
+ * Write bytes into a shared message queue.
+ */
+static shm_mq_result
+shm_mq_send_bytes(shm_mq_handle *mqh, uint64 nbytes, void *data, bool nowait,
+				  uint64 *bytes_written)
+{
+	shm_mq	   *mq = mqh->mqh_queue;
+	uint64		sent = 0;
+	uint64		used;
+	uint64		ringsize = mq->mq_ring_size;
+	uint64		available;
+
+	while (sent < nbytes)
+	{
+		bool	detached;
+		uint64	rb;
+
+		/* Compute number of ring buffer bytes used and available. */
+		rb = shm_mq_get_bytes_read(mq, &detached);
+		Assert(mq->mq_bytes_written >= rb);
+		used = mq->mq_bytes_written - rb;
+		Assert(used <= ringsize);
+		available = Min(ringsize - used, nbytes - sent);
+
+		/* Bail out if the queue has been detached. */
+		if (detached)
+			return SHM_MQ_DETACHED;
+
+		if (available == 0)
+		{
+			shm_mq_result	res;
+
+			/*
+			 * The queue is full, so if the receiver isn't yet known to be
+			 * attached, we must wait for that to happen.
+			 */
+			if (!mqh->mqh_counterparty_attached)
+			{
+				if (nowait)
+				{
+					if (shm_mq_get_receiver(mq) == NULL)
+						return SHM_MQ_WOULD_BLOCK;
+				}
+				else if (!shm_mq_wait_internal(mq, &mq->mq_receiver,
+											   mqh->mqh_handle))
+				{
+					mq->mq_detached = true;
+					return SHM_MQ_DETACHED;
+				}
+				mqh->mqh_counterparty_attached = true;
+			}
+
+			/* Let the receiver know that we need them to read some data. */
+			res = shm_mq_notify_receiver(mq);
+			if (res != SHM_MQ_SUCCESS)
+			{
+				*bytes_written = sent;
+				return res;
+			}
+
+			/* Skip manipulation of our latch if nowait = true. */
+			if (nowait)
+			{
+				*bytes_written = sent;
+				return SHM_MQ_WOULD_BLOCK;
+			}
+
+			/*
+			 * Wait for our latch to be set.  It might already be set for
+			 * some unrelated reason, but that'll just result in one extra
+			 * trip through the loop.  It's worth it to avoid resetting the
+			 * latch at top of loop, because setting an already-set latch is
+			 * much cheaper than setting one that has been reset.
+			 */
+			WaitLatch(&MyProc->procLatch, WL_LATCH_SET, 0);
+
+			/* An interrupt may have occurred while we were waiting. */
+			CHECK_FOR_INTERRUPTS();
+
+			/* Reset the latch so we don't spin. */
+			ResetLatch(&MyProc->procLatch);
+		}
+		else
+		{
+			uint64	offset = mq->mq_bytes_written % ringsize;
+			uint64	sendnow = Min(available, ringsize - offset);
+
+			/* Write as much data as we can via a single memcpy(). */
+			memcpy(&mq->mq_ring[mq->mq_ring_offset + offset],
+				   (char *) data + sent, sendnow);
+			sent += sendnow;
+
+			/*
+			 * Update count of bytes written, with alignment padding.  Note
+			 * that this will never actually insert any padding except at the
+			 * end of a run of bytes, because the buffer size is a multiple of
+			 * MAXIMUM_ALIGNOF, and each read and write is as well.
+			 */
+			Assert(sent == nbytes || sendnow == MAXALIGN64(sendnow));
+			shm_mq_inc_bytes_written(mq, MAXALIGN64(sendnow));
+
+			/*
+			 * For efficiency, we don't set the reader's latch here.  We'll
+			 * do that only when the buffer fills up or after writing an
+			 * entire message.
+			 */
+		}
+	}
+
+	*bytes_written = sent;
+	return SHM_MQ_SUCCESS;
+}
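+
+/*
+ * Worked example of the arithmetic above, with illustrative numbers: if
+ * mq_ring_size = 64, mq_bytes_written = 96, and the receiver has read
+ * rb = 48 bytes, then used = 96 - 48 = 48, so at most 64 - 48 = 16 bytes
+ * may be written without overwriting unread data.  The write offset is
+ * 96 % 64 = 32, leaving 64 - 32 = 32 contiguous bytes, so
+ * sendnow = Min(16, 32) = 16 bytes go out in a single memcpy().
+ */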
+
+/*
+ * Wait until at least *nbytesp bytes are available to be read from the
+ * shared message queue, or until the buffer wraps around.  On return,
+ * *datap is set to the location at which data bytes can be read.  The
+ * return value is the number of bytes available to be read starting at
+ * that offset; if the message has wrapped the buffer, it may be less than
+ * bytes_needed.
+ */
+static shm_mq_result
+shm_mq_receive_bytes(shm_mq *mq, uint64 bytes_needed, bool nowait,
+					 uint64 *nbytesp, void **datap)
+{
+	uint64		used;
+	uint64		ringsize = mq->mq_ring_size;
+	uint64		written;
+
+	for (;;)
+	{
+		uint64		offset;
+		bool		detached;
+
+		/* Get bytes written, so we can compute what's available to read. */
+		written = shm_mq_get_bytes_written(mq, &detached);
+		used = written - mq->mq_bytes_read;
+		Assert(used <= ringsize);
+		offset = mq->mq_bytes_read % ringsize;
+
+		/* If we have enough data or buffer has wrapped, we're done. */
+		if (used >= bytes_needed || offset + used >= ringsize)
+		{
+			*nbytesp = Min(used, ringsize - offset);
+			*datap = &mq->mq_ring[mq->mq_ring_offset + offset];
+			return SHM_MQ_SUCCESS;
+		}
+
+		/*
+		 * Fall out before waiting if the queue has been detached.
+		 *
+		 * Note that we don't check for this until *after* considering
+		 * whether the data already available is enough, since the
+		 * receiver can finish receiving a message stored in the buffer
+		 * even after the sender has detached.
+		 */
+		if (detached)
+			return SHM_MQ_DETACHED;
+
+		/* Skip manipulation of our latch if nowait = true. */
+		if (nowait)
+			return SHM_MQ_WOULD_BLOCK;
+
+		/*
+		 * Wait for our latch to be set.  It might already be set for
+		 * some unrelated reason, but that'll just result in one extra
+		 * trip through the loop.  It's worth it to avoid resetting the
+		 * latch at top of loop, because setting an already-set latch is
+		 * much cheaper than setting one that has been reset.
+		 */
+		WaitLatch(&MyProc->procLatch, WL_LATCH_SET, 0);
+
+		/* An interrupt may have occurred while we were waiting. */
+		CHECK_FOR_INTERRUPTS();
+
+		/* Reset the latch so we don't spin. */
+		ResetLatch(&MyProc->procLatch);
+	}
+}
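+
+/*
+ * Worked example of the wrap-around case, with illustrative numbers: if
+ * mq_ring_size = 64, mq_bytes_read = 56, and 24 bytes are unread, then
+ * offset = 56 % 64 = 56 and only 64 - 56 = 8 bytes are contiguous.  Since
+ * offset + used = 80 exceeds the ring size, we return those 8 bytes even
+ * if bytes_needed is larger; the caller consumes them and calls back for
+ * the remainder.
+ */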
+
+/*
+ * This is used when a process is waiting for its counterpart to attach to the
+ * queue.  We exit when the other process attaches as expected, or, if
+ * handle != NULL, when the referenced background process or the postmaster
+ * dies.  Note that if handle == NULL and the process fails to attach, we'll
+ * potentially get stuck here forever waiting for a process that may never
+ * start.  We do check for interrupts, though.
+ *
+ * ptr is a pointer to the memory address that we're expecting to become
+ * non-NULL when our counterpart attaches to the queue.
+ */
+static bool
+shm_mq_wait_internal(volatile shm_mq *mq, PGPROC * volatile *ptr,
+					 BackgroundWorkerHandle *handle)
+{
+	bool	save_set_latch_on_sigusr1;
+	bool	result = false;
+
+	save_set_latch_on_sigusr1 = set_latch_on_sigusr1;
+	if (handle != NULL)
+		set_latch_on_sigusr1 = true;
+
+	PG_TRY();
+	{
+		for (;;)
+		{
+			BgwHandleStatus	status;
+			pid_t	pid;
+			bool	detached;
+
+			/* Acquire the lock just long enough to check the pointer. */
+			SpinLockAcquire(&mq->mq_mutex);
+			detached = mq->mq_detached;
+			result = (*ptr != NULL);
+			SpinLockRelease(&mq->mq_mutex);
+
+			/* Fail if detached; else succeed if initialized. */
+			if (detached)
+			{
+				result = false;
+				break;
+			}
+			if (result)
+				break;
+
+			if (handle != NULL)
+			{
+				/* Check for unexpected worker death. */
+				status = GetBackgroundWorkerPid(handle, &pid);
+				if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
+				{
+					result = false;
+					break;
+				}
+			}
+
+			/* Wait to be signalled. */
+			WaitLatch(&MyProc->procLatch, WL_LATCH_SET, 0);
+
+			/* An interrupt may have occurred while we were waiting. */
+			CHECK_FOR_INTERRUPTS();
+
+			/* Reset the latch so we don't spin. */
+			ResetLatch(&MyProc->procLatch);
+		}
+	}
+	PG_CATCH();
+	{
+		set_latch_on_sigusr1 = save_set_latch_on_sigusr1;
+		PG_RE_THROW();
+	}
+	PG_END_TRY();
+
+	return result;
+}
+
+/*
+ * Get the number of bytes read.  The receiver, which updates this count,
+ * may read it directly; the sender must use this function, which takes
+ * the spinlock.
+ */
+static uint64
+shm_mq_get_bytes_read(volatile shm_mq *mq, bool *detached)
+{
+	uint64	v;
+
+	SpinLockAcquire(&mq->mq_mutex);
+	v = mq->mq_bytes_read;
+	*detached = mq->mq_detached;
+	SpinLockRelease(&mq->mq_mutex);
+
+	return v;
+}
+
+/*
+ * Increment the number of bytes read.
+ */
+static void
+shm_mq_inc_bytes_read(volatile shm_mq *mq, uint64 n)
+{
+	PGPROC	   *sender;
+
+	SpinLockAcquire(&mq->mq_mutex);
+	mq->mq_bytes_read += n;
+	sender = mq->mq_sender;
+	SpinLockRelease(&mq->mq_mutex);
+
+	/* We shouldn't have any bytes to read without a sender. */
+	Assert(sender != NULL);
+	SetLatch(&sender->procLatch);
+}
+
+/*
+ * Get the number of bytes written.  The sender, which updates this count,
+ * may read it directly; the receiver must use this function, which takes
+ * the spinlock.
+ */
+static uint64
+shm_mq_get_bytes_written(volatile shm_mq *mq, bool *detached)
+{
+	uint64	v;
+
+	SpinLockAcquire(&mq->mq_mutex);
+	v = mq->mq_bytes_written;
+	*detached = mq->mq_detached;
+	SpinLockRelease(&mq->mq_mutex);
+
+	return v;
+}
+
+/*
+ * Increment the number of bytes written.
+ */
+static void
+shm_mq_inc_bytes_written(volatile shm_mq *mq, uint64 n)
+{
+	SpinLockAcquire(&mq->mq_mutex);
+	mq->mq_bytes_written += n;
+	SpinLockRelease(&mq->mq_mutex);
+}
+
+/*
+ * Set the receiver's latch, unless the queue is detached.
+ */
+static shm_mq_result
+shm_mq_notify_receiver(volatile shm_mq *mq)
+{
+	PGPROC *receiver;
+	bool	detached;
+
+	SpinLockAcquire(&mq->mq_mutex);
+	detached = mq->mq_detached;
+	receiver = mq->mq_receiver;
+	SpinLockRelease(&mq->mq_mutex);
+
+	if (detached)
+		return SHM_MQ_DETACHED;
+	if (receiver)
+		SetLatch(&receiver->procLatch);
+	return SHM_MQ_SUCCESS;
+}
+
+/* Shim for use as an on_dsm_detach callback. */
+static void
+shm_mq_detach_callback(dsm_segment *seg, Datum arg)
+{
+	shm_mq	   *mq = (shm_mq *) DatumGetPointer(arg);
+
+	shm_mq_detach(mq);
+}
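+
+/*
+ * Illustration (assuming the attach-time code elsewhere in this patch does
+ * something equivalent): the shim above is meant to be registered against
+ * the segment backing the queue, e.g.
+ *
+ *	on_dsm_detach(seg, shm_mq_detach_callback, PointerGetDatum(mq));
+ *
+ * so that detaching from the segment also detaches the queue and wakes the
+ * counterparty, as described in shm_mq_detach() above.
+ */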
diff --git a/src/include/storage/shm_mq.h b/src/include/storage/shm_mq.h
new file mode 100644
index 0000000..749ab3b
--- /dev/null
+++ b/src/include/storage/shm_mq.h
@@ -0,0 +1,70 @@
+/*-------------------------------------------------------------------------
+ *
+ * shm_mq.h
+ *	  single-reader, single-writer shared memory message queue
+ *
+ * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/storage/shm_mq.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef SHM_MQ_H
+#define SHM_MQ_H
+
+#include "postmaster/bgworker.h"
+#include "storage/dsm.h"
+#include "storage/proc.h"
+
+/* The queue itself, in shared memory. */
+struct shm_mq;
+typedef struct shm_mq shm_mq;
+
+/* Backend-private state. */
+struct shm_mq_handle;
+typedef struct shm_mq_handle shm_mq_handle;
+
+/* Possible results of a send or receive operation. */
+typedef enum
+{
+	SHM_MQ_SUCCESS,			/* Sent or received a message. */
+	SHM_MQ_WOULD_BLOCK,		/* Not completed; retry later. */
+	SHM_MQ_DETACHED			/* Other process has detached queue. */
+} shm_mq_result;
+
+/*
+ * Primitives to create a queue and set the sender and receiver.
+ *
+ * Both the sender and the receiver must be set before any messages are read
+ * or written, but they need not be set by the same process.  Each must be
+ * set exactly once.
+ */
+extern shm_mq *shm_mq_create(void *address, Size size);
+extern void shm_mq_set_receiver(shm_mq *mq, PGPROC *);
+extern void shm_mq_set_sender(shm_mq *mq, PGPROC *);
+
+/* Accessor methods for sender and receiver. */
+extern PGPROC *shm_mq_get_receiver(shm_mq *);
+extern PGPROC *shm_mq_get_sender(shm_mq *);
+
+/* Set up backend-local queue state. */
+extern shm_mq_handle *shm_mq_attach(shm_mq *mq, dsm_segment *seg,
+			  BackgroundWorkerHandle *handle);
+
+/* Break connection. */
+extern void shm_mq_detach(shm_mq *);
+
+/* Send or receive messages. */
+extern shm_mq_result shm_mq_send(shm_mq_handle *mqh,
+			uint64 nbytes, void *data, bool nowait);
+extern shm_mq_result shm_mq_receive(shm_mq_handle *mqh,
+			   uint64 *nbytesp, void **datap, bool nowait);
+
+/* Wait for our counterparty to attach to the queue. */
+extern shm_mq_result shm_mq_wait_for_attach(shm_mq_handle *mqh);
+
+/* Smallest possible queue. */
+extern Size shm_mq_minimum_size;
+
+#endif   /* SHM_MQ_H */
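
A minimal usage sketch for the API above (assumptions: "wh" is a
BackgroundWorkerHandle obtained from RegisterDynamicBackgroundWorker(),
queue_size is a caller-chosen size, and error handling is omitted; see the
test_shm_mq module below for a complete example):

	dsm_segment *seg = dsm_create(queue_size);
	shm_mq	   *mq = shm_mq_create(dsm_segment_address(seg), queue_size);
	shm_mq_handle *mqh;
	uint64		len;
	void	   *data;

	/* We act as receiver; the worker calls shm_mq_set_sender(). */
	shm_mq_set_receiver(mq, MyProc);
	mqh = shm_mq_attach(mq, seg, wh);

	/* Wait for the worker to attach, then read one message. */
	if (shm_mq_wait_for_attach(mqh) == SHM_MQ_SUCCESS &&
		shm_mq_receive(mqh, &len, &data, false) == SHM_MQ_SUCCESS)
		elog(LOG, "got " UINT64_FORMAT "-byte message", len);

	dsm_detach(seg);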

["test-shm-mq-v1.patch" (text/x-patch)]

commit 45ffed0eb59d2771461df6f751552ee872405559
Author: Robert Haas <rhaas@postgresql.org>
Date:   Mon Oct 28 13:06:53 2013 -0400

    Test code for shared memory message queue facility.
    
    This code is intended as a demonstration of how the dynamic shared
    memory and dynamic background worker facilities can be used to establish
    a group of cooperating processes which can coordinate their activities
    using the shared memory message queue facility.  By itself, the code
    does nothing particularly interesting: it simply allows messages to
    be passed through a loop of workers and back to the original process.
    But it's a useful unit test, in addition to its demonstration value.

diff --git a/contrib/Makefile b/contrib/Makefile
index 8a2a937..6e4cfd4 100644
--- a/contrib/Makefile
+++ b/contrib/Makefile
@@ -50,6 +50,7 @@ SUBDIRS = \
 		tablefunc	\
 		tcn		\
 		test_parser	\
+		test_shm_mq	\
 		tsearch2	\
 		unaccent	\
 		vacuumlo	\
diff --git a/contrib/test_shm_mq/.gitignore b/contrib/test_shm_mq/.gitignore
new file mode 100644
index 0000000..5dcb3ff
--- /dev/null
+++ b/contrib/test_shm_mq/.gitignore
@@ -0,0 +1,4 @@
+# Generated subdirectories
+/log/
+/results/
+/tmp_check/
diff --git a/contrib/test_shm_mq/Makefile b/contrib/test_shm_mq/Makefile
new file mode 100644
index 0000000..5e5ac1c
--- /dev/null
+++ b/contrib/test_shm_mq/Makefile
@@ -0,0 +1,20 @@
+# contrib/test_shm_mq/Makefile
+
+MODULE_big = test_shm_mq
+OBJS = test.o setup.o worker.o
+
+EXTENSION = test_shm_mq
+DATA = test_shm_mq--1.0.sql
+
+REGRESS = test_shm_mq
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = contrib/test_shm_mq
+top_builddir = ../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/contrib/test_shm_mq/expected/test_shm_mq.out b/contrib/test_shm_mq/expected/test_shm_mq.out
new file mode 100644
index 0000000..dd9891c
--- /dev/null
+++ b/contrib/test_shm_mq/expected/test_shm_mq.out
@@ -0,0 +1,18 @@
+CREATE EXTENSION test_shm_mq;
+--
+-- These tests don't produce any interesting output.  We're checking that
+-- the operations complete without crashing or hanging and that none of their
+-- internal sanity tests fail.
+--
+SELECT test_shm_mq(32768, (select string_agg(chr(32+(random()*96)::int), '') from generate_series(1,400)), 10000, 1);
+ test_shm_mq 
+-------------
+ 
+(1 row)
+
+SELECT test_shm_mq_pipelined(16384, (select string_agg(chr(32+(random()*96)::int), '') from generate_series(1,270000)), 200, 3);
+ test_shm_mq_pipelined 
+-----------------------
+ 
+(1 row)
+
diff --git a/contrib/test_shm_mq/setup.c b/contrib/test_shm_mq/setup.c
new file mode 100644
index 0000000..60601fe
--- /dev/null
+++ b/contrib/test_shm_mq/setup.c
@@ -0,0 +1,323 @@
+/*--------------------------------------------------------------------------
+ *
+ * setup.c
+ *		Code to set up a dynamic shared memory segment and a specified
+ *		number of background workers for shared memory message queue
+ *		testing.
+ *
+ * Copyright (C) 2013, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		contrib/test_shm_mq/setup.c
+ *
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "miscadmin.h"
+#include "postmaster/bgworker.h"
+#include "storage/procsignal.h"
+#include "storage/shm_toc.h"
+#include "utils/memutils.h"
+
+#include "test_shm_mq.h"
+
+typedef struct
+{
+	int		nworkers;
+	BackgroundWorkerHandle *handle[FLEXIBLE_ARRAY_MEMBER];
+} worker_state;
+
+static void setup_dynamic_shared_memory(int64 queue_size, int nworkers,
+							dsm_segment **segp,
+							test_shm_mq_header **hdrp,
+							shm_mq **outp, shm_mq **inp);
+static worker_state *setup_background_workers(int nworkers,
+											  dsm_segment *seg);
+static void cleanup_background_workers(dsm_segment *seg, Datum arg);
+static void wait_for_workers_to_become_ready(worker_state *wstate,
+								 volatile test_shm_mq_header *hdr);
+static bool check_worker_status(worker_state *wstate);
+
+/*
+ * Set up a dynamic shared memory segment and zero or more background workers
+ * for a test run.
+ */
+void
+test_shm_mq_setup(int64 queue_size, int32 nworkers, dsm_segment **segp,
+				  shm_mq_handle **output, shm_mq_handle **input)
+{
+	dsm_segment *seg;
+	test_shm_mq_header *hdr;
+	shm_mq	   *outq;
+	shm_mq	   *inq;
+	worker_state	   *wstate;
+
+	/* Set up a dynamic shared memory segment. */
+	setup_dynamic_shared_memory(queue_size, nworkers, &seg, &hdr, &outq, &inq);
+	*segp = seg;
+
+	/* Register background workers. */
+	wstate = setup_background_workers(nworkers, seg);
+
+	/* Attach the queues. */
+	*output = shm_mq_attach(outq, seg, nworkers > 0 ? wstate->handle[0] : NULL);
+	*input = shm_mq_attach(inq, seg,
+						   nworkers > 0 ? wstate->handle[nworkers - 1] : NULL);
+
+	/* Wait for workers to become ready. */
+	wait_for_workers_to_become_ready(wstate, hdr);
+
+	/*
+	 * Once we reach this point, all workers are ready.  We no longer need
+	 * to kill them if we die; they'll die on their own as the message queues
+	 * shut down.
+	 */
+	on_dsm_detach_cancel(seg, cleanup_background_workers,
+						 PointerGetDatum(wstate));
+	pfree(wstate);
+}
+
+/*
+ * Set up a dynamic shared memory segment.
+ *
+ * We set up a small control region that contains only a test_shm_mq_header,
+ * plus one region per message queue.  There are as many message queues as
+ * the number of workers, plus one.
+ */
+static void
+setup_dynamic_shared_memory(int64 queue_size, int nworkers,
+							dsm_segment **segp, test_shm_mq_header **hdrp,
+							shm_mq **outp, shm_mq **inp)
+{
+	shm_toc_estimator	e;
+	int					i;
+	uint64			segsize;
+	dsm_segment	   *seg;
+	shm_toc		   *toc;
+	test_shm_mq_header *hdr;
+
+	/* Ensure a valid queue size. */
+	if (queue_size < 0 || ((uint64) queue_size) < shm_mq_minimum_size)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("queue size must be at least " UINT64_FORMAT " bytes",
+					shm_mq_minimum_size)));
+
+	/*
+	 * Estimate how much shared memory we need.
+	 *
+	 * Because the TOC machinery may choose to insert padding around oddly-sized
+	 * requests, we must estimate each chunk separately.
+	 *
+	 * We need one key to register the location of the header, and we need
+	 * nworkers + 1 keys to track the locations of the message queues.
+	 */
+	shm_toc_initialize_estimator(&e);
+	shm_toc_estimate_chunk(&e, sizeof(test_shm_mq_header));
+	for (i = 0; i <= nworkers; ++i)
+		shm_toc_estimate_chunk(&e, queue_size);
+	shm_toc_estimate_keys(&e, 2 + nworkers);
+	segsize = shm_toc_estimate(&e);
+
+	/* Create the shared memory segment and establish a table of contents. */
+	seg = dsm_create(shm_toc_estimate(&e));
+	toc = shm_toc_create(PG_TEST_SHM_MQ_MAGIC, dsm_segment_address(seg),
+						 segsize);
+
+	/* Set up the header region. */
+	hdr = shm_toc_allocate(toc, sizeof(test_shm_mq_header));
+	SpinLockInit(&hdr->mutex);
+	hdr->workers_total = nworkers;
+	hdr->workers_attached = 0;
+	hdr->workers_ready = 0;
+	shm_toc_insert(toc, 0, hdr);
+
+	/* Set up one message queue per worker, plus one. */
+	for (i = 0; i <= nworkers; ++i)
+	{
+		shm_mq		   *mq;
+
+		mq = shm_mq_create(shm_toc_allocate(toc, queue_size), queue_size);
+		shm_toc_insert(toc, i + 1, mq);
+
+		if (i == 0)
+		{
+			/* We send messages to the first queue. */
+			shm_mq_set_sender(mq, MyProc);
+			*outp = mq;
+		}
+		if (i == nworkers)
+		{
+			/* We receive messages from the last queue. */
+			shm_mq_set_receiver(mq, MyProc);
+			*inp = mq;
+		}
+	}
+
+	/* Return results to caller. */
+	*segp = seg;
+	*hdrp = hdr;
+}
+
+/*
+ * Register background workers.
+ */
+static worker_state *
+setup_background_workers(int nworkers, dsm_segment *seg)
+{
+	MemoryContext	oldcontext;
+	BackgroundWorker worker;
+	worker_state	*wstate;
+	int		i;
+
+	/*
+	 * We need the worker_state object and the background worker handles to
+	 * which it points to be allocated in CurTransactionContext rather than
+	 * ExprContext; otherwise, they'll be destroyed before the on_dsm_detach
+	 * hooks run.
+	 */
+	oldcontext = MemoryContextSwitchTo(CurTransactionContext);
+
+	/* Create worker state object. */
+	wstate = MemoryContextAlloc(CurTransactionContext,
+								offsetof(worker_state, handle) +
+								sizeof(BackgroundWorkerHandle *) * nworkers);
+	wstate->nworkers = 0;
+
+	/*
+	 * Arrange to kill all the workers if we abort before all workers are
+	 * finished hooking themselves up to the dynamic shared memory segment.
+	 *
+	 * If we die after all the workers have finished hooking themselves up
+	 * to the dynamic shared memory segment, we'll mark the two queues to
+	 * which we're directly connected as detached, and the worker(s)
+	 * connected to those queues will exit, marking any other queues to
+	 * which they are connected as detached.  This will cause any
+	 * as-yet-unaware workers connected to those queues to exit in their
+	 * turn, and so on, until everybody exits.
+	 *
+	 * But suppose the workers which are supposed to connect to the queues
+	 * to which we're directly attached exit due to some error before they
+	 * actually attach the queues.  The remaining workers will have no way of
+	 * knowing this.  From their perspective, they're still waiting for those
+	 * workers to start, when in fact they've already died.
+	 */
+	on_dsm_detach(seg, cleanup_background_workers,
+				  PointerGetDatum(wstate));
+
+	/* Configure a worker. */
+	worker.bgw_flags = BGWORKER_SHMEM_ACCESS;
+	worker.bgw_start_time = BgWorkerStart_ConsistentState;
+	worker.bgw_restart_time = BGW_NEVER_RESTART;
+	worker.bgw_main = NULL;		/* new worker might not have library loaded */
+	sprintf(worker.bgw_library_name, "test_shm_mq");
+	sprintf(worker.bgw_function_name, "test_shm_mq_main");
+	snprintf(worker.bgw_name, BGW_MAXLEN, "test_shm_mq");
+	worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(seg));
+	/* set bgw_notify_pid, so we can detect if the worker stops */
+	worker.bgw_notify_pid = MyProcPid;
+
+	/* Register the workers. */
+	for (i = 0; i < nworkers; ++i)
+	{
+		if (!RegisterDynamicBackgroundWorker(&worker, &wstate->handle[i]))
+			ereport(ERROR,
+					(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
+					 errmsg("could not register background process"),
+				 errhint("You may need to increase max_worker_processes.")));
+		++wstate->nworkers;
+	}
+
+	/* All done. */
+	MemoryContextSwitchTo(oldcontext);
+	return wstate;
+}
+
+static void
+cleanup_background_workers(dsm_segment *seg, Datum arg)
+{
+	worker_state *wstate = (worker_state *) DatumGetPointer(arg);
+
+	while (wstate->nworkers > 0)
+	{
+		--wstate->nworkers;
+		TerminateBackgroundWorker(wstate->handle[wstate->nworkers]);
+	}
+}
+
+static void
+wait_for_workers_to_become_ready(worker_state *wstate,
+								 volatile test_shm_mq_header *hdr)
+{
+	bool	save_set_latch_on_sigusr1;
+	bool	result = false;
+
+	save_set_latch_on_sigusr1 = set_latch_on_sigusr1;
+	set_latch_on_sigusr1 = true;
+
+	PG_TRY();
+	{
+		for (;;)
+		{
+			int workers_ready;
+
+			/* If all the workers are ready, we have succeeded. */
+			SpinLockAcquire(&hdr->mutex);
+			workers_ready = hdr->workers_ready;
+			SpinLockRelease(&hdr->mutex);
+			if (workers_ready >= wstate->nworkers)
+			{
+				result = true;
+				break;
+			}
+
+			/* If any workers (or the postmaster) have died, we have failed. */
+			if (!check_worker_status(wstate))
+			{
+				result = false;
+				break;
+			}
+
+			/* Wait to be signalled. */
+			WaitLatch(&MyProc->procLatch, WL_LATCH_SET, 0);
+
+			/* An interrupt may have occurred while we were waiting. */
+			CHECK_FOR_INTERRUPTS();
+
+			/* Reset the latch so we don't spin. */
+			ResetLatch(&MyProc->procLatch);
+		}
+	}
+	PG_CATCH();
+	{
+		set_latch_on_sigusr1 = save_set_latch_on_sigusr1;
+		PG_RE_THROW();
+	}
+	PG_END_TRY();
+
+	if (!result)
+		ereport(ERROR,
+				(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
+				 errmsg("one or more background workers failed to start")));
+}
+
+static bool
+check_worker_status(worker_state *wstate)
+{
+	int	n;
+
+	/* If any workers (or the postmaster) have died, we have failed. */
+	for (n = 0; n < wstate->nworkers; ++n)
+	{
+		BgwHandleStatus status;
+		pid_t	pid;
+
+		status = GetBackgroundWorkerPid(wstate->handle[n], &pid);
+		if (status == BGWH_STOPPED || status == BGWH_POSTMASTER_DIED)
+			return false;
+	}
+
+	/* Otherwise, things still look OK. */
+	return true;
+}
diff --git a/contrib/test_shm_mq/sql/test_shm_mq.sql b/contrib/test_shm_mq/sql/test_shm_mq.sql
new file mode 100644
index 0000000..1366df1
--- /dev/null
+++ b/contrib/test_shm_mq/sql/test_shm_mq.sql
@@ -0,0 +1,9 @@
+CREATE EXTENSION test_shm_mq;
+
+--
+-- These tests don't produce any interesting output.  We're checking that
+-- the operations complete without crashing or hanging and that none of their
+-- internal sanity tests fail.
+--
+SELECT test_shm_mq(32768, (select string_agg(chr(32+(random()*96)::int), '') from generate_series(1,400)), 10000, 1);
+SELECT test_shm_mq_pipelined(16384, (select string_agg(chr(32+(random()*96)::int), '') from generate_series(1,270000)), 200, 3);
diff --git a/contrib/test_shm_mq/test.c b/contrib/test_shm_mq/test.c
new file mode 100644
index 0000000..59f18ec
--- /dev/null
+++ b/contrib/test_shm_mq/test.c
@@ -0,0 +1,265 @@
+/*--------------------------------------------------------------------------
+ *
+ * test.c
+ *		Test harness code for shared memory message queues.
+ *
+ * Copyright (C) 2013, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		contrib/test_shm_mq/test.c
+ *
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "fmgr.h"
+#include "miscadmin.h"
+
+#include "test_shm_mq.h"
+
+PG_MODULE_MAGIC;
+PG_FUNCTION_INFO_V1(test_shm_mq);
+PG_FUNCTION_INFO_V1(test_shm_mq_pipelined);
+
+void		_PG_init(void);
+Datum		test_shm_mq(PG_FUNCTION_ARGS);
+Datum		test_shm_mq_pipelined(PG_FUNCTION_ARGS);
+
+static void verify_message(uint64 origlen, char *origdata, uint64 newlen,
+			   char *newdata);
+
+/*
+ * Simple test of the shared memory message queue infrastructure.
+ *
+ * We set up a ring of message queues passing through 1 or more background
+ * processes and eventually looping back to ourselves.  We then send a message
+ * through the ring a number of times indicated by the loop count.  At the end,
+ * we check whether the final message matches the one we started with.
+ */
+Datum
+test_shm_mq(PG_FUNCTION_ARGS)
+{
+	int64		queue_size = PG_GETARG_INT64(0);
+	text	   *message = PG_GETARG_TEXT_PP(1);
+	char	   *message_contents = VARDATA_ANY(message);
+	int			message_size = VARSIZE_ANY_EXHDR(message);
+	int32		loop_count = PG_GETARG_INT32(2);
+	int32		nworkers = PG_GETARG_INT32(3);
+	dsm_segment *seg;
+	shm_mq_handle *outqh;
+	shm_mq_handle *inqh;
+	shm_mq_result	res;
+	uint64		len;
+	void	   *data;
+
+	/* A negative loopcount is nonsensical. */
+	if (loop_count < 0)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("repeat count size must be a non-negative integer")));
+
+	/*
+	 * Since this test sends data using the blocking interfaces, it cannot
+	 * send data to itself.  Therefore, a minimum of 1 worker is required.
+	 * Of course, a negative worker count is nonsensical.
+	 */
+	if (nworkers < 1)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("number of workers must be a positive integer")));
+
+	/* Set up dynamic shared memory segment and background workers. */
+	test_shm_mq_setup(queue_size, nworkers, &seg, &outqh, &inqh);
+
+	/* Send the initial message. */
+	res = shm_mq_send(outqh, message_size, message_contents, false);
+	if (res != SHM_MQ_SUCCESS)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("could not send message")));
+
+	/*
+	 * Receive a message and send it back out again.  Do this a number of
+	 * times equal to the loop count.
+	 */
+	for (;;)
+	{
+		/* Receive a message. */
+		res = shm_mq_receive(inqh, &len, &data, false);
+		if (res != SHM_MQ_SUCCESS)
+			ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					 errmsg("could not receive message")));
+
+		/* If this is supposed to be the last iteration, stop here. */
+		if (--loop_count <= 0)
+			break;
+
+		/* Send it back out. */
+		res = shm_mq_send(outqh, len, data, false);
+		if (res != SHM_MQ_SUCCESS)
+			ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					 errmsg("could not send message")));
+	}
+
+	/*
+	 * Finally, check that we got back the same message from the last
+	 * iteration that we originally sent.
+	 */
+	verify_message(message_size, message_contents, len, data);
+
+	/* Clean up. */
+	dsm_detach(seg);
+
+	PG_RETURN_VOID();
+}
+
+/*
+ * Pipelined test of the shared memory message queue infrastructure.
+ *
+ * As in the basic test, we set up a ring of message queues passing through
+ * 1 or more background processes and eventually looping back to ourselves.
+ * Then, we send N copies of the user-specified message through the ring and
+ * receive them all back.  Since this might fill up all message queues in the
+ * ring and then stall, we must be prepared to begin receiving the messages
+ * back before we've finished sending them.
+ */
+Datum
+test_shm_mq_pipelined(PG_FUNCTION_ARGS)
+{
+	int64		queue_size = PG_GETARG_INT64(0);
+	text	   *message = PG_GETARG_TEXT_PP(1);
+	char	   *message_contents = VARDATA_ANY(message);
+	int			message_size = VARSIZE_ANY_EXHDR(message);
+	int32		loop_count = PG_GETARG_INT32(2);
+	int32		nworkers = PG_GETARG_INT32(3);
+	bool		verify = PG_GETARG_BOOL(4);
+	int32		send_count = 0;
+	int32		receive_count = 0;
+	dsm_segment *seg;
+	shm_mq_handle *outqh;
+	shm_mq_handle *inqh;
+	shm_mq_result	res;
+	uint64		len;
+	void	   *data;
+
+	/* A negative loopcount is nonsensical. */
+	if (loop_count < 0)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("repeat count size must be a non-negative integer")));
+
+	/*
+	 * Using the nonblocking interfaces, we can even send data to ourselves,
+	 * so the minimum number of workers for this test is zero.
+	 */
+	if (nworkers < 0)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("number of workers must be a non-negative integer")));
+
+	/* Set up dynamic shared memory segment and background workers. */
+	test_shm_mq_setup(queue_size, nworkers, &seg, &outqh, &inqh);
+
+	/* Main loop. */
+	for (;;)
+	{
+		bool		wait = true;
+
+		/*
+		 * If we haven't yet sent the message the requisite number of times,
+		 * try again to send it now.  Note that when shm_mq_send() returns
+		 * SHM_MQ_WOULD_BLOCK, the next call to that function must pass the
+		 * same message size and contents; that's not an issue here because
+		 * we're sending the same message every time.
+		 */
+		if (send_count < loop_count)
+		{
+			res = shm_mq_send(outqh, message_size, message_contents, true);
+			if (res == SHM_MQ_SUCCESS)
+			{
+				++send_count;
+				wait = false;
+			}
+			else if (res == SHM_MQ_DETACHED)
+				ereport(ERROR,
+						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+						 errmsg("could not send message")));
+		}
+
+		/*
+		 * If we haven't yet received the message the requisite number of
+		 * times, try to receive it again now.
+		 */
+		if (receive_count < loop_count)
+		{
+			res = shm_mq_receive(inqh, &len, &data, true);
+			if (res == SHM_MQ_SUCCESS)
+			{
+				++receive_count;
+				/* Verifying every time is slow, so it's optional. */
+				if (verify)
+					verify_message(message_size, message_contents, len, data);
+				wait = false;
+			}
+			else if (res == SHM_MQ_DETACHED)
+				ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					 errmsg("could not receive message")));
+		}
+		else
+		{
+			/*
+			 * Otherwise, we've received the message enough times.  This
+			 * shouldn't happen unless we've also sent it enough times.
+			 */
+			if (send_count != receive_count)
+				ereport(ERROR,
+						(errcode(ERRCODE_INTERNAL_ERROR),
+						 errmsg("message sent %d times, but received %d times",
+							send_count, receive_count)));
+			break;
+		}
+
+		if (wait)
+		{
+			/*
+			 * If we made no progress, wait for one of the other processes
+			 * to which we are connected to set our latch, indicating that
+			 * they have read or written data and therefore there may now be
+			 * work for us to do.
+			 */
+			WaitLatch(&MyProc->procLatch, WL_LATCH_SET, 0);
+			CHECK_FOR_INTERRUPTS();
+			ResetLatch(&MyProc->procLatch);
+		}
+	}
+
+	/* Clean up. */
+	dsm_detach(seg);
+
+	PG_RETURN_VOID();
+}
+
+/*
+ * Verify that two messages are the same.
+ */
+static void
+verify_message(uint64 origlen, char *origdata, uint64 newlen, char *newdata)
+{
+	uint64	i;
+
+	if (origlen != newlen)
+		ereport(ERROR,
+				(errmsg("message corrupted"),
+				 errdetail("The original message was " UINT64_FORMAT " bytes but the final \
message is " UINT64_FORMAT " bytes.", +					 origlen, newlen)));
+
+	for (i = 0; i < origlen; ++i)
+		if (origdata[i] != newdata[i])
+			ereport(ERROR,
+					(errmsg("message corrupted"),
+					 errdetail("The new and original messages differ at byte " UINT64_FORMAT " of " \
UINT64_FORMAT ".", i, origlen))); +}
diff --git a/contrib/test_shm_mq/test_shm_mq--1.0.sql b/contrib/test_shm_mq/test_shm_mq--1.0.sql
new file mode 100644
index 0000000..54b225e
--- /dev/null
+++ b/contrib/test_shm_mq/test_shm_mq--1.0.sql
@@ -0,0 +1,19 @@
+/* contrib/test_shm_mq/test_shm_mq--1.0.sql */
+
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION test_shm_mq" to load this file. \quit
+
+CREATE FUNCTION test_shm_mq(queue_size pg_catalog.int8,
+					   message pg_catalog.text,
+					   repeat_count pg_catalog.int4 default 1,
+					   num_workers pg_catalog.int4 default 1)
+    RETURNS pg_catalog.void STRICT
+	AS 'MODULE_PATHNAME' LANGUAGE C;
+
+CREATE FUNCTION test_shm_mq_pipelined(queue_size pg_catalog.int8,
+					   message pg_catalog.text,
+					   repeat_count pg_catalog.int4 default 1,
+					   num_workers pg_catalog.int4 default 1,
+					   verify pg_catalog.bool default true)
+    RETURNS pg_catalog.void STRICT
+	AS 'MODULE_PATHNAME' LANGUAGE C;
diff --git a/contrib/test_shm_mq/test_shm_mq.control b/contrib/test_shm_mq/test_shm_mq.control
new file mode 100644
index 0000000..d9a74c7
--- /dev/null
+++ b/contrib/test_shm_mq/test_shm_mq.control
@@ -0,0 +1,4 @@
+comment = 'Test code for shared memory message queues'
+default_version = '1.0'
+module_pathname = '$libdir/test_shm_mq'
+relocatable = true
diff --git a/contrib/test_shm_mq/test_shm_mq.h b/contrib/test_shm_mq/test_shm_mq.h
new file mode 100644
index 0000000..5f87da7
--- /dev/null
+++ b/contrib/test_shm_mq/test_shm_mq.h
@@ -0,0 +1,45 @@
+/*--------------------------------------------------------------------------
+ *
+ * test_shm_mq.h
+ *		Definitions for shared memory message queues
+ *
+ * Copyright (C) 2013, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		contrib/test_shm_mq/test_shm_mq.h
+ *
+ * -------------------------------------------------------------------------
+ */
+
+#ifndef TEST_SHM_MQ_H
+#define TEST_SHM_MQ_H
+
+#include "storage/dsm.h"
+#include "storage/shm_mq.h"
+#include "storage/spin.h"
+
+/* Identifier for shared memory segments used by this extension. */
+#define		PG_TEST_SHM_MQ_MAGIC		0x79fb2447
+
+/*
+ * This structure is stored in the dynamic shared memory segment.  We use
+ * it to determine whether all workers started up OK and successfully
+ * attached to their respective shared message queues.
+ */
+typedef struct
+{
+	slock_t			mutex;
+	int				workers_total;
+	int				workers_attached;
+	int				workers_ready;
+} test_shm_mq_header;
+
+/* Set up dynamic shared memory and background workers for test run. */
+extern void test_shm_mq_setup(int64 queue_size, int32 nworkers,
+							  dsm_segment **seg, shm_mq_handle **output,
+							  shm_mq_handle **input);
+
+/* Main entrypoint for a worker. */
+extern void	test_shm_mq_main(Datum);
+
+#endif   /* TEST_SHM_MQ_H */
diff --git a/contrib/test_shm_mq/worker.c b/contrib/test_shm_mq/worker.c
new file mode 100644
index 0000000..95b23c9
--- /dev/null
+++ b/contrib/test_shm_mq/worker.c
@@ -0,0 +1,224 @@
+/*--------------------------------------------------------------------------
+ *
+ * worker.c
+ *		Code for sample worker making use of shared memory message queues.
+ *		Our test worker simply reads messages from one message queue and
+ *		writes them back out to another message queue.  In a real
+ *		application, you'd presumably want the worker to do some more
+ *		complex calculation rather than simply returning the input,
+ *		but it should be possible to use much of the control logic just
+ *		as presented here.
+ *
+ * Copyright (C) 2013, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		contrib/test_shm_mq/worker.c
+ *
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "miscadmin.h"
+#include "storage/ipc.h"
+#include "storage/procarray.h"
+#include "storage/shm_mq.h"
+#include "storage/shm_toc.h"
+#include "utils/resowner.h"
+
+#include "test_shm_mq.h"
+
+static void handle_sigterm(SIGNAL_ARGS);
+static void attach_to_queues(dsm_segment *seg, shm_toc *toc,
+							 int myworkernumber, shm_mq_handle **inqhp,
+							 shm_mq_handle **outqhp);
+static void copy_messages(shm_mq_handle *inqh, shm_mq_handle *outqh);
+
+/*
+ * Background worker entrypoint.
+ *
+ * This is intended to demonstrate how a background worker can be used to
+ * facilitate a parallel computation.  Most of the logic here is fairly
+ * boilerplate stuff, designed to attach to the shared memory segment,
+ * notify the user backend that we're alive, and so on.  The
+ * application-specific bits of logic that you'd replace for your own worker
+ * are attach_to_queues() and copy_messages().
+ */
+void
+test_shm_mq_main(Datum main_arg)
+{
+	dsm_segment *seg;
+	shm_toc	   *toc;
+	shm_mq_handle *inqh;
+	shm_mq_handle *outqh;
+	volatile test_shm_mq_header *hdr;
+	int			myworkernumber;
+	PGPROC	   *registrant;
+
+	/*
+	 * Establish signal handlers.
+	 *
+	 * We want CHECK_FOR_INTERRUPTS() to kill off this worker process just
+	 * as it would a normal user backend.  To make that happen, we establish
+	 * a signal handler that is a stripped-down version of die().  We don't
+	 * have any equivalent of the backend's command-read loop, where interrupts
+	 * can be processed immediately, so make sure ImmediateInterruptOK is
+	 * turned off.
+	 */
+	pqsignal(SIGTERM, handle_sigterm);
+	ImmediateInterruptOK = false;
+	BackgroundWorkerUnblockSignals();
+
+	/*
+	 * Connect to the dynamic shared memory segment.
+	 *
+	 * The backend that registered this worker passed us the ID of a shared
+	 * memory segment to which we must attach for further instructions.  In
+	 * order to attach to dynamic shared memory, we need a resource owner.
+	 * Once we've mapped the segment in our address space, attach to the table
+	 * of contents so we can locate the various data structures we'll need
+	 * to find within the segment.
+	 */
+	CurrentResourceOwner = ResourceOwnerCreate(NULL, "test_shm_mq worker");
+	seg = dsm_attach(DatumGetUInt32(main_arg));
+	if (seg == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("unable to map dynamic shared memory segment")));
+	toc = shm_toc_attach(PG_TEST_SHM_MQ_MAGIC, dsm_segment_address(seg));
+	if (toc == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("bad magic number in dynamic shared memory segment")));
+
+	/*
+	 * Acquire a worker number.
+	 *
+	 * By convention, the process registering this background worker should
+	 * have stored the control structure at key 0.  We look up that key to
+	 * find it.  Our worker number gives our identity: there may be just one
+	 * worker involved in this parallel operation, or there may be many.
+	 */
+	hdr = shm_toc_lookup(toc, 0);
+	SpinLockAcquire(&hdr->mutex);
+	myworkernumber = ++hdr->workers_attached;
+	SpinLockRelease(&hdr->mutex);
+	if (myworkernumber > hdr->workers_total)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("too many message queue testing workers already")));
+
+	/*
+	 * Attach to the appropriate message queues.
+	 */
+	attach_to_queues(seg, toc, myworkernumber, &inqh, &outqh);
+
+	/*
+	 * Indicate that we're fully initialized and ready to begin the main
+	 * part of the parallel operation.
+	 *
+	 * Once we signal that we're ready, the user backend is entitled to assume
+	 * that our on_dsm_detach callbacks will fire before we disconnect from
+	 * the shared memory segment and exit.  Generally, that means we must have
+	 * attached to all relevant dynamic shared memory data structures by now.
+	 */
+	SpinLockAcquire(&hdr->mutex);
+	++hdr->workers_ready;
+	SpinLockRelease(&hdr->mutex);
+	registrant = BackendPidGetProc(MyBgworkerEntry->bgw_notify_pid);
+	if (registrant == NULL)
+	{
+		elog(DEBUG1, "registrant backend has exited prematurely");
+		proc_exit(1);
+	}
+	SetLatch(&registrant->procLatch);
+
+	/* Do the work. */
+	copy_messages(inqh, outqh);
+
+	/*
+	 * We're done.  Explicitly detach the shared memory segment so that we
+	 * don't get a resource leak warning at commit time.  This will fire any
+	 * on_dsm_detach callbacks we've registered, as well.  Once that's done,
+	 * we can go ahead and exit.
+	 */
+	dsm_detach(seg);
+	proc_exit(1);
+}
+
+/*
+ * Attach to shared memory message queues.
+ *
+ * We use our worker number to determine to which queue we should attach.
+ * The queues are registered at keys 1..<number-of-workers>.  The user backend
+ * writes to queue #1 and reads from queue #<number-of-workers>; each worker
+ * reads from the queue whose number is equal to its worker number and writes
+ * to the next higher-numbered queue.
+ */
+static void
+attach_to_queues(dsm_segment *seg, shm_toc *toc, int myworkernumber,
+				 shm_mq_handle **inqhp, shm_mq_handle **outqhp)
+{
+	shm_mq	   *inq;
+	shm_mq	   *outq;
+
+	inq = shm_toc_lookup(toc, myworkernumber);
+	shm_mq_set_receiver(inq, MyProc);
+	*inqhp = shm_mq_attach(inq, seg, NULL);
+	outq = shm_toc_lookup(toc, myworkernumber + 1);
+	shm_mq_set_sender(outq, MyProc);
+	*outqhp = shm_mq_attach(outq, seg, NULL);
+}
+
+/*
+ * Loop, receiving and sending messages, until the connection is broken.
+ *
+ * This is the "real work" performed by this worker process.  Everything that
+ * happens before this is initialization of one form or another, and everything
+ * after this point is cleanup.
+ */
+static void
+copy_messages(shm_mq_handle *inqh, shm_mq_handle *outqh)
+{
+	uint64		len;
+	void	   *data;
+	shm_mq_result res;
+
+	for (;;)
+	{
+		/* Notice any interrupts that have occurred. */
+		CHECK_FOR_INTERRUPTS();
+
+		/* Receive a message. */
+		res = shm_mq_receive(inqh, &len, &data, false);
+		if (res != SHM_MQ_SUCCESS)
+			break;
+
+		/* Send it back out. */
+		res = shm_mq_send(outqh, len, data, false);
+		if (res != SHM_MQ_SUCCESS)
+			break;
+	}
+}
+
+/*
+ * When we receive a SIGTERM, we set InterruptPending and ProcDiePending just
+ * like a normal backend.  The next CHECK_FOR_INTERRUPTS() will do the right
+ * thing.
+ */
+static void
+handle_sigterm(SIGNAL_ARGS)
+{
+	int			save_errno = errno;
+
+	if (MyProc)
+		SetLatch(&MyProc->procLatch);
+
+	if (!proc_exit_inprogress)
+	{
+		InterruptPending = true;
+		ProcDiePending = true;
+	}
+
+	errno = save_errno;
+}


