[prev in list] [next in list] [prev in thread] [next in thread]
List: postgresql-general
Subject: [HACKERS] logical decoding of two-phase transactions
From: Stas Kelvich <s.kelvich () postgrespro ! ru>
Date: 2016-12-31 8:36:10
Message-ID: 02DA5F5E-CECE-4D9C-8B4B-418077E2C010 () postgrespro ! ru
[Download RAW message or body]
Here is a resubmission of the patch implementing logical decoding of two-phase transactions
(instead of treating them as ordinary transactions at commit time) [1]. I've slightly
polished things and used the test_decoding output plugin as the client.
The general idea is quite simple:
* Write the GID along with commit/prepare records in the 2PC case.
* Add several routines to decode prepare records, in the same way as already
happens elsewhere in logical decoding.
I've also added an explicit LOCK statement to the test_decoding regression suite to check
that it doesn't break anything. If somebody can create a scenario that blocks decoding
because of the existing dummy backend lock, that would be a great help. Right now all my
tests pass (including the TAP tests from the adjacent mail thread that check recovery of
two-phase transactions after failures).
If we agree on the current approach, then I'm ready to add this to the proposed
in-core logical replication.
[1] https://www.postgresql.org/message-id/EE7452CA-3C39-4A0E-97EC-17A414972884%40postgrespro.ru
["logical_twophase.diff" (logical_twophase.diff)]
diff --git a/contrib/test_decoding/expected/prepared.out \
b/contrib/test_decoding/expected/prepared.out index 46e915d..af81c47 100644
--- a/contrib/test_decoding/expected/prepared.out
+++ b/contrib/test_decoding/expected/prepared.out
@@ -25,6 +25,7 @@ BEGIN;
INSERT INTO test_prepared1 VALUES (5);
ALTER TABLE test_prepared1 ADD COLUMN data text;
INSERT INTO test_prepared1 VALUES (6, 'frakbar');
+LOCK test_prepared1;
PREPARE TRANSACTION 'test_prepared#3';
-- test that we decode correctly while an uncommitted prepared xact
-- with ddl exists.
@@ -44,27 +45,33 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', \
NULL, NULL, 'inc
-------------------------------------------------------------------------
BEGIN
table public.test_prepared1: INSERT: id[integer]:1
- COMMIT
+ PREPARE
+ COMMIT PREPARED
BEGIN
table public.test_prepared1: INSERT: id[integer]:2
COMMIT
BEGIN
- table public.test_prepared1: INSERT: id[integer]:4
- COMMIT
+ table public.test_prepared1: INSERT: id[integer]:3
+ PREPARE
+ ABORT PREPARED
BEGIN
- table public.test_prepared2: INSERT: id[integer]:7
+ table public.test_prepared1: INSERT: id[integer]:4
COMMIT
BEGIN
table public.test_prepared1: INSERT: id[integer]:5
table public.test_prepared1: INSERT: id[integer]:6 data[text]:'frakbar'
+ PREPARE
+ BEGIN
+ table public.test_prepared2: INSERT: id[integer]:7
COMMIT
+ COMMIT PREPARED
BEGIN
table public.test_prepared1: INSERT: id[integer]:8 data[text]:null
COMMIT
BEGIN
table public.test_prepared2: INSERT: id[integer]:9
COMMIT
-(22 rows)
+(28 rows)
SELECT pg_drop_replication_slot('regression_slot');
pg_drop_replication_slot
diff --git a/contrib/test_decoding/sql/prepared.sql \
b/contrib/test_decoding/sql/prepared.sql index e726397..ac76b8c 100644
--- a/contrib/test_decoding/sql/prepared.sql
+++ b/contrib/test_decoding/sql/prepared.sql
@@ -25,6 +25,7 @@ BEGIN;
INSERT INTO test_prepared1 VALUES (5);
ALTER TABLE test_prepared1 ADD COLUMN data text;
INSERT INTO test_prepared1 VALUES (6, 'frakbar');
+LOCK test_prepared1;
PREPARE TRANSACTION 'test_prepared#3';
-- test that we decode correctly while an uncommitted prepared xact
diff --git a/contrib/test_decoding/test_decoding.c \
b/contrib/test_decoding/test_decoding.c index 949e9a7..53ced57 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -232,10 +232,25 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, \
ReorderBufferTXN *txn, return;
OutputPluginPrepareWrite(ctx, true);
+
+ switch(txn->xact_action)
+ {
+ case XLOG_XACT_COMMIT:
+ appendStringInfoString(ctx->out, "COMMIT");
+ break;
+ case XLOG_XACT_PREPARE:
+ appendStringInfoString(ctx->out, "PREPARE");
+ break;
+ case XLOG_XACT_COMMIT_PREPARED:
+ appendStringInfoString(ctx->out, "COMMIT PREPARED");
+ break;
+ case XLOG_XACT_ABORT_PREPARED:
+ appendStringInfoString(ctx->out, "ABORT PREPARED");
+ break;
+ }
+
if (data->include_xids)
- appendStringInfo(ctx->out, "COMMIT %u", txn->xid);
- else
- appendStringInfoString(ctx->out, "COMMIT");
+ appendStringInfo(ctx->out, " %u", txn->xid);
if (data->include_timestamp)
appendStringInfo(ctx->out, " (at %s)",
diff --git a/src/backend/access/rmgrdesc/xactdesc.c \
b/src/backend/access/rmgrdesc/xactdesc.c index 91d27d0..679f457 100644
--- a/src/backend/access/rmgrdesc/xactdesc.c
+++ b/src/backend/access/rmgrdesc/xactdesc.c
@@ -98,10 +98,13 @@ ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, \
xl_xact_parsed_commit *pars if (parsed->xinfo & XACT_XINFO_HAS_TWOPHASE)
{
xl_xact_twophase *xl_twophase = (xl_xact_twophase *) data;
+ uint8 gidlen = xl_twophase->gidlen;
parsed->twophase_xid = xl_twophase->xid;
+ data += MinSizeOfXactTwophase;
- data += sizeof(xl_xact_twophase);
+ strcpy(parsed->twophase_gid, data);
+ data += gidlen;
}
if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
@@ -139,6 +142,16 @@ ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, \
xl_xact_parsed_abort *parsed) data += sizeof(xl_xact_xinfo);
}
+ if (parsed->xinfo & XACT_XINFO_HAS_DBINFO)
+ {
+ xl_xact_dbinfo *xl_dbinfo = (xl_xact_dbinfo *) data;
+
+ parsed->dbId = xl_dbinfo->dbId;
+ parsed->tsId = xl_dbinfo->tsId;
+
+ data += sizeof(xl_xact_dbinfo);
+ }
+
if (parsed->xinfo & XACT_XINFO_HAS_SUBXACTS)
{
xl_xact_subxacts *xl_subxacts = (xl_xact_subxacts *) data;
@@ -164,10 +177,13 @@ ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, \
xl_xact_parsed_abort *parsed) if (parsed->xinfo & XACT_XINFO_HAS_TWOPHASE)
{
xl_xact_twophase *xl_twophase = (xl_xact_twophase *) data;
+ uint8 gidlen = xl_twophase->gidlen;
parsed->twophase_xid = xl_twophase->xid;
+ data += MinSizeOfXactTwophase;
- data += sizeof(xl_xact_twophase);
+ strcpy(parsed->twophase_gid, data);
+ data += gidlen;
}
}
diff --git a/src/backend/access/transam/twophase.c \
b/src/backend/access/transam/twophase.c index 5415604..964bcaf 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -130,7 +130,6 @@ int max_prepared_xacts = 0;
* Note that the max value of GIDSIZE must fit in the uint16 gidlen,
* specified in TwoPhaseFileHeader.
*/
-#define GIDSIZE 200
typedef struct GlobalTransactionData
{
@@ -188,12 +187,14 @@ static void RecordTransactionCommitPrepared(TransactionId xid,
RelFileNode *rels,
int ninvalmsgs,
SharedInvalidationMessage *invalmsgs,
- bool initfileinval);
+ bool initfileinval,
+ const char *gid);
static void RecordTransactionAbortPrepared(TransactionId xid,
int nchildren,
TransactionId *children,
int nrels,
- RelFileNode *rels);
+ RelFileNode *rels,
+ const char *gid);
static void ProcessRecords(char *bufptr, TransactionId xid,
const TwoPhaseCallback callbacks[]);
static void RemoveGXact(GlobalTransaction gxact);
@@ -1236,6 +1237,41 @@ ReadTwoPhaseFile(TransactionId xid, bool give_warnings)
return buf;
}
+/*
+ * ParsePrepareRecord -- fill *parsed from a PREPARE record; array fields point into xlrec.
+ */
+void
+ParsePrepareRecord(uint8 info, char *xlrec, xl_xact_parsed_prepare *parsed)
+{
+ TwoPhaseFileHeader *hdr;
+ char *bufptr;
+
+ hdr = (TwoPhaseFileHeader *) xlrec;
+ bufptr = xlrec + MAXALIGN(sizeof(TwoPhaseFileHeader));
+
+ parsed->twophase_xid = hdr->xid;
+ parsed->dbId = hdr->database;
+ parsed->nsubxacts = hdr->nsubxacts;
+ parsed->ncommitrels = hdr->ncommitrels;
+ parsed->nabortrels = hdr->nabortrels;
+ parsed->nmsgs = hdr->ninvalmsgs;
+
+ strncpy(parsed->twophase_gid, bufptr, hdr->gidlen);
+ bufptr += MAXALIGN(hdr->gidlen);
+
+ parsed->subxacts = (TransactionId *) bufptr;
+ bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
+
+ parsed->commitrels = (RelFileNode *) bufptr;
+ bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
+
+ parsed->abortrels = (RelFileNode *) bufptr;
+ bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
+
+ parsed->msgs = (SharedInvalidationMessage *) bufptr;
+ bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
+}
+
/*
* Reads 2PC data from xlog. During checkpoint this data will be moved to
@@ -1389,11 +1425,12 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
hdr->nsubxacts, children,
hdr->ncommitrels, commitrels,
hdr->ninvalmsgs, invalmsgs,
- hdr->initfileinval);
+ hdr->initfileinval, gid);
else
RecordTransactionAbortPrepared(xid,
hdr->nsubxacts, children,
- hdr->nabortrels, abortrels);
+ hdr->nabortrels, abortrels,
+ gid);
ProcArrayRemove(proc, latestXid);
@@ -2038,7 +2075,8 @@ RecordTransactionCommitPrepared(TransactionId xid,
RelFileNode *rels,
int ninvalmsgs,
SharedInvalidationMessage *invalmsgs,
- bool initfileinval)
+ bool initfileinval,
+ const char *gid)
{
XLogRecPtr recptr;
TimestampTz committs = GetCurrentTimestamp();
@@ -2061,7 +2099,7 @@ RecordTransactionCommitPrepared(TransactionId xid,
nchildren, children, nrels, rels,
ninvalmsgs, invalmsgs,
initfileinval, false,
- xid);
+ xid, gid);
if (replorigin)
@@ -2123,7 +2161,8 @@ RecordTransactionAbortPrepared(TransactionId xid,
int nchildren,
TransactionId *children,
int nrels,
- RelFileNode *rels)
+ RelFileNode *rels,
+ const char *gid)
{
XLogRecPtr recptr;
@@ -2141,7 +2180,7 @@ RecordTransactionAbortPrepared(TransactionId xid,
recptr = XactLogAbortRecord(GetCurrentTimestamp(),
nchildren, children,
nrels, rels,
- xid);
+ xid, gid);
/* Always flush, since we're about to remove the 2PC state file */
XLogFlush(recptr);
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index e47fd44..1081f8c 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -1230,7 +1230,7 @@ RecordTransactionCommit(void)
nchildren, children, nrels, rels,
nmsgs, invalMessages,
RelcacheInitFileInval, forceSyncCommit,
- InvalidTransactionId /* plain commit */ );
+ InvalidTransactionId, NULL /* plain commit */ );
if (replorigin)
/* Move LSNs forward for this replication origin */
@@ -1582,7 +1582,7 @@ RecordTransactionAbort(bool isSubXact)
XactLogAbortRecord(xact_time,
nchildren, children,
nrels, rels,
- InvalidTransactionId);
+ InvalidTransactionId, NULL);
/*
* Report the latest async abort LSN, so that the WAL writer knows to
@@ -3467,7 +3467,7 @@ BeginTransactionBlock(void)
* resource owner, etc while executing inside a Portal.
*/
bool
-PrepareTransactionBlock(char *gid)
+PrepareTransactionBlock(const char *gid)
{
TransactionState s;
bool result;
@@ -5106,7 +5106,7 @@ XactLogCommitRecord(TimestampTz commit_time,
int nrels, RelFileNode *rels,
int nmsgs, SharedInvalidationMessage *msgs,
bool relcacheInval, bool forceSync,
- TransactionId twophase_xid)
+ TransactionId twophase_xid, const char *twophase_gid)
{
xl_xact_commit xlrec;
xl_xact_xinfo xl_xinfo;
@@ -5178,6 +5178,7 @@ XactLogCommitRecord(TimestampTz commit_time,
{
xl_xinfo.xinfo |= XACT_XINFO_HAS_TWOPHASE;
xl_twophase.xid = twophase_xid;
+ xl_twophase.gidlen = strlen(twophase_gid) + 1; /* Include '\0' */
}
/* dump transaction origin information */
@@ -5228,7 +5229,10 @@ XactLogCommitRecord(TimestampTz commit_time,
}
if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE)
- XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase));
+ {
+ XLogRegisterData((char *) (&xl_twophase), MinSizeOfXactTwophase);
+ XLogRegisterData((char *) twophase_gid, xl_twophase.gidlen);
+ }
if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN)
XLogRegisterData((char *) (&xl_origin), sizeof(xl_xact_origin));
@@ -5249,13 +5253,14 @@ XLogRecPtr
XactLogAbortRecord(TimestampTz abort_time,
int nsubxacts, TransactionId *subxacts,
int nrels, RelFileNode *rels,
- TransactionId twophase_xid)
+ TransactionId twophase_xid, const char *twophase_gid)
{
xl_xact_abort xlrec;
xl_xact_xinfo xl_xinfo;
xl_xact_subxacts xl_subxacts;
xl_xact_relfilenodes xl_relfilenodes;
xl_xact_twophase xl_twophase;
+ xl_xact_dbinfo xl_dbinfo;
uint8 info;
@@ -5290,6 +5295,14 @@ XactLogAbortRecord(TimestampTz abort_time,
{
xl_xinfo.xinfo |= XACT_XINFO_HAS_TWOPHASE;
xl_twophase.xid = twophase_xid;
+ xl_twophase.gidlen = strlen(twophase_gid) + 1; /* Include '\0' */
+ }
+
+ if (TransactionIdIsValid(twophase_xid) && XLogLogicalInfoActive())
+ {
+ xl_xinfo.xinfo |= XACT_XINFO_HAS_DBINFO;
+ xl_dbinfo.dbId = MyDatabaseId;
+ xl_dbinfo.tsId = MyDatabaseTableSpace;
}
if (xl_xinfo.xinfo != 0)
@@ -5304,6 +5317,9 @@ XactLogAbortRecord(TimestampTz abort_time,
if (xl_xinfo.xinfo != 0)
XLogRegisterData((char *) (&xl_xinfo), sizeof(xl_xinfo));
+ if (xl_xinfo.xinfo & XACT_XINFO_HAS_DBINFO)
+ XLogRegisterData((char *) (&xl_dbinfo), sizeof(xl_dbinfo));
+
if (xl_xinfo.xinfo & XACT_XINFO_HAS_SUBXACTS)
{
XLogRegisterData((char *) (&xl_subxacts),
@@ -5321,7 +5337,10 @@ XactLogAbortRecord(TimestampTz abort_time,
}
if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE)
- XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase));
+ {
+ XLogRegisterData((char *) (&xl_twophase), MinSizeOfXactTwophase);
+ XLogRegisterData((char *) twophase_gid, xl_twophase.gidlen);
+ }
return XLogInsert(RM_XACT_ID, info);
}
diff --git a/src/backend/replication/logical/decode.c \
b/src/backend/replication/logical/decode.c index 46cd5ba..c15c2ed 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -34,6 +34,7 @@
#include "access/xlogutils.h"
#include "access/xlogreader.h"
#include "access/xlogrecord.h"
+#include "access/twophase.h"
#include "catalog/pg_control.h"
@@ -71,7 +72,9 @@ static void DecodeSpecConfirm(LogicalDecodingContext *ctx, \
XLogRecordBuffer *buf static void DecodeCommit(LogicalDecodingContext *ctx, \
XLogRecordBuffer *buf, xl_xact_parsed_commit *parsed, TransactionId xid);
static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
- xl_xact_parsed_abort *parsed, TransactionId xid);
+ xl_xact_parsed_abort *parsed, TransactionId xid);
+static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
+ xl_xact_parsed_prepare *parsed);
/* common function to decode tuples */
static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
@@ -221,6 +224,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
return;
+ reorder->xact_action = info;
+
switch (info)
{
case XLOG_XACT_COMMIT:
@@ -277,17 +282,13 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer \
*buf) break;
}
case XLOG_XACT_PREPARE:
-
- /*
- * Currently decoding ignores PREPARE TRANSACTION and will just
- * decode the transaction when the COMMIT PREPARED is sent or
- * throw away the transaction's contents when a ROLLBACK PREPARED
- * is received. In the future we could add code to expose prepared
- * transactions in the changestream allowing for a kind of
- * distributed 2PC.
- */
- ReorderBufferProcessXid(reorder, XLogRecGetXid(r), buf->origptr);
- break;
+ {
+ xl_xact_parsed_prepare parsed;
+ ParsePrepareRecord(XLogRecGetInfo(buf->record),
+ XLogRecGetData(buf->record), &parsed);
+ DecodePrepare(ctx, buf, &parsed);
+ break;
+ }
default:
elog(ERROR, "unexpected RM_XACT_ID record type: %u", info);
}
@@ -607,6 +608,67 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
buf->origptr, buf->endptr);
}
+ if (TransactionIdIsValid(parsed->twophase_xid))
+ {
+ /* COMMIT PREPARED: changes were processed at PREPARE; emit a bare commit */
+ strcpy(ctx->reorder->gid, parsed->twophase_gid);
+ ReorderBufferCommitBareXact(ctx->reorder, xid, buf->origptr, buf->endptr,
+ commit_time, origin_id, origin_lsn);
+ }
+ else
+ {
+ /* replay actions of all transaction + subtransactions in order */
+ ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
+ commit_time, origin_id, origin_lsn);
+ }
+}
+
+static void
+DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
+ xl_xact_parsed_prepare *parsed)
+{
+ XLogRecPtr origin_lsn = InvalidXLogRecPtr;
+ TimestampTz commit_time = 0;
+ RepOriginId origin_id = XLogRecGetOrigin(buf->record);
+ int i;
+ TransactionId xid = parsed->twophase_xid;
+ strcpy(ctx->reorder->gid, parsed->twophase_gid);
+
+ /*
+ * Process invalidation messages, even if we're not interested in the
+ * transaction's contents, since the various caches need to always be
+ * consistent.
+ */
+ if (parsed->nmsgs > 0)
+ {
+ ReorderBufferAddInvalidations(ctx->reorder, xid, buf->origptr,
+ parsed->nmsgs, parsed->msgs);
+ ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr);
+ }
+
+ SnapBuildCommitTxn(ctx->snapshot_builder, buf->origptr, xid,
+ parsed->nsubxacts, parsed->subxacts);
+
+ if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
+ (parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database) ||
+ FilterByOrigin(ctx, origin_id))
+ {
+ for (i = 0; i < parsed->nsubxacts; i++)
+ {
+ ReorderBufferForget(ctx->reorder, parsed->subxacts[i], buf->origptr);
+ }
+ ReorderBufferForget(ctx->reorder, xid, buf->origptr);
+
+ return;
+ }
+
+ /* tell the reorderbuffer about the surviving subtransactions */
+ for (i = 0; i < parsed->nsubxacts; i++)
+ {
+ ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i],
+ buf->origptr, buf->endptr);
+ }
+
/* replay actions of all transaction + subtransactions in order */
ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
commit_time, origin_id, origin_lsn);
@@ -621,6 +683,22 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
xl_xact_parsed_abort *parsed, TransactionId xid)
{
int i;
+ XLogRecPtr origin_lsn = InvalidXLogRecPtr;
+ TimestampTz commit_time = 0;
+ RepOriginId origin_id = XLogRecGetOrigin(buf->record);
+
+ /*
+ * If this is a ROLLBACK PREPARED, pass it on to the output plugin callbacks.
+ */
+ if (TransactionIdIsValid(parsed->twophase_xid) &&
+ parsed->dbId == ctx->slot->data.database)
+ {
+ strcpy(ctx->reorder->gid, parsed->twophase_gid);
+
+ ReorderBufferCommitBareXact(ctx->reorder, xid, buf->origptr, buf->endptr,
+ commit_time, origin_id, origin_lsn);
+ return;
+ }
SnapBuildAbortTxn(ctx->snapshot_builder, buf->record->EndRecPtr, xid,
parsed->nsubxacts, parsed->subxacts);
diff --git a/src/backend/replication/logical/reorderbuffer.c \
b/src/backend/replication/logical/reorderbuffer.c index fa84bd8..23176c6 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -1373,6 +1373,8 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
txn->commit_time = commit_time;
txn->origin_id = origin_id;
txn->origin_lsn = origin_lsn;
+ txn->xact_action = rb->xact_action;
+ memcpy(txn->gid, rb->gid, GIDSIZE);
/*
* If this transaction didn't have any real changes in our database, it's
@@ -1708,6 +1710,35 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
PG_END_TRY();
}
+
+/*
+ * Send standalone xact event. This is used to handle COMMIT/ABORT PREPARED.
+ */
+void
+ReorderBufferCommitBareXact(ReorderBuffer *rb, TransactionId xid,
+ XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+ TimestampTz commit_time,
+ RepOriginId origin_id, XLogRecPtr origin_lsn)
+{
+ ReorderBufferTXN *txn;
+
+ txn = ReorderBufferTXNByXid(rb, xid, true, NULL, commit_lsn,
+ true);
+
+ txn->final_lsn = commit_lsn;
+ txn->end_lsn = end_lsn;
+ txn->commit_time = commit_time;
+ txn->origin_id = origin_id;
+ txn->origin_lsn = origin_lsn;
+ txn->xact_action = rb->xact_action;
+ strcpy(txn->gid, rb->gid);
+
+ rb->commit(rb, txn, commit_lsn);
+
+ /* release the txn, otherwise it would linger in rb->by_txn forever */
+ ReorderBufferCleanupTXN(rb, txn);
+}
+
/*
* Abort a transaction that possibly has previous changes. Needs to be first
* called for subtransactions and then for the toplevel xid.
diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h
index b7ce0c6..1b8e7a0 100644
--- a/src/include/access/twophase.h
+++ b/src/include/access/twophase.h
@@ -15,6 +15,7 @@
#define TWOPHASE_H
#include "access/xlogdefs.h"
+#include "access/xact.h"
#include "datatype/timestamp.h"
#include "storage/lock.h"
@@ -46,6 +47,8 @@ extern bool StandbyTransactionIdIsPrepared(TransactionId xid);
extern TransactionId PrescanPreparedTransactions(TransactionId **xids_p,
int *nxids_p);
+extern void ParsePrepareRecord(uint8 info, char *xlrec,
+ xl_xact_parsed_prepare *parsed);
extern void StandbyRecoverPreparedTransactions(bool overwriteOK);
extern void RecoverPreparedTransactions(void);
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index a123d2a..eb052f9 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -21,6 +21,10 @@
#include "storage/sinval.h"
#include "utils/datetime.h"
+/*
+ * Maximum size of Global Transaction ID.
+ */
+#define GIDSIZE 200
/*
* Xact isolation levels
@@ -224,7 +228,10 @@ typedef struct xl_xact_invals
typedef struct xl_xact_twophase
{
TransactionId xid;
+ uint8 gidlen;
+ char gid[GIDSIZE];
} xl_xact_twophase;
+#define MinSizeOfXactTwophase offsetof(xl_xact_twophase, gid)
typedef struct xl_xact_origin
{
@@ -283,13 +290,37 @@ typedef struct xl_xact_parsed_commit
SharedInvalidationMessage *msgs;
TransactionId twophase_xid; /* only for 2PC */
+ char twophase_gid[GIDSIZE];
XLogRecPtr origin_lsn;
TimestampTz origin_timestamp;
} xl_xact_parsed_commit;
+typedef struct xl_xact_parsed_prepare
+{
+ Oid dbId; /* MyDatabaseId */
+
+ int nsubxacts;
+ TransactionId *subxacts;
+
+ int ncommitrels;
+ RelFileNode *commitrels;
+
+ int nabortrels;
+ RelFileNode *abortrels;
+
+ int nmsgs;
+ SharedInvalidationMessage *msgs;
+
+ TransactionId twophase_xid;
+ char twophase_gid[GIDSIZE];
+} xl_xact_parsed_prepare;
+
typedef struct xl_xact_parsed_abort
{
+ Oid dbId;
+ Oid tsId;
+
TimestampTz xact_time;
uint32 xinfo;
@@ -300,6 +331,7 @@ typedef struct xl_xact_parsed_abort
RelFileNode *xnodes;
TransactionId twophase_xid; /* only for 2PC */
+ char twophase_gid[GIDSIZE];
} xl_xact_parsed_abort;
@@ -331,7 +363,7 @@ extern void CommitTransactionCommand(void);
extern void AbortCurrentTransaction(void);
extern void BeginTransactionBlock(void);
extern bool EndTransactionBlock(void);
-extern bool PrepareTransactionBlock(char *gid);
+extern bool PrepareTransactionBlock(const char *gid);
extern void UserAbortTransactionBlock(void);
extern void ReleaseSavepoint(List *options);
extern void DefineSavepoint(char *name);
@@ -364,12 +396,12 @@ extern XLogRecPtr XactLogCommitRecord(TimestampTz commit_time,
int nrels, RelFileNode *rels,
int nmsgs, SharedInvalidationMessage *msgs,
bool relcacheInval, bool forceSync,
- TransactionId twophase_xid);
+ TransactionId twophase_xid, const char *twophase_gid);
extern XLogRecPtr XactLogAbortRecord(TimestampTz abort_time,
int nsubxacts, TransactionId *subxacts,
int nrels, RelFileNode *rels,
- TransactionId twophase_xid);
+ TransactionId twophase_xid, const char *twophase_gid);
extern void xact_redo(XLogReaderState *record);
/* xactdesc.c */
diff --git a/src/include/replication/reorderbuffer.h \
b/src/include/replication/reorderbuffer.h index 9e209ae..13a2195 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -10,6 +10,7 @@
#define REORDERBUFFER_H
#include "access/htup_details.h"
+#include "access/twophase.h"
#include "lib/ilist.h"
#include "storage/sinval.h"
#include "utils/hsearch.h"
@@ -144,6 +145,14 @@ typedef struct ReorderBufferTXN
*/
TransactionId xid;
+ /*
+ * Commit callback is used for COMMIT/PREPARE/COMMMIT PREPARED,
+ * as well as abort for ROLLBACK and ROLLBACK PREPARED. Here
+ * stored actual xact action allowing decoding plugin to distinguish them.
+ */
+ uint8 xact_action;
+ char gid[GIDSIZE];
+
/* did the TX have catalog changes */
bool has_catalog_changes;
@@ -299,6 +308,10 @@ struct ReorderBuffer
*/
HTAB *by_txn;
+ /* For twophase tx support we need to pass XACT action to ReorderBufferTXN */
+ uint8 xact_action;
+ char gid[GIDSIZE];
+
/*
* Transactions that could be a toplevel xact, ordered by LSN of the first
* record bearing that xid.
@@ -375,6 +388,10 @@ void ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, \
Snapshot snapshot void ReorderBufferCommit(ReorderBuffer *, TransactionId,
XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn);
+void ReorderBufferCommitBareXact(ReorderBuffer *rb, TransactionId xid,
+ XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+ TimestampTz commit_time,
+ RepOriginId origin_id, XLogRecPtr origin_lsn);
void ReorderBufferAssignChild(ReorderBuffer *, TransactionId, TransactionId, \
XLogRecPtr commit_lsn); void ReorderBufferCommitChild(ReorderBuffer *, \
TransactionId, TransactionId, XLogRecPtr commit_lsn, XLogRecPtr end_lsn);
--
Stas Kelvich
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic