
List:       pgsql-hackers
Subject:    [Proposal] Add foreign-server health checks infrastructure
From:       "kuroda.hayato () fujitsu ! com" <kuroda ! hayato () fujitsu ! com>
Date:       2021-10-30 3:50:15
Message-ID: TYAPR01MB58662809E678253B90E82CE5F5889 () TYAPR01MB5866 ! jpnprd01 ! prod ! outlook ! com

Dear Hackers,

I would like to propose a feature that checks the health of foreign servers.
As a first step, I want to add infrastructure for periodic checking to
PostgreSQL core. The patch is currently WIP and does not contain docs.

## Background 

Currently there is no way to check the status of a foreign server in PostgreSQL.
If a foreign server's postmaster goes down, or if the network between the
servers is lost, the backend process detects this only when it next uses the
connection to that foreign server.

Consider a workload that updates data on a foreign server only at the beginning
of a transaction and then runs many local SQL statements. Even if the network is
disconnected immediately after the foreign server is accessed, the backend
process will continue to execute the local statements without noticing.

The process will eventually finish executing the statements and try to commit.
Only then will it realize that the foreign server cannot be reached, and it will
abort the transaction. This situation should be detected as soon as possible,
because the transaction cannot be committed once the foreign server has gone down.
This can be more of a problem if you have system-wide downtime requirements.
That is why I want to add a health-check feature to PostgreSQL.

## Design

In general, PostgreSQL can have a variety of RDBMSs as foreign servers,
so the core cannot support all of them.
Therefore, I propose leaving the monitoring of a foreign server to each FDW
extension, which registers its check as a callback function with the core.
The attached patch adds this monitoring infrastructure to core.
Within the callback function, each FDW is expected to check the state of the
connections it holds and call ereport(ERROR) if a foreign server cannot be
reached.
Of course, the callback function could instead return false;
there is no particular reason for choosing the current method.
The callback functions are called periodically.
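
To make the callback design concrete, here is a minimal standalone sketch of
such a registry. It is not the patch itself: the names (check_callback,
register_check_callback, call_check_callbacks, have_check_callbacks,
count_probe) are hypothetical stand-ins, malloc replaces allocation in
TopMemoryContext, and the example callback only bumps a counter where a real
FDW would probe its connection and call ereport(ERROR).

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical stand-ins for the patch's types; not PostgreSQL API. */
typedef void (*check_callback) (void *arg);

typedef struct check_callback_item
{
	struct check_callback_item *next;
	check_callback callback;
	void	   *arg;
} check_callback_item;

/* Head of a singly linked list of registered callbacks. */
static check_callback_item *callbacks = NULL;

/* True once at least one callback has been registered. */
static bool
have_check_callbacks(void)
{
	return callbacks != NULL;
}

/*
 * Push a callback onto the list.  Returns true if this was the first
 * registration, i.e. the point where the periodic timer would be armed.
 */
static bool
register_check_callback(check_callback callback, void *arg)
{
	bool		first = !have_check_callbacks();
	check_callback_item *item = malloc(sizeof(*item));

	assert(item != NULL && callback != NULL);
	item->callback = callback;
	item->arg = arg;
	item->next = callbacks;
	callbacks = item;
	return first;
}

/* Invoke every registered callback; returns how many were called. */
static int
call_check_callbacks(void)
{
	int			n = 0;

	for (check_callback_item *item = callbacks; item != NULL; item = item->next)
	{
		item->callback(item->arg);
		n++;
	}
	return n;
}

/* Example callback: a real FDW would check its connection here instead. */
static void
count_probe(void *arg)
{
	(*(int *) arg)++;
}
```

For example, registering count_probe twice with a pointer to an int counter
and then calling call_check_callbacks() once invokes it twice. Only the first
register_check_callback() call returns true, which is the point where the core
would start the timer.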

## Implementation

This patch introduces a new timeout, CHECKING_REMOTE_SERVERS_TIMEOUT, and a new
GUC parameter, remote_servers_connection_check_interval, which controls the
timeout interval (in milliseconds; 0 disables the check). The timeout is armed
when the first callback function is registered, before each SQL command is
executed (if it is not already active), and again each time the timeout fires.
The implementation is modeled on client_connection_check_interval and the other
timeouts.
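
The interaction between the timeout and the hold-off counter can be sketched
outside the backend as follows. This is only an illustration under simplifying
assumptions: plain variables stand in for the signal-safe globals, timer_armed
counts re-arm requests instead of calling enable_timeout_after(), and all
function names are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>

static bool check_pending = false;		/* set when the timeout fires */
static bool interrupt_pending = false;	/* generic "something to do" flag */
static unsigned holdoff_count = 0;		/* > 0 while reading a client message */
static int	check_interval_ms = 1000;	/* stand-in for the GUC */
static int	checks_run = 0;				/* how often the callbacks would run */
static int	timer_armed = 0;			/* how often the timer was re-armed */

/* Defer health checks, e.g. while a protocol message is being read. */
static void
hold_checks(void)
{
	holdoff_count++;
}

/* Allow health checks again; must pair with hold_checks(). */
static void
resume_checks(void)
{
	assert(holdoff_count > 0);
	holdoff_count--;
}

/* What the timeout handler does: only set flags, no real work. */
static void
timeout_fired(void)
{
	check_pending = true;
	interrupt_pending = true;
}

/* What the interrupt-processing code does with a pending check. */
static void
process_interrupts(void)
{
	interrupt_pending = false;

	if (check_pending && holdoff_count != 0)
	{
		/* Held off: leave the check pending and ask to be called again. */
		interrupt_pending = true;
	}
	else if (check_pending)
	{
		check_pending = false;
		checks_run++;			/* the registered callbacks would run here */
		if (check_interval_ms > 0)
			timer_armed++;		/* schedule the next periodic check */
	}
}
```

If timeout_fired() is called while a hold is active, process_interrupts()
leaves the check pending; after resume_checks(), the next process_interrupts()
runs the check once and re-arms the timer.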

## Further work

As the next step, I plan to implement the callback function in postgres_fdw.
I have already made a PoC, but it depends heavily on the following thread:
https://commitfest.postgresql.org/35/3098/

I would also like you to review the postgres_fdw part,
but I think it should not be attached, because cfbot cannot understand such a
dependency and will report a build error. Do you know how to handle this case?

Your comments and suggestions are very welcome.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED


["v01_add_checking_infrastracture.patch" (application/octet-stream)]

diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c
index aac10165ec..0c4d61a34c 100644
--- a/src/backend/commands/copyfromparse.c
+++ b/src/backend/commands/copyfromparse.c
@@ -269,6 +269,7 @@ CopyGetData(CopyFromState cstate, void *databuf, int minread, int maxread)
 
 			readmessage:
 					HOLD_CANCEL_INTERRUPTS();
+					HOLD_CHECKING_REMOTE_SERVERS_INTERRUPTS();
 					pq_startmsgread();
 					mtype = pq_getbyte();
 					if (mtype == EOF)
@@ -300,6 +301,7 @@ CopyGetData(CopyFromState cstate, void *databuf, int minread, int maxread)
 						ereport(ERROR,
 								(errcode(ERRCODE_CONNECTION_FAILURE),
 								 errmsg("unexpected EOF on client connection with an open transaction")));
+					RESUME_CHECKING_REMOTE_SERVERS_INTERRUPTS();
 					RESUME_CANCEL_INTERRUPTS();
 					/* ... and process it */
 					switch (mtype)
diff --git a/src/backend/foreign/foreign.c b/src/backend/foreign/foreign.c
index e07cc57431..eb9c4f3f05 100644
--- a/src/backend/foreign/foreign.c
+++ b/src/backend/foreign/foreign.c
@@ -26,7 +26,11 @@
 #include "utils/memutils.h"
 #include "utils/rel.h"
 #include "utils/syscache.h"
+#include "utils/timeout.h"
 
+/* for checking remote servers */
+int remote_servers_connection_check_interval = 0;
+static CheckingRemoteServersCallbackItem *fdw_callbacks = NULL;
 
 /*
  * GetForeignDataWrapper -	look up the foreign-data wrapper by OID.
@@ -836,3 +840,51 @@ GetExistingLocalJoinPath(RelOptInfo *joinrel)
 	}
 	return NULL;
 }
+
+/*
+ * Register callbacks for checking remote servers.
+ *
+ * This function is intended for use by FDW extensions.
+ * The checking timeout will be fired after registering the first callback.
+ */
+void
+RegisterCheckingRemoteServersCallback(CheckingRemoteServersCallback callback, void *arg)
+{
+	CheckingRemoteServersCallbackItem *item;
+	/* should we start checking timeout? */
+	bool first_exec = !HaveCheckingRemoteServersCallbacks();
+
+	item = (CheckingRemoteServersCallbackItem *)
+			MemoryContextAlloc(TopMemoryContext, sizeof(CheckingRemoteServersCallbackItem));
+	item->callback = callback;
+	item->arg = arg;
+	item->next = fdw_callbacks;
+	fdw_callbacks = item;
+
+	if (first_exec && remote_servers_connection_check_interval > 0)
+		enable_timeout_after(CHECKING_REMOTE_SERVERS_TIMEOUT, remote_servers_connection_check_interval);
+}
+
+
+/*
+ * Call callbacks for checking remote servers.
+ *
+ * Note that this function will not return anything.
+ * Callback functions must throw ereport(ERROR) if disconnection has been detected.
+ */
+void
+CallCheckingRemoteServersCallbacks(void)
+{
+	CheckingRemoteServersCallbackItem *item;
+	for (item = fdw_callbacks; item; item = item->next)
+		item->callback(item->arg);
+}
+
+/*
+ * Check whether any callbacks have been registered.
+ */
+bool
+HaveCheckingRemoteServersCallbacks(void)
+{
+	return fdw_callbacks != NULL;
+}
diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index 89a5f901aa..31e39b6cbf 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -1159,7 +1159,6 @@ pq_startmsgread(void)
 		ereport(FATAL,
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
 				 errmsg("terminating connection because protocol synchronization was lost")));
-
 	PqCommReadingMsg = true;
 }
 
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 0775abe35d..d7890b5307 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -43,6 +43,7 @@
 #include "commands/async.h"
 #include "commands/prepare.h"
 #include "executor/spi.h"
+#include "foreign/foreign.h"
 #include "jit/jit.h"
 #include "libpq/libpq.h"
 #include "libpq/pqformat.h"
@@ -349,6 +350,7 @@ SocketBackend(StringInfo inBuf)
 	 * Get message type code from the frontend.
 	 */
 	HOLD_CANCEL_INTERRUPTS();
+	HOLD_CHECKING_REMOTE_SERVERS_INTERRUPTS();
 	pq_startmsgread();
 	qtype = pq_getbyte();
 
@@ -455,6 +457,7 @@ SocketBackend(StringInfo inBuf)
 	 */
 	if (pq_getmessage(inBuf, maxmsglen))
 		return EOF;				/* suitable message already logged */
+	RESUME_CHECKING_REMOTE_SERVERS_INTERRUPTS();
 	RESUME_CANCEL_INTERRUPTS();
 
 	return qtype;
@@ -2708,6 +2711,13 @@ start_xact_command(void)
 		!get_timeout_active(CLIENT_CONNECTION_CHECK_TIMEOUT))
 		enable_timeout_after(CLIENT_CONNECTION_CHECK_TIMEOUT,
 							 client_connection_check_interval);
+	if (remote_servers_connection_check_interval > 0 &&
+		IsUnderPostmaster &&
+		MyProcPort &&
+		HaveCheckingRemoteServersCallbacks() &&
+		!get_timeout_active(CHECKING_REMOTE_SERVERS_TIMEOUT))
+		enable_timeout_after(CHECKING_REMOTE_SERVERS_TIMEOUT,
+							 remote_servers_connection_check_interval);
 }
 
 static void
@@ -3213,6 +3223,24 @@ ProcessInterrupts(void)
 		}
 	}
 
+	if (CheckingRemoteServersTimeoutPending && CheckingRemoteServersHoldoffCount != 0)
+	{
+		/*
+		 * Skip checking foreign servers while reading messages.
+		 */
+		InterruptPending = true;
+	}
+	else if (CheckingRemoteServersTimeoutPending)
+	{
+		CheckingRemoteServersTimeoutPending = false;
+
+		CallCheckingRemoteServersCallbacks();
+
+		if (remote_servers_connection_check_interval > 0)
+			enable_timeout_after(CHECKING_REMOTE_SERVERS_TIMEOUT,
+									remote_servers_connection_check_interval);
+	}
+
 	if (ClientConnectionLost)
 	{
 		QueryCancelPending = false; /* lost connection trumps QueryCancel */
diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c
index 381d9e548d..15b0c2727b 100644
--- a/src/backend/utils/init/globals.c
+++ b/src/backend/utils/init/globals.c
@@ -36,9 +36,11 @@ volatile sig_atomic_t IdleInTransactionSessionTimeoutPending = false;
 volatile sig_atomic_t IdleSessionTimeoutPending = false;
 volatile sig_atomic_t ProcSignalBarrierPending = false;
 volatile sig_atomic_t LogMemoryContextPending = false;
+volatile sig_atomic_t CheckingRemoteServersTimeoutPending = false;
 volatile uint32 InterruptHoldoffCount = 0;
 volatile uint32 QueryCancelHoldoffCount = 0;
 volatile uint32 CritSectionCount = 0;
+volatile uint32 CheckingRemoteServersHoldoffCount = 0;
 
 int			MyProcPid;
 pg_time_t	MyStartTime;
diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c
index 78bc64671e..fcf5d24248 100644
--- a/src/backend/utils/init/postinit.c
+++ b/src/backend/utils/init/postinit.c
@@ -73,6 +73,7 @@ static void LockTimeoutHandler(void);
 static void IdleInTransactionSessionTimeoutHandler(void);
 static void IdleSessionTimeoutHandler(void);
 static void ClientCheckTimeoutHandler(void);
+static void CheckingRemoteServersTimeoutHandler(void);
 static bool ThereIsAtLeastOneRole(void);
 static void process_startup_options(Port *port, bool am_superuser);
 static void process_settings(Oid databaseid, Oid roleid);
@@ -615,6 +616,7 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username,
 						IdleInTransactionSessionTimeoutHandler);
 		RegisterTimeout(IDLE_SESSION_TIMEOUT, IdleSessionTimeoutHandler);
 		RegisterTimeout(CLIENT_CONNECTION_CHECK_TIMEOUT, ClientCheckTimeoutHandler);
+		RegisterTimeout(CHECKING_REMOTE_SERVERS_TIMEOUT, CheckingRemoteServersTimeoutHandler);
 	}
 
 	/*
@@ -1245,6 +1247,14 @@ ClientCheckTimeoutHandler(void)
 	SetLatch(MyLatch);
 }
 
+static void
+CheckingRemoteServersTimeoutHandler(void)
+{
+	CheckingRemoteServersTimeoutPending = true;
+	InterruptPending = true;
+	SetLatch(MyLatch);
+}
+
 /*
  * Returns true if at least one role is defined in this database cluster.
  */
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index e91d5a3cfd..24172de7a9 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -51,6 +51,7 @@
 #include "commands/vacuum.h"
 #include "commands/variable.h"
 #include "common/string.h"
+#include "foreign/foreign.h"
 #include "funcapi.h"
 #include "jit/jit.h"
 #include "libpq/auth.h"
@@ -105,6 +106,7 @@
 #include "utils/queryjumble.h"
 #include "utils/rls.h"
 #include "utils/snapmgr.h"
+#include "utils/timeout.h"
 #include "utils/tzparser.h"
 #include "utils/inval.h"
 #include "utils/varlena.h"
@@ -3583,6 +3585,17 @@ static struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+
+	{
+		{"remote_servers_connection_check_interval", PGC_USERSET, CONN_AUTH_SETTINGS,
+			gettext_noop("Sets the time interval between checks for disconnection of remote servers."),
+			NULL,
+			GUC_UNIT_MS
+		},
+		&remote_servers_connection_check_interval,
+		0, 0, INT_MAX,
+	},
+
 	/* End-of-list marker */
 	{
 		{NULL, 0, 0, NULL, NULL}, NULL, 0, 0, 0, NULL, NULL, NULL
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 1cbc9feeb6..9b70761d9c 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -90,6 +90,10 @@
 					# disconnection while running queries;
 					# 0 for never
 
+#remote_servers_connection_check_interval = 0	# time between checks for
+					# foreign server disconnection;
+					# 0 for never
+
 # - Authentication -
 
 #authentication_timeout = 1min		# 1s-600s
diff --git a/src/include/foreign/foreign.h b/src/include/foreign/foreign.h
index 8169eb76b1..98f59c1153 100644
--- a/src/include/foreign/foreign.h
+++ b/src/include/foreign/foreign.h
@@ -81,4 +81,20 @@ extern List *GetForeignColumnOptions(Oid relid, AttrNumber attnum);
 extern Oid	get_foreign_data_wrapper_oid(const char *fdwname, bool missing_ok);
 extern Oid	get_foreign_server_oid(const char *servername, bool missing_ok);
 
+
+/* functions and variables for fdw checking. */
+typedef void (*CheckingRemoteServersCallback) (void *arg);
+typedef struct CheckingRemoteServersCallbackItem
+{
+	struct CheckingRemoteServersCallbackItem *next;
+	CheckingRemoteServersCallback callback;
+	void		*arg;
+} CheckingRemoteServersCallbackItem;
+
+extern void RegisterCheckingRemoteServersCallback(CheckingRemoteServersCallback callback, void *arg);
+extern void CallCheckingRemoteServersCallbacks(void);
+extern bool HaveCheckingRemoteServersCallbacks(void);
+
+extern int remote_servers_connection_check_interval;
+
 #endif							/* FOREIGN_H */
diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h
index 90a3016065..377fc68aaa 100644
--- a/src/include/miscadmin.h
+++ b/src/include/miscadmin.h
@@ -98,10 +98,13 @@ extern PGDLLIMPORT volatile sig_atomic_t LogMemoryContextPending;
 extern PGDLLIMPORT volatile sig_atomic_t CheckClientConnectionPending;
 extern PGDLLIMPORT volatile sig_atomic_t ClientConnectionLost;
 
+extern PGDLLIMPORT volatile sig_atomic_t CheckingRemoteServersTimeoutPending;
+
 /* these are marked volatile because they are examined by signal handlers: */
 extern PGDLLIMPORT volatile uint32 InterruptHoldoffCount;
 extern PGDLLIMPORT volatile uint32 QueryCancelHoldoffCount;
 extern PGDLLIMPORT volatile uint32 CritSectionCount;
+extern PGDLLIMPORT volatile uint32 CheckingRemoteServersHoldoffCount;
 
 /* in tcop/postgres.c */
 extern void ProcessInterrupts(void);
@@ -126,7 +129,7 @@ do { \
 /* Is ProcessInterrupts() guaranteed to clear InterruptPending? */
 #define INTERRUPTS_CAN_BE_PROCESSED() \
 	(InterruptHoldoffCount == 0 && CritSectionCount == 0 && \
-	 QueryCancelHoldoffCount == 0)
+	 QueryCancelHoldoffCount == 0 && CheckingRemoteServersHoldoffCount == 0)
 
 #define HOLD_INTERRUPTS()  (InterruptHoldoffCount++)
 
@@ -152,6 +155,13 @@ do { \
 	CritSectionCount--; \
 } while(0)
 
+#define HOLD_CHECKING_REMOTE_SERVERS_INTERRUPTS()  (CheckingRemoteServersHoldoffCount++)
+
+#define RESUME_CHECKING_REMOTE_SERVERS_INTERRUPTS() \
+do { \
+	Assert(CheckingRemoteServersHoldoffCount > 0); \
+	CheckingRemoteServersHoldoffCount--; \
+} while(0)
 
 /*****************************************************************************
  *	  globals.h --															 *
diff --git a/src/include/utils/timeout.h b/src/include/utils/timeout.h
index 2cbc5de4d9..ceb6b1c12c 100644
--- a/src/include/utils/timeout.h
+++ b/src/include/utils/timeout.h
@@ -34,6 +34,7 @@ typedef enum TimeoutId
 	IDLE_SESSION_TIMEOUT,
 	CLIENT_CONNECTION_CHECK_TIMEOUT,
 	STARTUP_PROGRESS_TIMEOUT,
+	CHECKING_REMOTE_SERVERS_TIMEOUT,
 	/* First user-definable timeout reason */
 	USER_TIMEOUT,
 	/* Maximum number of timeout reasons */


