[prev in list] [next in list] [prev in thread] [next in thread] 

List:       postgresql-general
Subject:    [HACKERS] logical decoding of two-phase transactions
From:       Stas Kelvich <s.kelvich () postgrespro ! ru>
Date:       2016-12-31 8:36:10
Message-ID: 02DA5F5E-CECE-4D9C-8B4B-418077E2C010 () postgrespro ! ru
[Download RAW message or body]

Here is a resubmission of the patch implementing logical decoding of two-phase
transactions (instead of treating them as ordinary transactions at commit time) [1].
I've slightly polished things and used the test_decoding output plugin as the client.

The general idea is quite simple:

* Write the GID along with COMMIT PREPARED / ROLLBACK PREPARED records in the 2PC case.
* Add several routines to decode prepare records in the same way that commit records
are already decoded by logical decoding.

I've also added an explicit LOCK statement to the test_decoding regression suite to
check that it doesn't break things. If somebody can create a scenario in which decoding
blocks because of the lock still held by the prepared transaction's dummy backend, that
would be a great help. Right now all my tests pass (including the TAP tests from the
adjacent mail thread that check recovery of two-phase transactions after failures).

If we agree on the current approach, then I'm ready to add this to the proposed
in-core logical replication.

[1] https://www.postgresql.org/message-id/EE7452CA-3C39-4A0E-97EC-17A414972884%40postgrespro.ru



["logical_twophase.diff" (logical_twophase.diff)]

diff --git a/contrib/test_decoding/expected/prepared.out \
b/contrib/test_decoding/expected/prepared.out index 46e915d..af81c47 100644
--- a/contrib/test_decoding/expected/prepared.out
+++ b/contrib/test_decoding/expected/prepared.out
@@ -25,6 +25,7 @@ BEGIN;
 INSERT INTO test_prepared1 VALUES (5);
 ALTER TABLE test_prepared1 ADD COLUMN data text;
 INSERT INTO test_prepared1 VALUES (6, 'frakbar');
+LOCK test_prepared1;
 PREPARE TRANSACTION 'test_prepared#3';
 -- test that we decode correctly while an uncommitted prepared xact
 -- with ddl exists.
@@ -44,27 +45,33 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', \
                NULL, NULL, 'inc
 -------------------------------------------------------------------------
  BEGIN
  table public.test_prepared1: INSERT: id[integer]:1
- COMMIT
+ PREPARE
+ COMMIT PREPARED
  BEGIN
  table public.test_prepared1: INSERT: id[integer]:2
  COMMIT
  BEGIN
- table public.test_prepared1: INSERT: id[integer]:4
- COMMIT
+ table public.test_prepared1: INSERT: id[integer]:3
+ PREPARE
+ ABORT PREPARED
  BEGIN
- table public.test_prepared2: INSERT: id[integer]:7
+ table public.test_prepared1: INSERT: id[integer]:4
  COMMIT
  BEGIN
  table public.test_prepared1: INSERT: id[integer]:5
  table public.test_prepared1: INSERT: id[integer]:6 data[text]:'frakbar'
+ PREPARE
+ BEGIN
+ table public.test_prepared2: INSERT: id[integer]:7
  COMMIT
+ COMMIT PREPARED
  BEGIN
  table public.test_prepared1: INSERT: id[integer]:8 data[text]:null
  COMMIT
  BEGIN
  table public.test_prepared2: INSERT: id[integer]:9
  COMMIT
-(22 rows)
+(28 rows)
 
 SELECT pg_drop_replication_slot('regression_slot');
  pg_drop_replication_slot 
diff --git a/contrib/test_decoding/sql/prepared.sql \
b/contrib/test_decoding/sql/prepared.sql index e726397..ac76b8c 100644
--- a/contrib/test_decoding/sql/prepared.sql
+++ b/contrib/test_decoding/sql/prepared.sql
@@ -25,6 +25,7 @@ BEGIN;
 INSERT INTO test_prepared1 VALUES (5);
 ALTER TABLE test_prepared1 ADD COLUMN data text;
 INSERT INTO test_prepared1 VALUES (6, 'frakbar');
+LOCK test_prepared1;
 PREPARE TRANSACTION 'test_prepared#3';
 
 -- test that we decode correctly while an uncommitted prepared xact
diff --git a/contrib/test_decoding/test_decoding.c \
b/contrib/test_decoding/test_decoding.c index 949e9a7..53ced57 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -232,10 +232,25 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, \
ReorderBufferTXN *txn,  return;
 
 	OutputPluginPrepareWrite(ctx, true);
+
+	switch(txn->xact_action)
+	{
+		case XLOG_XACT_COMMIT:
+			appendStringInfoString(ctx->out, "COMMIT");
+			break;
+		case XLOG_XACT_PREPARE:
+			appendStringInfoString(ctx->out, "PREPARE");
+			break;
+		case XLOG_XACT_COMMIT_PREPARED:
+			appendStringInfoString(ctx->out, "COMMIT PREPARED");
+			break;
+		case XLOG_XACT_ABORT_PREPARED:
+			appendStringInfoString(ctx->out, "ABORT PREPARED");
+			break;
+	}
+
 	if (data->include_xids)
-		appendStringInfo(ctx->out, "COMMIT %u", txn->xid);
-	else
-		appendStringInfoString(ctx->out, "COMMIT");
+		appendStringInfo(ctx->out, " %u", txn->xid);
 
 	if (data->include_timestamp)
 		appendStringInfo(ctx->out, " (at %s)",
diff --git a/src/backend/access/rmgrdesc/xactdesc.c \
b/src/backend/access/rmgrdesc/xactdesc.c index 91d27d0..679f457 100644
--- a/src/backend/access/rmgrdesc/xactdesc.c
+++ b/src/backend/access/rmgrdesc/xactdesc.c
@@ -98,10 +98,13 @@ ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, \
xl_xact_parsed_commit *pars  if (parsed->xinfo & XACT_XINFO_HAS_TWOPHASE)
 	{
 		xl_xact_twophase *xl_twophase = (xl_xact_twophase *) data;
+		uint8 gidlen = xl_twophase->gidlen;
 
 		parsed->twophase_xid = xl_twophase->xid;
+		data += MinSizeOfXactTwophase;
 
-		data += sizeof(xl_xact_twophase);
+		strlcpy(parsed->twophase_gid, data, sizeof(parsed->twophase_gid));
+		data += gidlen;
 	}
 
 	if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
@@ -139,6 +142,16 @@ ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, \
xl_xact_parsed_abort *parsed)  data += sizeof(xl_xact_xinfo);
 	}
 
+	if (parsed->xinfo & XACT_XINFO_HAS_DBINFO)
+	{
+		xl_xact_dbinfo *xl_dbinfo = (xl_xact_dbinfo *) data;
+
+		parsed->dbId = xl_dbinfo->dbId;
+		parsed->tsId = xl_dbinfo->tsId;
+
+		data += sizeof(xl_xact_dbinfo);
+	}
+
 	if (parsed->xinfo & XACT_XINFO_HAS_SUBXACTS)
 	{
 		xl_xact_subxacts *xl_subxacts = (xl_xact_subxacts *) data;
@@ -164,10 +177,13 @@ ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, \
xl_xact_parsed_abort *parsed)  if (parsed->xinfo & XACT_XINFO_HAS_TWOPHASE)
 	{
 		xl_xact_twophase *xl_twophase = (xl_xact_twophase *) data;
+		uint8 gidlen = xl_twophase->gidlen;
 
 		parsed->twophase_xid = xl_twophase->xid;
+		data += MinSizeOfXactTwophase;
 
-		data += sizeof(xl_xact_twophase);
+		strlcpy(parsed->twophase_gid, data, sizeof(parsed->twophase_gid));
+		data += gidlen;
 	}
 }
 
diff --git a/src/backend/access/transam/twophase.c \
b/src/backend/access/transam/twophase.c index 5415604..964bcaf 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -130,7 +130,6 @@ int			max_prepared_xacts = 0;
  * Note that the max value of GIDSIZE must fit in the uint16 gidlen,
  * specified in TwoPhaseFileHeader.
  */
-#define GIDSIZE 200
 
 typedef struct GlobalTransactionData
 {
@@ -188,12 +187,14 @@ static void RecordTransactionCommitPrepared(TransactionId xid,
 								RelFileNode *rels,
 								int ninvalmsgs,
 								SharedInvalidationMessage *invalmsgs,
-								bool initfileinval);
+								bool initfileinval,
+								const char *gid);
 static void RecordTransactionAbortPrepared(TransactionId xid,
 							   int nchildren,
 							   TransactionId *children,
 							   int nrels,
-							   RelFileNode *rels);
+							   RelFileNode *rels,
+							   const char *gid);
 static void ProcessRecords(char *bufptr, TransactionId xid,
 			   const TwoPhaseCallback callbacks[]);
 static void RemoveGXact(GlobalTransaction gxact);
@@ -1236,6 +1237,41 @@ ReadTwoPhaseFile(TransactionId xid, bool give_warnings)
 	return buf;
 }
 
+/*
+ * ParsePrepareRecord
+ */
+void
+ParsePrepareRecord(uint8 info, char *xlrec, xl_xact_parsed_prepare *parsed)
+{
+	TwoPhaseFileHeader *hdr;
+	char *bufptr;
+
+	hdr = (TwoPhaseFileHeader *) xlrec;
+	bufptr = xlrec + MAXALIGN(sizeof(TwoPhaseFileHeader));
+
+	parsed->twophase_xid = hdr->xid;
+	parsed->dbId = hdr->database;
+	parsed->nsubxacts = hdr->nsubxacts;
+	parsed->ncommitrels = hdr->ncommitrels;
+	parsed->nabortrels = hdr->nabortrels;
+	parsed->nmsgs = hdr->ninvalmsgs;
+
+	strncpy(parsed->twophase_gid, bufptr, hdr->gidlen);
+	bufptr += MAXALIGN(hdr->gidlen);
+
+	parsed->subxacts = (TransactionId *) bufptr;
+	bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
+
+	parsed->commitrels = (RelFileNode *) bufptr;
+	bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
+
+	parsed->abortrels = (RelFileNode *) bufptr;
+	bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
+
+	parsed->msgs = (SharedInvalidationMessage *) bufptr;
+	bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
+}
+
 
 /*
  * Reads 2PC data from xlog. During checkpoint this data will be moved to
@@ -1389,11 +1425,12 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
 										hdr->nsubxacts, children,
 										hdr->ncommitrels, commitrels,
 										hdr->ninvalmsgs, invalmsgs,
-										hdr->initfileinval);
+										hdr->initfileinval, gid);
 	else
 		RecordTransactionAbortPrepared(xid,
 									   hdr->nsubxacts, children,
-									   hdr->nabortrels, abortrels);
+									   hdr->nabortrels, abortrels,
+									   gid);
 
 	ProcArrayRemove(proc, latestXid);
 
@@ -2038,7 +2075,8 @@ RecordTransactionCommitPrepared(TransactionId xid,
 								RelFileNode *rels,
 								int ninvalmsgs,
 								SharedInvalidationMessage *invalmsgs,
-								bool initfileinval)
+								bool initfileinval,
+								const char *gid)
 {
 	XLogRecPtr	recptr;
 	TimestampTz committs = GetCurrentTimestamp();
@@ -2061,7 +2099,7 @@ RecordTransactionCommitPrepared(TransactionId xid,
 								 nchildren, children, nrels, rels,
 								 ninvalmsgs, invalmsgs,
 								 initfileinval, false,
-								 xid);
+								 xid, gid);
 
 
 	if (replorigin)
@@ -2123,7 +2161,8 @@ RecordTransactionAbortPrepared(TransactionId xid,
 							   int nchildren,
 							   TransactionId *children,
 							   int nrels,
-							   RelFileNode *rels)
+							   RelFileNode *rels,
+							   const char *gid)
 {
 	XLogRecPtr	recptr;
 
@@ -2141,7 +2180,7 @@ RecordTransactionAbortPrepared(TransactionId xid,
 	recptr = XactLogAbortRecord(GetCurrentTimestamp(),
 								nchildren, children,
 								nrels, rels,
-								xid);
+								xid, gid);
 
 	/* Always flush, since we're about to remove the 2PC state file */
 	XLogFlush(recptr);
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index e47fd44..1081f8c 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -1230,7 +1230,7 @@ RecordTransactionCommit(void)
 							nchildren, children, nrels, rels,
 							nmsgs, invalMessages,
 							RelcacheInitFileInval, forceSyncCommit,
-							InvalidTransactionId /* plain commit */ );
+							InvalidTransactionId, NULL /* plain commit */ );
 
 		if (replorigin)
 			/* Move LSNs forward for this replication origin */
@@ -1582,7 +1582,7 @@ RecordTransactionAbort(bool isSubXact)
 	XactLogAbortRecord(xact_time,
 					   nchildren, children,
 					   nrels, rels,
-					   InvalidTransactionId);
+					   InvalidTransactionId, NULL);
 
 	/*
 	 * Report the latest async abort LSN, so that the WAL writer knows to
@@ -3467,7 +3467,7 @@ BeginTransactionBlock(void)
  * resource owner, etc while executing inside a Portal.
  */
 bool
-PrepareTransactionBlock(char *gid)
+PrepareTransactionBlock(const char *gid)
 {
 	TransactionState s;
 	bool		result;
@@ -5106,7 +5106,7 @@ XactLogCommitRecord(TimestampTz commit_time,
 					int nrels, RelFileNode *rels,
 					int nmsgs, SharedInvalidationMessage *msgs,
 					bool relcacheInval, bool forceSync,
-					TransactionId twophase_xid)
+					TransactionId twophase_xid, const char *twophase_gid)
 {
 	xl_xact_commit xlrec;
 	xl_xact_xinfo xl_xinfo;
@@ -5178,6 +5178,7 @@ XactLogCommitRecord(TimestampTz commit_time,
 	{
 		xl_xinfo.xinfo |= XACT_XINFO_HAS_TWOPHASE;
 		xl_twophase.xid = twophase_xid;
+		xl_twophase.gidlen = strlen(twophase_gid) + 1; /* Include '\0' */
 	}
 
 	/* dump transaction origin information */
@@ -5228,7 +5229,10 @@ XactLogCommitRecord(TimestampTz commit_time,
 	}
 
 	if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE)
-		XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase));
+	{
+		XLogRegisterData((char *) (&xl_twophase), MinSizeOfXactTwophase);
+		XLogRegisterData((char *) twophase_gid, xl_twophase.gidlen);
+	}
 
 	if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN)
 		XLogRegisterData((char *) (&xl_origin), sizeof(xl_xact_origin));
@@ -5249,13 +5253,14 @@ XLogRecPtr
 XactLogAbortRecord(TimestampTz abort_time,
 				   int nsubxacts, TransactionId *subxacts,
 				   int nrels, RelFileNode *rels,
-				   TransactionId twophase_xid)
+				   TransactionId twophase_xid, const char *twophase_gid)
 {
 	xl_xact_abort xlrec;
 	xl_xact_xinfo xl_xinfo;
 	xl_xact_subxacts xl_subxacts;
 	xl_xact_relfilenodes xl_relfilenodes;
 	xl_xact_twophase xl_twophase;
+	xl_xact_dbinfo xl_dbinfo;
 
 	uint8		info;
 
@@ -5290,6 +5295,14 @@ XactLogAbortRecord(TimestampTz abort_time,
 	{
 		xl_xinfo.xinfo |= XACT_XINFO_HAS_TWOPHASE;
 		xl_twophase.xid = twophase_xid;
+		xl_twophase.gidlen = strlen(twophase_gid) + 1; /* Include '\0' */
+	}
+
+	if (TransactionIdIsValid(twophase_xid) && XLogLogicalInfoActive())
+	{
+		xl_xinfo.xinfo |= XACT_XINFO_HAS_DBINFO;
+		xl_dbinfo.dbId = MyDatabaseId;
+		xl_dbinfo.tsId = MyDatabaseTableSpace;
 	}
 
 	if (xl_xinfo.xinfo != 0)
@@ -5304,6 +5317,9 @@ XactLogAbortRecord(TimestampTz abort_time,
 	if (xl_xinfo.xinfo != 0)
 		XLogRegisterData((char *) (&xl_xinfo), sizeof(xl_xinfo));
 
+	if (xl_xinfo.xinfo & XACT_XINFO_HAS_DBINFO)
+		XLogRegisterData((char *) (&xl_dbinfo), sizeof(xl_dbinfo));
+
 	if (xl_xinfo.xinfo & XACT_XINFO_HAS_SUBXACTS)
 	{
 		XLogRegisterData((char *) (&xl_subxacts),
@@ -5321,7 +5337,10 @@ XactLogAbortRecord(TimestampTz abort_time,
 	}
 
 	if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE)
-		XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase));
+	{
+		XLogRegisterData((char *) (&xl_twophase), MinSizeOfXactTwophase);
+		XLogRegisterData((char *) twophase_gid, xl_twophase.gidlen);
+	}
 
 	return XLogInsert(RM_XACT_ID, info);
 }
diff --git a/src/backend/replication/logical/decode.c \
b/src/backend/replication/logical/decode.c index 46cd5ba..c15c2ed 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -34,6 +34,7 @@
 #include "access/xlogutils.h"
 #include "access/xlogreader.h"
 #include "access/xlogrecord.h"
+#include "access/twophase.h"
 
 #include "catalog/pg_control.h"
 
@@ -71,7 +72,9 @@ static void DecodeSpecConfirm(LogicalDecodingContext *ctx, \
XLogRecordBuffer *buf  static void DecodeCommit(LogicalDecodingContext *ctx, \
XLogRecordBuffer *buf,  xl_xact_parsed_commit *parsed, TransactionId xid);
 static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
-			xl_xact_parsed_abort *parsed, TransactionId xid);
+			 xl_xact_parsed_abort *parsed, TransactionId xid);
+static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
+			 xl_xact_parsed_prepare *parsed);
 
 /* common function to decode tuples */
 static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
@@ -221,6 +224,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
 		return;
 
+	reorder->xact_action = info;
+
 	switch (info)
 	{
 		case XLOG_XACT_COMMIT:
@@ -277,17 +282,13 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer \
*buf)  break;
 			}
 		case XLOG_XACT_PREPARE:
-
-			/*
-			 * Currently decoding ignores PREPARE TRANSACTION and will just
-			 * decode the transaction when the COMMIT PREPARED is sent or
-			 * throw away the transaction's contents when a ROLLBACK PREPARED
-			 * is received. In the future we could add code to expose prepared
-			 * transactions in the changestream allowing for a kind of
-			 * distributed 2PC.
-			 */
-			ReorderBufferProcessXid(reorder, XLogRecGetXid(r), buf->origptr);
-			break;
+			{
+				xl_xact_parsed_prepare parsed;
+				ParsePrepareRecord(XLogRecGetInfo(buf->record),
+								   XLogRecGetData(buf->record), &parsed);
+				DecodePrepare(ctx, buf, &parsed);
+				break;
+			}
 		default:
 			elog(ERROR, "unexpected RM_XACT_ID record type: %u", info);
 	}
@@ -607,6 +608,67 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 								 buf->origptr, buf->endptr);
 	}
 
+	if (TransactionIdIsValid(parsed->twophase_xid))
+	{
+		/* COMMIT PREPARED: no changes are queued, just send the event */
+		strcpy(ctx->reorder->gid, parsed->twophase_gid);
+		ReorderBufferCommitBareXact(ctx->reorder, xid, buf->origptr, buf->endptr,
+							commit_time, origin_id, origin_lsn);
+	}
+	else
+	{
+		/* replay actions of all transaction + subtransactions in order */
+		ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
+							commit_time, origin_id, origin_lsn);
+	}
+}
+
+static void
+DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
+			 xl_xact_parsed_prepare *parsed)
+{
+	XLogRecPtr	origin_lsn = InvalidXLogRecPtr;
+	TimestampTz	commit_time = 0;
+	RepOriginId	origin_id = XLogRecGetOrigin(buf->record);
+	int			i;
+	TransactionId xid = parsed->twophase_xid;
+	strcpy(ctx->reorder->gid, parsed->twophase_gid);
+
+	/*
+	 * Process invalidation messages, even if we're not interested in the
+	 * transaction's contents, since the various caches need to always be
+	 * consistent.
+	 */
+	if (parsed->nmsgs > 0)
+	{
+		ReorderBufferAddInvalidations(ctx->reorder, xid, buf->origptr,
+									  parsed->nmsgs, parsed->msgs);
+		ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr);
+	}
+
+	SnapBuildCommitTxn(ctx->snapshot_builder, buf->origptr, xid,
+					   parsed->nsubxacts, parsed->subxacts);
+
+	if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
+		(parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database) ||
+		FilterByOrigin(ctx, origin_id))
+	{
+		for (i = 0; i < parsed->nsubxacts; i++)
+		{
+			ReorderBufferForget(ctx->reorder, parsed->subxacts[i], buf->origptr);
+		}
+		ReorderBufferForget(ctx->reorder, xid, buf->origptr);
+
+		return;
+	}
+
+	/* tell the reorderbuffer about the surviving subtransactions */
+	for (i = 0; i < parsed->nsubxacts; i++)
+	{
+		ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i],
+								 buf->origptr, buf->endptr);
+	}
+
 	/* replay actions of all transaction + subtransactions in order */
 	ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
 						commit_time, origin_id, origin_lsn);
@@ -621,6 +683,22 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 			xl_xact_parsed_abort *parsed, TransactionId xid)
 {
 	int			i;
+	XLogRecPtr	origin_lsn = InvalidXLogRecPtr;
+	TimestampTz	commit_time = 0;
+	RepOriginId	origin_id = XLogRecGetOrigin(buf->record);
+
+	/*
+	 * If this is ROLLBACK PREPARED in our database, send it to the callbacks.
+	 */
+	if (TransactionIdIsValid(parsed->twophase_xid) &&
+		parsed->dbId == ctx->slot->data.database)
+	{
+		strcpy(ctx->reorder->gid, parsed->twophase_gid);
+
+		ReorderBufferCommitBareXact(ctx->reorder, xid, buf->origptr, buf->endptr,
+							commit_time, origin_id, origin_lsn);
+		return;
+	}
 
 	SnapBuildAbortTxn(ctx->snapshot_builder, buf->record->EndRecPtr, xid,
 					  parsed->nsubxacts, parsed->subxacts);
diff --git a/src/backend/replication/logical/reorderbuffer.c \
b/src/backend/replication/logical/reorderbuffer.c index fa84bd8..23176c6 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -1373,6 +1373,8 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 	txn->commit_time = commit_time;
 	txn->origin_id = origin_id;
 	txn->origin_lsn = origin_lsn;
+	txn->xact_action = rb->xact_action;
+	memcpy(txn->gid, rb->gid, GIDSIZE);
 
 	/*
 	 * If this transaction didn't have any real changes in our database, it's
@@ -1708,6 +1710,38 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 	PG_END_TRY();
 }
 
+
+/*
+ * Send a standalone transaction-finish event to the output plugin's commit
+ * callback without replaying any changes.  Used for COMMIT PREPARED and
+ * ROLLBACK PREPARED, which carry no changes of their own.  The caller must
+ * set rb->xact_action and rb->gid beforehand.
+ */
+void
+ReorderBufferCommitBareXact(ReorderBuffer *rb, TransactionId xid,
+					XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+					TimestampTz commit_time,
+					RepOriginId origin_id, XLogRecPtr origin_lsn)
+{
+	ReorderBufferTXN *txn;
+
+	txn = ReorderBufferTXNByXid(rb, xid, true, NULL, commit_lsn,
+								true);
+
+	txn->final_lsn = commit_lsn;
+	txn->end_lsn = end_lsn;
+	txn->commit_time = commit_time;
+	txn->origin_id = origin_id;
+	txn->origin_lsn = origin_lsn;
+	txn->xact_action = rb->xact_action;
+	memcpy(txn->gid, rb->gid, GIDSIZE);
+
+	rb->commit(rb, txn, commit_lsn);
+
+	/* txn was created only to carry this event; don't leak it */
+	ReorderBufferCleanupTXN(rb, txn);
+}
+
 /*
  * Abort a transaction that possibly has previous changes. Needs to be first
  * called for subtransactions and then for the toplevel xid.
diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h
index b7ce0c6..1b8e7a0 100644
--- a/src/include/access/twophase.h
+++ b/src/include/access/twophase.h
@@ -15,6 +15,7 @@
 #define TWOPHASE_H
 
 #include "access/xlogdefs.h"
+#include "access/xact.h"
 #include "datatype/timestamp.h"
 #include "storage/lock.h"
 
@@ -46,6 +47,8 @@ extern bool StandbyTransactionIdIsPrepared(TransactionId xid);
 
 extern TransactionId PrescanPreparedTransactions(TransactionId **xids_p,
 							int *nxids_p);
+extern void ParsePrepareRecord(uint8 info, char *xlrec,
+							xl_xact_parsed_prepare *parsed);
 extern void StandbyRecoverPreparedTransactions(bool overwriteOK);
 extern void RecoverPreparedTransactions(void);
 
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index a123d2a..eb052f9 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -21,6 +21,10 @@
 #include "storage/sinval.h"
 #include "utils/datetime.h"
 
+/*
+ * Maximum size of Global Transaction ID.
+ */
+#define GIDSIZE 200
 
 /*
  * Xact isolation levels
@@ -224,7 +228,10 @@ typedef struct xl_xact_invals
 typedef struct xl_xact_twophase
 {
 	TransactionId xid;
+ 	uint8 gidlen;
+ 	char gid[GIDSIZE];
 } xl_xact_twophase;
+#define MinSizeOfXactTwophase offsetof(xl_xact_twophase, gid)
 
 typedef struct xl_xact_origin
 {
@@ -283,13 +290,37 @@ typedef struct xl_xact_parsed_commit
 	SharedInvalidationMessage *msgs;
 
 	TransactionId twophase_xid; /* only for 2PC */
+	char 		twophase_gid[GIDSIZE];
 
 	XLogRecPtr	origin_lsn;
 	TimestampTz origin_timestamp;
 } xl_xact_parsed_commit;
 
+typedef struct xl_xact_parsed_prepare
+{
+	Oid			dbId;			/* MyDatabaseId */
+
+	int			nsubxacts;
+	TransactionId *subxacts;
+
+	int			ncommitrels;
+	RelFileNode *commitrels;
+
+	int			nabortrels;
+	RelFileNode *abortrels;
+
+	int			nmsgs;
+	SharedInvalidationMessage *msgs;
+
+	TransactionId twophase_xid;
+	char 		twophase_gid[GIDSIZE];
+} xl_xact_parsed_prepare;
+
 typedef struct xl_xact_parsed_abort
 {
+	Oid			dbId;
+	Oid			tsId;
+
 	TimestampTz xact_time;
 	uint32		xinfo;
 
@@ -300,6 +331,7 @@ typedef struct xl_xact_parsed_abort
 	RelFileNode *xnodes;
 
 	TransactionId twophase_xid; /* only for 2PC */
+	char 		twophase_gid[GIDSIZE];
 } xl_xact_parsed_abort;
 
 
@@ -331,7 +363,7 @@ extern void CommitTransactionCommand(void);
 extern void AbortCurrentTransaction(void);
 extern void BeginTransactionBlock(void);
 extern bool EndTransactionBlock(void);
-extern bool PrepareTransactionBlock(char *gid);
+extern bool PrepareTransactionBlock(const char *gid);
 extern void UserAbortTransactionBlock(void);
 extern void ReleaseSavepoint(List *options);
 extern void DefineSavepoint(char *name);
@@ -364,12 +396,12 @@ extern XLogRecPtr XactLogCommitRecord(TimestampTz commit_time,
 					int nrels, RelFileNode *rels,
 					int nmsgs, SharedInvalidationMessage *msgs,
 					bool relcacheInval, bool forceSync,
-					TransactionId twophase_xid);
+					TransactionId twophase_xid, const char *twophase_gid);
 
 extern XLogRecPtr XactLogAbortRecord(TimestampTz abort_time,
 				   int nsubxacts, TransactionId *subxacts,
 				   int nrels, RelFileNode *rels,
-				   TransactionId twophase_xid);
+				   TransactionId twophase_xid, const char *twophase_gid);
 extern void xact_redo(XLogReaderState *record);
 
 /* xactdesc.c */
diff --git a/src/include/replication/reorderbuffer.h \
b/src/include/replication/reorderbuffer.h index 9e209ae..13a2195 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -10,6 +10,7 @@
 #define REORDERBUFFER_H
 
 #include "access/htup_details.h"
+#include "access/twophase.h"
 #include "lib/ilist.h"
 #include "storage/sinval.h"
 #include "utils/hsearch.h"
@@ -144,6 +145,14 @@ typedef struct ReorderBufferTXN
 	 */
 	TransactionId xid;
 
+	/*
+	 * Commit callback is used for COMMIT/PREPARE/COMMMIT PREPARED,
+	 * as well as abort for ROLLBACK and ROLLBACK PREPARED. Here
+	 * stored actual xact action allowing decoding plugin to distinguish them.
+	 */
+	uint8		xact_action;
+	char		gid[GIDSIZE];
+
 	/* did the TX have catalog changes */
 	bool		has_catalog_changes;
 
@@ -299,6 +308,10 @@ struct ReorderBuffer
 	 */
 	HTAB	   *by_txn;
 
+	/* For twophase tx support we need to pass XACT action to ReorderBufferTXN */
+	uint8		xact_action;
+	char		gid[GIDSIZE];
+
 	/*
 	 * Transactions that could be a toplevel xact, ordered by LSN of the first
 	 * record bearing that xid.
@@ -375,6 +388,10 @@ void ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, \
Snapshot snapshot  void ReorderBufferCommit(ReorderBuffer *, TransactionId,
 					XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
 	  TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn);
+void ReorderBufferCommitBareXact(ReorderBuffer *rb, TransactionId xid,
+					XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+					TimestampTz commit_time,
+					RepOriginId origin_id, XLogRecPtr origin_lsn);
 void		ReorderBufferAssignChild(ReorderBuffer *, TransactionId, TransactionId, \
XLogRecPtr commit_lsn);  void ReorderBufferCommitChild(ReorderBuffer *, \
TransactionId, TransactionId,  XLogRecPtr commit_lsn, XLogRecPtr end_lsn);



-- 
Stas Kelvich
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company




-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic