[prev in list] [next in list] [prev in thread] [next in thread] 

List:       pgsql-hackers
Subject:    Re: [HACKERS] Proposal: Generic WAL logical messages
From:       Petr Jelinek <petr () 2ndquadrant ! com>
Date:       2016-02-29 21:10:05
Message-ID: 56D4B3AD.5000207 () 2ndquadrant ! com
[Download RAW message or body]

Hi,

attached is the newest version of the patch.

I removed the registry, renamed the 'send' to 'emit', documented the 
callback parameters properly. I also added the test to ddl.sql for the 
serialization and deserialization (and of course found a bug there) and 
in general fixed all the stuff Andres reported.

(see more inline)

On 28/02/16 22:55, Andres Freund wrote:
>
>
>>>
>>>> +void
>>>> +ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
>>>> +						  bool transactional, const char *prefix, Size msg_sz,
>>>> +						  const char *msg)
>>>> +{
>>>> +	ReorderBufferTXN *txn = NULL;
>>>> +
>>>> +	if (transactional)
>>>> +	{
>>>> +		ReorderBufferChange *change;
>>>> +
>>>> +		txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
>>>> +
>>>> +		Assert(xid != InvalidTransactionId);
>>>> +		Assert(txn != NULL);
>>>> +
>>>> +		change = ReorderBufferGetChange(rb);
>>>> +		change->action = REORDER_BUFFER_CHANGE_MESSAGE;
>>>> +		change->data.msg.transactional = true;
>>>> +		change->data.msg.prefix = pstrdup(prefix);
>>>> +		change->data.msg.message_size = msg_sz;
>>>> +		change->data.msg.message = palloc(msg_sz);
>>>> +		memcpy(change->data.msg.message, msg, msg_sz);
>>>> +
>>>> +		ReorderBufferQueueChange(rb, xid, lsn, change);
>>>> +	}
>>>> +	else
>>>> +	{
>>>> +		rb->message(rb, txn, lsn, transactional, prefix, msg_sz, msg);
>>>> +	}
>>>> +}
>>>
>>>
>>> This approach prohibts catalog access when processing a nontransaction
>>> message as there's no snapshot set up.
>>>
>>
>> Hmm I do see usefulness in having snapshot, although I wonder if that does
>> not kill the point of non-transactional messages.
>
> I don't see how it would? It'd obviously have to be the catalog/historic
> snapshot a transaction would have had if it started in that moment in
> the original WAL stream?
>
>
>> Question is then though which snapshot should the message see,
>> base_snapshot of transaction?
>
> Well, there'll not be a transaction, but something like snapbuild.c's
> ->snapshot ought to do the trick.
>

Ok I added interface which returns either existing snapshot or makes new 
one, seems like the most reasonable thing to do to me.

>
>> That would mean we'd have to call SnapBuildProcessChange for
>> non-transactional messages which we currently avoid. Alternatively we
>> could probably invent lighter version of that interface that would
>> just make sure builder->snapshot is valid and if not then build it
>
> I think the latter is probably the direction we should go in.
>
>
>> I am honestly sure if that's a win or not.
>
> I think it'll be confusing (bug inducing) if there's no snapshot for
> non-transactional messages but for transactional ones, and it'll
> severely limit the usefulness of the interface.
>

Nono, I meant I am not sure if special interface is a win over just 
using SnapBuildProcessChange() in practice.


-- 
   Petr Jelinek                  http://www.2ndQuadrant.com/
   PostgreSQL Development, 24x7 Support, Training & Services

["logical-messages-2016-02-29.patch" (application/x-patch)]

From 2f037a757d9cec09f04457d82cdd1256b8255b78 Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmodos@pjmodos.net>
Date: Wed, 24 Feb 2016 17:02:36 +0100
Subject: [PATCH] Logical Decoding Messages

---
 contrib/test_decoding/Makefile                  |  2 +-
 contrib/test_decoding/expected/ddl.out          | 21 ++++--
 contrib/test_decoding/expected/messages.out     | 56 ++++++++++++++++
 contrib/test_decoding/sql/ddl.sql               |  3 +-
 contrib/test_decoding/sql/messages.sql          | 17 +++++
 contrib/test_decoding/test_decoding.c           | 19 ++++++
 doc/src/sgml/func.sgml                          | 45 +++++++++++++
 doc/src/sgml/logicaldecoding.sgml               | 37 +++++++++++
 src/backend/access/rmgrdesc/Makefile            |  4 +-
 src/backend/access/rmgrdesc/logicalmsgdesc.c    | 41 ++++++++++++
 src/backend/access/transam/rmgr.c               |  1 +
 src/backend/replication/logical/Makefile        |  2 +-
 src/backend/replication/logical/decode.c        | 49 ++++++++++++++
 src/backend/replication/logical/logical.c       | 38 +++++++++++
 src/backend/replication/logical/logicalfuncs.c  | 27 ++++++++
 src/backend/replication/logical/message.c       | 87 +++++++++++++++++++++++++
 src/backend/replication/logical/reorderbuffer.c | 67 +++++++++++++++++++
 src/backend/replication/logical/snapbuild.c     | 19 ++++++
 src/bin/pg_xlogdump/rmgrdesc.c                  |  1 +
 src/include/access/rmgrlist.h                   |  1 +
 src/include/catalog/pg_proc.h                   |  4 ++
 src/include/replication/logicalfuncs.h          |  2 +
 src/include/replication/message.h               | 41 ++++++++++++
 src/include/replication/output_plugin.h         | 13 ++++
 src/include/replication/reorderbuffer.h         | 21 ++++++
 src/include/replication/snapbuild.h             |  2 +
 26 files changed, 608 insertions(+), 12 deletions(-)
 create mode 100644 contrib/test_decoding/expected/messages.out
 create mode 100644 contrib/test_decoding/sql/messages.sql
 create mode 100644 src/backend/access/rmgrdesc/logicalmsgdesc.c
 create mode 100644 src/backend/replication/logical/message.c
 create mode 100644 src/include/replication/message.h

diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index a362e69..8fdcfbc 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -38,7 +38,7 @@ submake-test_decoding:
 	$(MAKE) -C $(top_builddir)/contrib/test_decoding
 
 REGRESSCHECKS=ddl rewrite toast permissions decoding_in_xact decoding_into_rel \
-	binary prepared replorigin
+	binary prepared replorigin messages
 
 regresscheck: | submake-regress submake-test_decoding temp-install
 	$(MKDIR_P) regression_output
diff --git a/contrib/test_decoding/expected/ddl.out \
b/contrib/test_decoding/expected/ddl.out index 57a1289..4d13ad4 100644
--- a/contrib/test_decoding/expected/ddl.out
+++ b/contrib/test_decoding/expected/ddl.out
@@ -220,11 +220,17 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', \
NULL, NULL, 'inc  (7 rows)
 
 /*
- * check that disk spooling works
+ * check that disk spooling works (also for logical messages)
  */
 BEGIN;
 CREATE TABLE tr_etoomuch (id serial primary key, data int);
 INSERT INTO tr_etoomuch(data) SELECT g.i FROM generate_series(1, 10234) g(i);
+SELECT 'tx logical msg' FROM pg_logical_emit_message(true, 'test', 'tx logical \
msg'); +    ?column?    
+----------------
+ tx logical msg
+(1 row)
+
 DELETE FROM tr_etoomuch WHERE id < 5000;
 UPDATE tr_etoomuch SET data = - data WHERE id > 5000;
 COMMIT;
@@ -233,12 +239,13 @@ SELECT count(*), min(data), max(data)
 FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', \
'skip-empty-xacts', '1')  GROUP BY substring(data, 1, 24)
 ORDER BY 1,2;
- count |                       min                       |                           \
                max                                   
--------+-------------------------------------------------+------------------------------------------------------------------------
                
-     1 | BEGIN                                           | BEGIN
-     1 | COMMIT                                          | COMMIT
- 20467 | table public.tr_etoomuch: DELETE: id[integer]:1 | table public.tr_etoomuch: \
                UPDATE: id[integer]:9999 data[integer]:-9999
-(3 rows)
+ count |                                  min                                  |     \
max                                    \
+-------+-----------------------------------------------------------------------+------------------------------------------------------------------------
 +     1 | BEGIN                                                                 | \
BEGIN +     1 | COMMIT                                                                \
| COMMIT +     1 | message: transactional: 1 prefix: test, sz: 14 content:tx logical \
msg | message: transactional: 1 prefix: test, sz: 14 content:tx logical msg + 20467 | \
table public.tr_etoomuch: DELETE: id[integer]:1                       | table \
public.tr_etoomuch: UPDATE: id[integer]:9999 data[integer]:-9999 +(4 rows)
 
 -- check that a large, spooled, upsert works
 INSERT INTO tr_etoomuch (id, data)
diff --git a/contrib/test_decoding/expected/messages.out \
b/contrib/test_decoding/expected/messages.out new file mode 100644
index 0000000..7fa8256
--- /dev/null
+++ b/contrib/test_decoding/expected/messages.out
@@ -0,0 +1,56 @@
+-- predictability
+SET synchronous_commit = on;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', \
'test_decoding'); + ?column? 
+----------
+ init
+(1 row)
+
+SELECT 'msg1' FROM pg_logical_emit_message(true, 'test', 'msg1');
+ ?column? 
+----------
+ msg1
+(1 row)
+
+SELECT 'msg2' FROM pg_logical_emit_message(false, 'test', 'msg2');
+ ?column? 
+----------
+ msg2
+(1 row)
+
+BEGIN;
+SELECT 'msg3' FROM pg_logical_emit_message(true, 'test', 'msg3');
+ ?column? 
+----------
+ msg3
+(1 row)
+
+SELECT 'msg4' FROM pg_logical_emit_message(false, 'test', 'msg4');
+ ?column? 
+----------
+ msg4
+(1 row)
+
+SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', 'msg5');
+ ?column? 
+----------
+ msg5
+(1 row)
+
+COMMIT;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, \
'force-binary', '0', 'skip-empty-xacts', '1'); +                            data      \
 +------------------------------------------------------------
+ message: transactional: 1 prefix: test, sz: 4 content:msg1
+ message: transactional: 0 prefix: test, sz: 4 content:msg2
+ message: transactional: 0 prefix: test, sz: 4 content:msg4
+ message: transactional: 1 prefix: test, sz: 4 content:msg3
+ message: transactional: 1 prefix: test, sz: 4 content:msg5
+(5 rows)
+
+SELECT 'init' FROM pg_drop_replication_slot('regression_slot');
+ ?column? 
+----------
+ init
+(1 row)
+
diff --git a/contrib/test_decoding/sql/ddl.sql b/contrib/test_decoding/sql/ddl.sql
index e311c59..2f31784 100644
--- a/contrib/test_decoding/sql/ddl.sql
+++ b/contrib/test_decoding/sql/ddl.sql
@@ -108,11 +108,12 @@ DELETE FROM tr_pkey;
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, \
'include-xids', '0', 'skip-empty-xacts', '1');  
 /*
- * check that disk spooling works
+ * check that disk spooling works (also for logical messages)
  */
 BEGIN;
 CREATE TABLE tr_etoomuch (id serial primary key, data int);
 INSERT INTO tr_etoomuch(data) SELECT g.i FROM generate_series(1, 10234) g(i);
+SELECT 'tx logical msg' FROM pg_logical_emit_message(true, 'test', 'tx logical \
msg');  DELETE FROM tr_etoomuch WHERE id < 5000;
 UPDATE tr_etoomuch SET data = - data WHERE id > 5000;
 COMMIT;
diff --git a/contrib/test_decoding/sql/messages.sql \
b/contrib/test_decoding/sql/messages.sql new file mode 100644
index 0000000..8744eb6
--- /dev/null
+++ b/contrib/test_decoding/sql/messages.sql
@@ -0,0 +1,17 @@
+-- predictability
+SET synchronous_commit = on;
+
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', \
'test_decoding'); +
+SELECT 'msg1' FROM pg_logical_emit_message(true, 'test', 'msg1');
+SELECT 'msg2' FROM pg_logical_emit_message(false, 'test', 'msg2');
+
+BEGIN;
+SELECT 'msg3' FROM pg_logical_emit_message(true, 'test', 'msg3');
+SELECT 'msg4' FROM pg_logical_emit_message(false, 'test', 'msg4');
+SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', 'msg5');
+COMMIT;
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, \
'force-binary', '0', 'skip-empty-xacts', '1'); +
+SELECT 'init' FROM pg_drop_replication_slot('regression_slot');
diff --git a/contrib/test_decoding/test_decoding.c \
b/contrib/test_decoding/test_decoding.c index 4cf808f..6395ffd 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -21,6 +21,7 @@
 
 #include "replication/output_plugin.h"
 #include "replication/logical.h"
+#include "replication/message.h"
 #include "replication/origin.h"
 
 #include "utils/builtins.h"
@@ -63,6 +64,10 @@ static void pg_decode_change(LogicalDecodingContext *ctx,
 				 ReorderBufferChange *change);
 static bool pg_decode_filter(LogicalDecodingContext *ctx,
 				 RepOriginId origin_id);
+static void pg_decode_message(LogicalDecodingContext *ctx,
+							  ReorderBufferTXN *txn, XLogRecPtr message_lsn,
+							  bool transactional, const char *prefix,
+							  Size sz, const char *message);
 
 void
 _PG_init(void)
@@ -82,6 +87,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->commit_cb = pg_decode_commit_txn;
 	cb->filter_by_origin_cb = pg_decode_filter;
 	cb->shutdown_cb = pg_decode_shutdown;
+	cb->message_cb = pg_decode_message;
 }
 
 
@@ -471,3 +477,16 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN \
*txn,  
 	OutputPluginWrite(ctx, true);
 }
+
+static void
+pg_decode_message(LogicalDecodingContext *ctx,
+				  ReorderBufferTXN *txn, XLogRecPtr lsn,
+				  bool transactional, const char *prefix,
+				  Size sz, const char *message)
+{
+	OutputPluginPrepareWrite(ctx, true);
+	appendStringInfo(ctx->out, "message: transactional: %d prefix: %s, sz: %zu \
content:", +					 transactional, prefix, sz);
+	appendBinaryStringInfo(ctx->out, message, sz);
+	OutputPluginWrite(ctx, true);
+}
diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index c0b94bc..1201f9f 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -17769,6 +17769,51 @@ postgres=# SELECT * FROM \
pg_xlogfile_name_offset(pg_stop_backup());  </entry>
       </row>
 
+      <row>
+       <entry id="pg-logical-emit-message-text">
+        <indexterm>
+         <primary>pg_logical_emit_message</primary>
+        </indexterm>
+        <literal><function>pg_logical_emit_message(<parameter>transactional</parameter> \
<type>bool</type>, <parameter>prefix</parameter> <type>text</type>, \
<parameter>content</parameter> <type>text</type>)</function></literal> +       \
</entry> +       <entry>
+        void
+       </entry>
+       <entry>
+        Write text logical decoding message. This can be used to pass generic
+        messages to logical decoding plugins through WAL. The parameter
+        <parameter>transactional</parameter> specifies if the message should
+        be part of current transaction or if it should be written immediately
+        and decoded as soon as the logical decoding reads the record. The
+        <parameter>prefix</parameter> is textual prefix used by the logical
+        decoding plugins to easily recognize interesting messages for them.
+        The <parameter>content</parameter> is the text of the message.
+       </entry>
+      </row>
+
+      <row>
+       <entry id="pg-logical-emit-message-bytea">
+        <indexterm>
+         <primary>>pg_logical_emit_message</primary>
+        </indexterm>
+        <literal><function>>pg_logical_emit_message(<parameter>transactional</parameter> \
<type>bool</type>, <parameter>prefix</parameter> <type>text</type>, \
<parameter>content</parameter> <type>bytea</type>)</function></literal> +       \
</entry> +       <entry>
+        void
+       </entry>
+       <entry>
+        Write binary logical decoding message. This can be used to pass generic
+        messages to logical decoding plugins through WAL. The parameter
+        <parameter>transactional</parameter> specifies if the message should
+        be part of current transaction or if it should be written immediately
+        and decoded as soon as the logical decoding reads the record. The
+        <parameter>prefix</parameter> is textual prefix used by the logical
+        decoding plugins to easily recognize interesting messages for them.
+        The <parameter>content</parameter> is the binary content of the
+        message.
+       </entry>
+      </row>
+
      </tbody>
     </tgroup>
    </table>
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index e841348..93ffcfb 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -363,6 +363,7 @@ typedef struct OutputPluginCallbacks
     LogicalDecodeBeginCB begin_cb;
     LogicalDecodeChangeCB change_cb;
     LogicalDecodeCommitCB commit_cb;
+    LogicalDecodeMessageCB message_cb;
     LogicalDecodeFilterByOriginCB filter_by_origin_cb;
     LogicalDecodeShutdownCB shutdown_cb;
 } OutputPluginCallbacks;
@@ -602,6 +603,42 @@ typedef bool (*LogicalDecodeFilterByOriginCB) (
        more efficient.
      </para>
      </sect3>
+
+    <sect3 id="logicaldecoding-output-plugin-message">
+     <title>Generic Message Callback</title>
+
+     <para>
+      The optional <function>message_cb</function> callback is called whenever
+      a logical decoding message has been decoded.
+<programlisting>
+typedef void (*LogicalDecodeMessageCB) (
+    struct LogicalDecodingContext *,
+    ReorderBufferTXN *txn,
+    XLogRecPtr message_lsn,
+    bool transactional,
+    const char *prefix,
+    Size message_size,
+    const char *message
+);
+</programlisting>
+      The <parameter>ctx</parameter> and <parameter>txn</parameter> parameters
+      have the same contents as for the <function>begin_cb</function> with the
+      difference that <parameter>txn</parameter> is null for non-transactional
+      messages. The <parameter>lsn</parameter> has WAL position of the message.
+      The <parameter>transactional</parameter> says if he message was sent as
+      transactional or not. The <parameter>prefix</parameter> is arbitrary
+      textual prefix which can be used for identifying interesting messages
+      for the current plugin. And finally the <parameter>message</parameter>
+      parameter holds the actual message of <parameter>message_size</parameter>
+      size.
+     </para>
+     <para>
+      Extra care should be taken to ensure that the prefix the output plugin
+      considers interesting is unique. Using name of the extension or the
+      output plugin itself is often a good choice.
+     </para>
+    </sect3>
+
    </sect2>
 
    <sect2 id="logicaldecoding-output-plugin-output">
diff --git a/src/backend/access/rmgrdesc/Makefile \
b/src/backend/access/rmgrdesc/Makefile index c72a1f2..723b4d8 100644
--- a/src/backend/access/rmgrdesc/Makefile
+++ b/src/backend/access/rmgrdesc/Makefile
@@ -9,8 +9,8 @@ top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global
 
 OBJS = brindesc.o clogdesc.o committsdesc.o dbasedesc.o gindesc.o gistdesc.o \
-	   hashdesc.o heapdesc.o mxactdesc.o nbtdesc.o relmapdesc.o \
-	   replorigindesc.o seqdesc.o smgrdesc.o spgdesc.o \
+	   hashdesc.o heapdesc.o logicalmsgdesc.o mxactdesc.o nbtdesc.o \
+	   relmapdesc.o replorigindesc.o seqdesc.o smgrdesc.o spgdesc.o \
 	   standbydesc.o tblspcdesc.o xactdesc.o xlogdesc.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/access/rmgrdesc/logicalmsgdesc.c \
b/src/backend/access/rmgrdesc/logicalmsgdesc.c new file mode 100644
index 0000000..e5e84dd
--- /dev/null
+++ b/src/backend/access/rmgrdesc/logicalmsgdesc.c
@@ -0,0 +1,41 @@
+/*-------------------------------------------------------------------------
+ *
+ * logicalmsgdesc.c
+ *	  rmgr descriptor routines for replication/logical/message.c
+ *
+ * Portions Copyright (c) 2015-2016, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ *	  src/backend/access/rmgrdesc/logicalmsgdesc.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "replication/message.h"
+
+void
+logicalmsg_desc(StringInfo buf, XLogReaderState *record)
+{
+	char	   *rec = XLogRecGetData(record);
+	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+	if (info == XLOG_LOGICAL_MESSAGE)
+	{
+		xl_logical_message *xlrec = (xl_logical_message *) rec;
+
+		appendStringInfo(buf, "%s message size %zu bytes",
+				   xlrec->transactional ? "transactional" : "nontransactional",
+						 xlrec->message_size);
+	}
+}
+
+const char *
+logicalmsg_identify(uint8 info)
+{
+	if (info & ~XLR_INFO_MASK == XLOG_LOGICAL_MESSAGE)
+		return "MESSAGE";
+
+	return NULL;
+}
diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c
index 7c4d773..1a42121 100644
--- a/src/backend/access/transam/rmgr.c
+++ b/src/backend/access/transam/rmgr.c
@@ -23,6 +23,7 @@
 #include "commands/dbcommands_xlog.h"
 #include "commands/sequence.h"
 #include "commands/tablespace.h"
+#include "replication/message.h"
 #include "replication/origin.h"
 #include "storage/standby.h"
 #include "utils/relmapper.h"
diff --git a/src/backend/replication/logical/Makefile \
b/src/backend/replication/logical/Makefile index 8adea13..1d7ca06 100644
--- a/src/backend/replication/logical/Makefile
+++ b/src/backend/replication/logical/Makefile
@@ -14,7 +14,7 @@ include $(top_builddir)/src/Makefile.global
 
 override CPPFLAGS := -I$(srcdir) $(CPPFLAGS)
 
-OBJS = decode.o logical.o logicalfuncs.o reorderbuffer.o origin.o \
+OBJS = decode.o logical.o logicalfuncs.o message.o origin.o reorderbuffer.o \
 	snapbuild.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/replication/logical/decode.c \
b/src/backend/replication/logical/decode.c index 88c3a49..368a2f6 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -39,6 +39,7 @@
 
 #include "replication/decode.h"
 #include "replication/logical.h"
+#include "replication/message.h"
 #include "replication/reorderbuffer.h"
 #include "replication/origin.h"
 #include "replication/snapbuild.h"
@@ -58,6 +59,7 @@ static void DecodeHeapOp(LogicalDecodingContext *ctx, \
XLogRecordBuffer *buf);  static void DecodeHeap2Op(LogicalDecodingContext *ctx, \
XLogRecordBuffer *buf);  static void DecodeXactOp(LogicalDecodingContext *ctx, \
XLogRecordBuffer *buf);  static void DecodeStandbyOp(LogicalDecodingContext *ctx, \
XLogRecordBuffer *buf); +static void DecodeLogicalMsgOp(LogicalDecodingContext *ctx, \
XLogRecordBuffer *buf);  
 /* individual record(group)'s handlers */
 static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
@@ -115,6 +117,10 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, \
XLogReaderState *recor  DecodeHeapOp(ctx, &buf);
 			break;
 
+		case RM_LOGICALMSG_ID:
+			DecodeLogicalMsgOp(ctx, &buf);
+			break;
+
 			/*
 			 * Rmgrs irrelevant for logical decoding; they describe stuff not
 			 * represented in logical decoding. Add new rmgrs in rmgrlist.h's
@@ -431,6 +437,49 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	}
 }
 
+/*
+ * Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer().
+ */
+static void
+DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	SnapBuild  *builder = ctx->snapshot_builder;
+	XLogReaderState *r = buf->record;
+	uint8		info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
+	xl_logical_message *message;
+
+	if (info != XLOG_LOGICAL_MESSAGE)
+		elog(ERROR, "unexpected RM_LOGICALMSG_ID record type: %u", info);
+
+	message = (xl_logical_message *) XLogRecGetData(r);
+
+	if (message->transactional)
+	{
+		if (!SnapBuildProcessChange(builder, XLogRecGetXid(r), buf->origptr))
+			return;
+
+		ReorderBufferQueueMessage(ctx->reorder, XLogRecGetXid(r),
+								  buf->endptr,
+								  message->message, /* first part of message is prefix */
+								  message->message_size,
+								  message->message + message->prefix_size);
+	}
+	else if (SnapBuildCurrentState(builder) == SNAPBUILD_CONSISTENT &&
+			 !SnapBuildXactNeedsSkip(builder, buf->origptr))
+	{
+		volatile Snapshot	snapshot_now;
+		ReorderBuffer	   *rb = ctx->reorder;
+
+		/* setup snapshot to allow catalog access */
+		snapshot_now = SnapBuildGetOrBuildSnapshot(builder, XLogRecGetXid(r));
+		SetupHistoricSnapshot(snapshot_now, NULL);
+		rb->message(rb, NULL, buf->origptr, false, message->message,
+					message->message_size,
+					message->message + message->prefix_size);
+		TeardownHistoricSnapshot(false);
+	}
+}
+
 static inline bool
 FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
 {
diff --git a/src/backend/replication/logical/logical.c \
b/src/backend/replication/logical/logical.c index 2e6d3f9..3974cd4 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -62,6 +62,9 @@ static void commit_cb_wrapper(ReorderBuffer *cache, \
ReorderBufferTXN *txn,  XLogRecPtr commit_lsn);
 static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 				  Relation relation, ReorderBufferChange *change);
+static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+				  XLogRecPtr message_lsn, bool transactional,
+				  const char *prefix, Size sz, const char *message);
 
 static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, char *plugin);
 
@@ -178,6 +181,7 @@ StartupDecodingContext(List *output_plugin_options,
 	ctx->reorder->begin = begin_cb_wrapper;
 	ctx->reorder->apply_change = change_cb_wrapper;
 	ctx->reorder->commit = commit_cb_wrapper;
+	ctx->reorder->message = message_cb_wrapper;
 
 	ctx->out = makeStringInfo();
 	ctx->prepare_write = prepare_write;
@@ -702,6 +706,40 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, \
RepOriginId origin_id)  return ret;
 }
 
+static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+							   XLogRecPtr message_lsn,
+							   bool transactional, const char *prefix,
+							   Size sz, const char *message)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+
+	if (ctx->callbacks.message_cb == NULL)
+		return;
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "message";
+	state.report_location = message_lsn;
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = true;
+	ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
+	ctx->write_location = message_lsn;
+
+	/* do the actual work: call callback */
+	ctx->callbacks.message_cb(ctx, txn, message_lsn, transactional,
+							  prefix, sz, message);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+}
+
 /*
  * Set the required catalog xmin horizon for historic snapshots in the current
  * replication slot.
diff --git a/src/backend/replication/logical/logicalfuncs.c \
b/src/backend/replication/logical/logicalfuncs.c index f789fc1..fe22daa 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -24,6 +24,8 @@
 #include "access/xlog_internal.h"
 #include "access/xlogutils.h"
 
+#include "access/xact.h"
+
 #include "catalog/pg_type.h"
 
 #include "nodes/makefuncs.h"
@@ -41,6 +43,7 @@
 #include "replication/decode.h"
 #include "replication/logical.h"
 #include "replication/logicalfuncs.h"
+#include "replication/message.h"
 
 #include "storage/fd.h"
 
@@ -363,3 +366,27 @@ pg_logical_slot_peek_binary_changes(PG_FUNCTION_ARGS)
 {
 	return pg_logical_slot_get_changes_guts(fcinfo, false, true);
 }
+
+
+/*
+ * SQL function returning the changestream in binary, only peeking ahead.
+ */
+Datum
+pg_logical_emit_message_bytea(PG_FUNCTION_ARGS)
+{
+	bool		transactional = PG_GETARG_BOOL(0);
+	char	   *prefix = text_to_cstring(PG_GETARG_TEXT_PP(1));
+	bytea	   *data = PG_GETARG_BYTEA_PP(2);
+	XLogRecPtr	lsn;
+
+	lsn = LogLogicalMessage(prefix, VARDATA_ANY(data), VARSIZE_ANY_EXHDR(data),
+							transactional);
+	PG_RETURN_LSN(lsn);
+}
+
+Datum
+pg_logical_emit_message_text(PG_FUNCTION_ARGS)
+{
+	/* bytea and text are compatible */
+	return pg_logical_emit_message_bytea(fcinfo);
+}
diff --git a/src/backend/replication/logical/message.c \
b/src/backend/replication/logical/message.c new file mode 100644
index 0000000..214be45
--- /dev/null
+++ b/src/backend/replication/logical/message.c
@@ -0,0 +1,87 @@
+/*-------------------------------------------------------------------------
+ *
+ * message.c
+ *	  Generic logical messages.
+ *
+ * Copyright (c) 2013-2016, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *	  src/backend/replication/logical/message.c
+ *
+ * NOTES
+ *
+ * Generic logical messages allow XLOG logging of arbitrary binary blobs that
+ * get passed to the logical decoding plugin. In normal XLOG processing they
+ * are same as NOOP.
+ *
+ * These messages can be either transactional or non-transactional.
+ * Transactional messages are part of current transaction and will be sent to
+ * decoding plugin using in a same way as DML operations.
+ * Non-transactional messages are sent to the plugin at the time when the
+ * logical decoding reads them from XLOG.
+ *
+ * Every message carries prefix to avoid conflicts between different decoding
+ * plugins. The prefix has to be registered before the message using that
+ * prefix can be written to XLOG. The prefix can be registered exactly once to
+ * avoid situation where multiple third party extensions try to use same
+ * prefix.
+ *
+ * ---------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/xact.h"
+
+#include "catalog/indexing.h"
+
+#include "nodes/execnodes.h"
+
+#include "replication/message.h"
+#include "replication/logical.h"
+
+#include "utils/memutils.h"
+
+/*
+ * Write logical decoding message into XLog.
+ */
+XLogRecPtr
+LogLogicalMessage(const char *prefix, const char *message, size_t size,
+				  bool transactional)
+{
+	xl_logical_message	xlrec;
+
+	/*
+	 * Force xid to be allocated if we're emitting a transactional message.
+	 */
+	if (transactional)
+	{
+		Assert(IsTransactionState());
+		GetCurrentTransactionId();
+	}
+
+	xlrec.transactional = transactional;
+	xlrec.prefix_size = strlen(prefix) + 1;
+	xlrec.message_size = size;
+
+	XLogBeginInsert();
+	XLogRegisterData((char *) &xlrec, SizeOfLogicalMessage);
+	XLogRegisterData((char *) prefix, xlrec.prefix_size);
+	XLogRegisterData((char *) message, size);
+
+	return XLogInsert(RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE);
+}
+
+/*
+ * Redo is basically just noop for logical decoding messages.
+ */
+void
+logicalmsg_redo(XLogReaderState *record)
+{
+	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+	if (info != XLOG_LOGICAL_MESSAGE)
+			elog(PANIC, "logicalmsg_redo: unknown op code %u", info);
+
+	/* This is only interesting for logical decoding, see decode.c. */
+}
diff --git a/src/backend/replication/logical/reorderbuffer.c \
b/src/backend/replication/logical/reorderbuffer.c index 78acced..0312346 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -414,6 +414,14 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange \
*change)  change->data.tp.oldtuple = NULL;
 			}
 			break;
+		case REORDER_BUFFER_CHANGE_MESSAGE:
+			if (change->data.msg.prefix != NULL)
+				pfree(change->data.msg.prefix);
+			change->data.msg.prefix = NULL;
+			if (change->data.msg.message != NULL)
+				pfree(change->data.msg.message);
+			change->data.msg.message = NULL;
+			break;
 		case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
 			if (change->data.snapshot)
 			{
@@ -599,6 +607,26 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, \
XLogRecPtr lsn,  ReorderBufferCheckSerializeTXN(rb, txn);
 }
 
+void
+ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
+						  const char *prefix, Size msg_sz, const char *msg)
+{
+	ReorderBufferChange *change;
+
+	Assert(xid != InvalidTransactionId);
+
+	change = ReorderBufferGetChange(rb);
+	change->action = REORDER_BUFFER_CHANGE_MESSAGE;
+	change->data.msg.transactional = true;
+	change->data.msg.prefix = pstrdup(prefix);
+	change->data.msg.message_size = msg_sz;
+	change->data.msg.message = palloc(msg_sz);
+	memcpy(change->data.msg.message, msg, msg_sz);
+
+	ReorderBufferQueueChange(rb, xid, lsn, change);
+}
+
+
 static void
 AssertTXNLsnOrder(ReorderBuffer *rb)
 {
@@ -1465,6 +1493,14 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 					specinsert = change;
 					break;
 
+				case REORDER_BUFFER_CHANGE_MESSAGE:
+					rb->message(rb, txn, change->lsn,
+								change->data.msg.transactional,
+								change->data.msg.prefix,
+								change->data.msg.message_size,
+								change->data.msg.message);
+					break;
+
 				case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
 					/* get rid of the old */
 					TeardownHistoricSnapshot(false);
@@ -2117,6 +2153,21 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, \
ReorderBufferTXN *txn,  }
 				break;
 			}
+		case REORDER_BUFFER_CHANGE_MESSAGE:
+			{
+				char	   *data;
+				size_t		prefix_size = strlen(change->data.msg.prefix) + 1;
+
+				sz += prefix_size + change->data.msg.message_size;
+				ReorderBufferSerializeReserve(rb, sz);
+
+				data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
+				memcpy(data, change->data.msg.prefix,
+					   prefix_size);
+				memcpy(data + prefix_size, change->data.msg.message,
+					   change->data.msg.message_size);
+				break;
+			}
 		case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
 			{
 				Snapshot	snap;
@@ -2354,6 +2405,22 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN \
*txn,  data += len;
 			}
 			break;
+		case REORDER_BUFFER_CHANGE_MESSAGE:
+			{
+				Size		message_size = change->data.msg.message_size;
+				/* prefix includes trailing zero */
+				Size		prefix_size = strlen(data) + 1;
+
+				change->data.msg.prefix = MemoryContextAlloc(rb->context,
+															 prefix_size);
+				memcpy(change->data.msg.prefix, data, prefix_size);
+				data += prefix_size;
+				change->data.msg.message = MemoryContextAlloc(rb->context,
+															  message_size);
+				memcpy(change->data.msg.message, data, message_size);
+				data += message_size;
+				break;
+			}
 		case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
 			{
 				Snapshot	oldsnap;
diff --git a/src/backend/replication/logical/snapbuild.c \
b/src/backend/replication/logical/snapbuild.c index ed823ec..d7829c3 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -605,6 +605,25 @@ SnapBuildExportSnapshot(SnapBuild *builder)
 }
 
 /*
+ * Ensure there is a snapshot and if not build one for current transaction.
+ */
+Snapshot
+SnapBuildGetOrBuildSnapshot(SnapBuild *builder, TransactionId xid)
+{
+	Assert(builder->state == SNAPBUILD_CONSISTENT);
+
+	/* only build a new snapshot if we don't have a prebuilt one */
+	if (builder->snapshot == NULL)
+	{
+		builder->snapshot = SnapBuildBuildSnapshot(builder, xid);
+		/* inrease refcount for the snapshot builder */
+		SnapBuildSnapIncRefcount(builder->snapshot);
+	}
+
+	return builder->snapshot;
+}
+
+/*
  * Reset a previously SnapBuildExportSnapshot()'ed snapshot if there is
  * any. Aborts the previously started transaction and resets the resource
  * owner back to its original value.
diff --git a/src/bin/pg_xlogdump/rmgrdesc.c b/src/bin/pg_xlogdump/rmgrdesc.c
index f9cd395..6ba7f22 100644
--- a/src/bin/pg_xlogdump/rmgrdesc.c
+++ b/src/bin/pg_xlogdump/rmgrdesc.c
@@ -25,6 +25,7 @@
 #include "commands/dbcommands_xlog.h"
 #include "commands/sequence.h"
 #include "commands/tablespace.h"
+#include "replication/message.h"
 #include "replication/origin.h"
 #include "rmgrdesc.h"
 #include "storage/standbydefs.h"
diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h
index fab912d..35c242d 100644
--- a/src/include/access/rmgrlist.h
+++ b/src/include/access/rmgrlist.h
@@ -45,3 +45,4 @@ PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, \
spg_xlog_start  PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, \
NULL, NULL)  PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, \
commit_ts_identify, NULL, NULL)  PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", \
replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL) \
+PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, \
                logicalmsg_identify, NULL, NULL)
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index 62b9125..ff722fe 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -5084,6 +5084,10 @@ DATA(insert OID = 3784 (  pg_logical_slot_peek_changes PGNSP \
PGUID 12 1000 1000  DESCR("peek at changes from replication slot");
 DATA(insert OID = 3785 (  pg_logical_slot_peek_binary_changes PGNSP PGUID 12 1000 \
1000 25 0 f f f f f t v u 4 0 2249 "19 3220 23 1009" "{19,3220,23,1009,3220,28,17}" \
"{i,i,i,v,o,o,o}" "{slot_name,upto_lsn,upto_nchanges,options,location,xid,data}" \
_null_ _null_ pg_logical_slot_peek_binary_changes _null_ _null_ _null_ ));  \
DESCR("peek at binary changes from replication slot"); +DATA(insert OID = 3577 (  \
pg_logical_emit_message PGNSP PGUID 12 1 0 0 0 f f f f f f v u 3 0 3220 "16 25 25" \
_null_ _null_ _null_ _null_ _null_ pg_logical_emit_message_text _null_ _null_ _null_ \
)); +DESCR("emit a textual logical decoding message");
+DATA(insert OID = 3578 (  pg_logical_emit_message PGNSP PGUID 12 1 0 0 0 f f f f f f \
v u 3 0 3220 "16 25 17" _null_ _null_ _null_ _null_ _null_ \
pg_logical_emit_message_bytea _null_ _null_ _null_ )); +DESCR("emit a binary logical \
decoding message");  
 /* event triggers */
 DATA(insert OID = 3566 (  pg_event_trigger_dropped_objects		PGNSP PGUID 12 10 100 0 \
0 f f f f t t s s 0 0 2249 "" "{26,26,23,16,16,16,25,25,25,25,1009,1009}" \
"{o,o,o,o,o,o,o,o,o,o,o,o}" "{classid, objid, objsubid, original, normal, \
is_temporary, object_type, schema_name, object_name, object_identity, address_names, \
address_args}" _null_ _null_ pg_event_trigger_dropped_objects _null_ _null_ _null_ \
                ));
diff --git a/src/include/replication/logicalfuncs.h \
b/src/include/replication/logicalfuncs.h index c87a1df..5540414 100644
--- a/src/include/replication/logicalfuncs.h
+++ b/src/include/replication/logicalfuncs.h
@@ -21,4 +21,6 @@ extern Datum pg_logical_slot_get_binary_changes(PG_FUNCTION_ARGS);
 extern Datum pg_logical_slot_peek_changes(PG_FUNCTION_ARGS);
 extern Datum pg_logical_slot_peek_binary_changes(PG_FUNCTION_ARGS);
 
+extern Datum pg_logical_emit_message_bytea(PG_FUNCTION_ARGS);
+extern Datum pg_logical_emit_message_text(PG_FUNCTION_ARGS);
 #endif
diff --git a/src/include/replication/message.h b/src/include/replication/message.h
new file mode 100644
index 0000000..b1730c9
--- /dev/null
+++ b/src/include/replication/message.h
@@ -0,0 +1,41 @@
+/*-------------------------------------------------------------------------
+ * message.h
+ *	   Exports from replication/logical/message.c
+ *
+ * Copyright (c) 2013-2016, PostgreSQL Global Development Group
+ *
+ * src/include/replication/message.h
+ *-------------------------------------------------------------------------
+ */
+#ifndef PG_LOGICAL_MESSAGE_H
+#define PG_LOGICAL_MESSAGE_H
+
+#include "access/xlog.h"
+#include "access/xlogdefs.h"
+#include "access/xlogreader.h"
+
+/*
+ * Generic logical decoding message wal record.
+ */
+typedef struct xl_logical_message
+{
+	bool		transactional;					/* is message transactional? */
+	size_t		prefix_size;					/* length of prefix */
+	size_t		message_size;					/* size of the message */
+	char		message[FLEXIBLE_ARRAY_MEMBER];	/* message including the null
+												 * terminated prefx of length
+												 * prefix_size */
+} xl_logical_message;
+
+#define SizeOfLogicalMessage	(offsetof(xl_logical_message, message))
+
+extern XLogRecPtr LogLogicalMessage(const char *prefix, const char *message,
+									size_t size, bool transactional);
+
+/* RMGR API*/
+#define XLOG_LOGICAL_MESSAGE	0x00
+void		logicalmsg_redo(XLogReaderState *record);
+void		logicalmsg_desc(StringInfo buf, XLogReaderState *record);
+const char *logicalmsg_identify(uint8 info);
+
+#endif   /* PG_LOGICAL_MESSAGE_H */
diff --git a/src/include/replication/output_plugin.h \
b/src/include/replication/output_plugin.h index 577b12e..3a2ca98 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -74,6 +74,18 @@ typedef void (*LogicalDecodeCommitCB) (
 												   XLogRecPtr commit_lsn);
 
 /*
+ * Called for the generic logical decoding messages.
+ */
+typedef void (*LogicalDecodeMessageCB) (
+											 struct LogicalDecodingContext *,
+											 ReorderBufferTXN *txn,
+											 XLogRecPtr message_lsn,
+											 bool transactional,
+											 const char *prefix,
+											 Size message_size,
+											 const char *message);
+
+/*
  * Filter changes by origin.
  */
 typedef bool (*LogicalDecodeFilterByOriginCB) (
@@ -96,6 +108,7 @@ typedef struct OutputPluginCallbacks
 	LogicalDecodeBeginCB begin_cb;
 	LogicalDecodeChangeCB change_cb;
 	LogicalDecodeCommitCB commit_cb;
+	LogicalDecodeMessageCB message_cb;
 	LogicalDecodeFilterByOriginCB filter_by_origin_cb;
 	LogicalDecodeShutdownCB shutdown_cb;
 } OutputPluginCallbacks;
diff --git a/src/include/replication/reorderbuffer.h \
b/src/include/replication/reorderbuffer.h index d33ea27..a3af5a6 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -51,6 +51,7 @@ enum ReorderBufferChangeType
 	REORDER_BUFFER_CHANGE_INSERT,
 	REORDER_BUFFER_CHANGE_UPDATE,
 	REORDER_BUFFER_CHANGE_DELETE,
+	REORDER_BUFFER_CHANGE_MESSAGE,
 	REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT,
 	REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID,
 	REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID,
@@ -95,6 +96,14 @@ typedef struct ReorderBufferChange
 			ReorderBufferTupleBuf *newtuple;
 		}			tp;
 
+		struct
+		{
+			bool transactional;
+			char *prefix;
+			size_t message_size;
+			char *message;
+		} msg;
+
 		/* New snapshot, set when action == *_INTERNAL_SNAPSHOT */
 		Snapshot	snapshot;
 
@@ -271,6 +280,15 @@ typedef void (*ReorderBufferCommitCB) (
 												   ReorderBufferTXN *txn,
 												   XLogRecPtr commit_lsn);
 
+/* message callback signature */
+typedef void (*ReorderBufferMessageCB) (
+												   ReorderBuffer *rb,
+												   ReorderBufferTXN *txn,
+												   XLogRecPtr message_lsn,
+												   bool transactional,
+												   const char *prefix, Size sz,
+												   const char *message);
+
 struct ReorderBuffer
 {
 	/*
@@ -297,6 +315,7 @@ struct ReorderBuffer
 	ReorderBufferBeginCB begin;
 	ReorderBufferApplyChangeCB apply_change;
 	ReorderBufferCommitCB commit;
+	ReorderBufferMessageCB message;
 
 	/*
 	 * Pointer that will be passed untouched to the callbacks.
@@ -347,6 +366,8 @@ ReorderBufferChange *ReorderBufferGetChange(ReorderBuffer *);
 void		ReorderBufferReturnChange(ReorderBuffer *, ReorderBufferChange *);
 
 void		ReorderBufferQueueChange(ReorderBuffer *, TransactionId, XLogRecPtr lsn, \
ReorderBufferChange *); +void		ReorderBufferQueueMessage(ReorderBuffer *, \
TransactionId, XLogRecPtr lsn, +									  const char *prefix, Size msg_sz, const \
char *msg);  void ReorderBufferCommit(ReorderBuffer *, TransactionId,
 					XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
 	  TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn);
diff --git a/src/include/replication/snapbuild.h \
b/src/include/replication/snapbuild.h index 75955af..c4127a1 100644
--- a/src/include/replication/snapbuild.h
+++ b/src/include/replication/snapbuild.h
@@ -63,6 +63,8 @@ extern const char *SnapBuildExportSnapshot(SnapBuild *snapstate);
 extern void SnapBuildClearExportedSnapshot(void);
 
 extern SnapBuildState SnapBuildCurrentState(SnapBuild *snapstate);
+extern Snapshot SnapBuildGetOrBuildSnapshot(SnapBuild *builder,
+											TransactionId xid);
 
 extern bool SnapBuildXactNeedsSkip(SnapBuild *snapstate, XLogRecPtr ptr);
 
-- 
1.9.1



-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic