[prev in list] [next in list] [prev in thread] [next in thread] 

List:       postgresql-general
Subject:    Re: [HACKERS] TODO : Allow parallel cores to be used by vacuumdb [ WIP ]
From:       Amit Kapila <amit.kapila16 () gmail ! com>
Date:       2014-12-31 13:17:38
Message-ID: CAA4eK1Jp+u2+46wE8Nmg_gWiO_b7_L1afAwS9awgRkE=DfW_nA () mail ! gmail ! com
[Download RAW message or body]

[Attachment #2 (multipart/alternative)]


On Mon, Dec 29, 2014 at 11:10 AM, Dilip kumar <dilip.kumar@huawei.com>
wrote:
>
> On 29 December 2014 10:22 Amit Kapila Wrote,
>
>
> I think nothing more to be handled from my side, you can go ahead with
review..
>

The patch looks good to me.  I have made a couple of
cosmetic changes (fixed spelling mistakes, improved some comments,
etc.); please check them once, and if you are okay with them, we can
move ahead.



With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com

[Attachment #5 (text/html)]

<div dir="ltr">On Mon, Dec 29, 2014 at 11:10 AM, Dilip kumar &lt;<a \
href="mailto:dilip.kumar@huawei.com">dilip.kumar@huawei.com</a>&gt; \
wrote:<br>&gt;<br>&gt; On 29 December 2014 10:22 Amit Kapila \
Wrote,<br>&gt;<br>&gt;<br>&gt; I think nothing more to be handled from my side, you \
can go ahead with review..<br>&gt;<br><br><div>The patch looks good to me.   I have \
done couple of</div><div style>cosmetic changes (spelling mistakes, improve some \
comments,</div><div style>etc.), check the same once and if you are okay, we can \
move</div><div style>ahead.  </div><div><br><div><br clear="all"><div><div \
class="gmail_signature"><br>With Regards,<br>Amit Kapila.<br>EnterpriseDB:  <a \
href="http://www.enterprisedb.com/" \
target="_blank">http://www.enterprisedb.com</a></div></div></div></div></div>


["vacuumdb_parallel_v21.patch" (application/octet-stream)]

diff --git a/doc/src/sgml/ref/vacuumdb.sgml b/doc/src/sgml/ref/vacuumdb.sgml
index 3ecd999..e4a971f 100644
--- a/doc/src/sgml/ref/vacuumdb.sgml
+++ b/doc/src/sgml/ref/vacuumdb.sgml
@@ -204,6 +204,27 @@ PostgreSQL documentation
      </varlistentry>
 
      <varlistentry>
+      <term><option>-j <replaceable class="parameter">njobs</replaceable></option></term>
+      <term><option>--jobs=<replaceable class="parameter">njobs</replaceable></option></term>
+      <listitem>
+       <para>
+        Number of concurrent connections to use for the operation.
+        This option makes the vacuum operation run over asynchronous
+        connections, with each connection working on one table at a time,
+        so as many tables are vacuumed in parallel as there are jobs.
+        If the number of jobs given exceeds the number of tables, the
+        number of jobs is reduced to the number of tables.
+       </para>
+       <para>
+        <application>vacuumdb</application> will open
+        <replaceable class="parameter">njobs</replaceable> connections to the
+        database, so make sure your <xref linkend="guc-max-connections">
+        setting is high enough to accommodate all connections.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
       <term><option>--analyze-in-stages</option></term>
       <listitem>
        <para>
diff --git a/src/bin/scripts/common.c b/src/bin/scripts/common.c
index 311fed5..2220a4e 100644
--- a/src/bin/scripts/common.c
+++ b/src/bin/scripts/common.c
@@ -19,10 +19,9 @@
 
 #include "common.h"
 
-static void SetCancelConn(PGconn *conn);
-static void ResetCancelConn(void);
 
 static PGcancel *volatile cancelConn = NULL;
+static volatile bool inAbort = false;	/* set by the cancel handlers */
 
 #ifdef WIN32
 static CRITICAL_SECTION cancelConnLock;
@@ -291,7 +290,7 @@ yesno_prompt(const char *question)
  *
  * Set cancelConn to point to the current database connection.
  */
-static void
+void
 SetCancelConn(PGconn *conn)
 {
 	PGcancel   *oldCancelConn;
@@ -321,7 +320,7 @@ SetCancelConn(PGconn *conn)
  *
  * Free the current cancel connection, if any, and set to NULL.
  */
-static void
+void
 ResetCancelConn(void)
 {
 	PGcancel   *oldCancelConn;
@@ -359,10 +358,15 @@ handle_sigint(SIGNAL_ARGS)
 	if (cancelConn != NULL)
 	{
 		if (PQcancel(cancelConn, errbuf, sizeof(errbuf)))
+		{
+			inAbort = true;
 			fprintf(stderr, _("Cancel request sent\n"));
+		}
 		else
 			fprintf(stderr, _("Could not send cancel request: %s"), errbuf);
 	}
+	else
+		inAbort = true;
 
 	errno = save_errno;			/* just in case the write changed it */
 }
@@ -392,10 +396,16 @@ consoleHandler(DWORD dwCtrlType)
 		if (cancelConn != NULL)
 		{
 			if (PQcancel(cancelConn, errbuf, sizeof(errbuf)))
+			{
 				fprintf(stderr, _("Cancel request sent\n"));
+				inAbort = true;
+			}
 			else
 				fprintf(stderr, _("Could not send cancel request: %s"), errbuf);
 		}
+		else
+			inAbort = true;
+
 		LeaveCriticalSection(&cancelConnLock);
 
 		return TRUE;
@@ -414,3 +424,8 @@ setup_cancel_handler(void)
 }
 
 #endif   /* WIN32 */
+
+bool in_abort(void)		/* has a cancel handler flagged an abort? */
+{
+	return inAbort;
+}
diff --git a/src/bin/scripts/common.h b/src/bin/scripts/common.h
index 691f6c6..3bafde3 100644
--- a/src/bin/scripts/common.h
+++ b/src/bin/scripts/common.h
@@ -49,4 +49,9 @@ extern bool yesno_prompt(const char *question);
 
 extern void setup_cancel_handler(void);
 
+extern void SetCancelConn(PGconn *conn);
+extern void ResetCancelConn(void);
+extern bool in_abort(void);
+
+
 #endif   /* COMMON_H */
diff --git a/src/bin/scripts/vacuumdb.c b/src/bin/scripts/vacuumdb.c
index 86e6ab3..05cd74f 100644
--- a/src/bin/scripts/vacuumdb.c
+++ b/src/bin/scripts/vacuumdb.c
@@ -14,6 +14,18 @@
 #include "common.h"
 #include "dumputils.h"
 
+#define NO_SLOT (-1)
+
+/* One server connection used to run commands in parallel */
+typedef struct ParallelSlot
+{
+	PGconn *connection;		/* libpq connection of this slot */
+	bool isFree;			/* false while a command is in flight on it */
+	pgsocket sock;			/* PQsocket(connection), watched with select() */
+} ParallelSlot;
+
+#define ERRCODE_UNDEFINED_TABLE  "42P01"
+#define ERROR_IN_ABORT				(-2)
 
 static void vacuum_one_database(const char *dbname, bool full, bool verbose,
 	bool and_analyze, bool analyze_only, bool analyze_in_stages, int stage, bool freeze,
@@ -25,10 +37,40 @@ static void vacuum_all_databases(bool full, bool verbose, bool and_analyze,
 					 const char *maintenance_db,
 					 const char *host, const char *port,
 					 const char *username, enum trivalue prompt_password,
-					 const char *progname, bool echo, bool quiet);
+					 const char *progname, bool echo, bool quiet,
+					 int concurrentCons);
 
 static void help(const char *progname);
 
+void vacuum_parallel(const char *dbname, bool full, bool verbose,
+					 bool and_analyze, bool analyze_only,
+					 bool analyze_in_stages, int stage, bool freeze,
+					 const char *host, const char *port,
+					 const char *username, enum trivalue prompt_password,
+					 const char *progname, bool echo, int concurrentCons,
+					 SimpleStringList *tables, bool quiet);
+
+
+void prepare_command(PGconn *conn, bool full, bool verbose, bool and_analyze,
+					 bool analyze_only, bool freeze, PQExpBuffer sql);
+static void
+run_parallel_vacuum(bool echo, const char *dbname, SimpleStringList *tables,
+					bool full, bool verbose, bool and_analyze,
+					bool analyze_only, bool freeze, int concurrentCons,
+					const char *progname, int analyze_stage,
+					ParallelSlot *connSlot, bool completedb);
+static int
+GetIdleSlot(ParallelSlot *pSlot, int max_slot, const char *dbname,
+			const char *progname, bool completedb);
+
+static bool GetQueryResult(PGconn *conn, const char *dbname,
+						   const char *progname, bool completedb);
+
+static int
+select_loop(int maxFd, fd_set *workerset);
+
+static void DisconnectDatabase(ParallelSlot *slot);
+
 
 int
 main(int argc, char *argv[])
@@ -49,6 +91,7 @@ main(int argc, char *argv[])
 		{"table", required_argument, NULL, 't'},
 		{"full", no_argument, NULL, 'f'},
 		{"verbose", no_argument, NULL, 'v'},
+		{"jobs", required_argument, NULL, 'j'},
 		{"maintenance-db", required_argument, NULL, 2},
 		{"analyze-in-stages", no_argument, NULL, 3},
 		{NULL, 0, NULL, 0}
@@ -74,13 +117,15 @@ main(int argc, char *argv[])
 	bool		full = false;
 	bool		verbose = false;
 	SimpleStringList tables = {NULL, NULL};
+	int concurrentCons = 0;
+	int tbl_count = 0;
 
 	progname = get_progname(argv[0]);
 	set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pgscripts"));
 
 	handle_help_version_opts(argc, argv, "vacuumdb", help);
 
-	while ((c = getopt_long(argc, argv, "h:p:U:wWeqd:zZFat:fv", long_options, &optindex)) != -1)
+	while ((c = getopt_long(argc, argv, "h:p:U:wWeqd:zZFat:fvj:", long_options, &optindex)) != -1)
 	{
 		switch (c)
 		{
@@ -121,14 +166,27 @@ main(int argc, char *argv[])
 				alldb = true;
 				break;
 			case 't':
+			{
 				simple_string_list_append(&tables, optarg);
+				tbl_count++;
 				break;
+			}
 			case 'f':
 				full = true;
 				break;
 			case 'v':
 				verbose = true;
 				break;
+			case 'j':
+				concurrentCons = atoi(optarg);
+				if (concurrentCons <= 0)
+				{
+					fprintf(stderr, _("%s: Number of parallel \"jobs\" should be at least 1\n"),
+							progname);
+					exit(1);
+				}
+
+				break;
 			case 2:
 				maintenance_db = pg_strdup(optarg);
 				break;
@@ -141,6 +199,7 @@ main(int argc, char *argv[])
 		}
 	}
 
+
 
 	/*
 	 * Non-option argument specifies database name as long as it wasn't
@@ -179,6 +238,10 @@ main(int argc, char *argv[])
 
 	setup_cancel_handler();
 
+	/* Avoid opening extra connections. */
+	if (tbl_count && (concurrentCons > tbl_count))
+		concurrentCons = tbl_count;
+
 	if (alldb)
 	{
 		if (dbname)
@@ -196,7 +259,7 @@ main(int argc, char *argv[])
 
 		vacuum_all_databases(full, verbose, and_analyze, analyze_only, analyze_in_stages, freeze,
 							 maintenance_db, host, port, username,
-							 prompt_password, progname, echo, quiet);
+							 prompt_password, progname, echo, quiet, concurrentCons);
 	}
 	else
 	{
@@ -210,25 +273,36 @@ main(int argc, char *argv[])
 				dbname = get_user_name_or_exit(progname);
 		}
 
-		if (tables.head != NULL)
+		if (concurrentCons > 1)
 		{
-			SimpleStringListCell *cell;
+			vacuum_parallel(dbname, full, verbose, and_analyze,
+							analyze_only, analyze_in_stages, -1,
+							freeze, host, port, username, prompt_password,
+							progname, echo, concurrentCons, &tables, quiet);
 
-			for (cell = tables.head; cell; cell = cell->next)
+		}
+		else
+		{
+			if (tables.head != NULL)
 			{
+				SimpleStringListCell *cell;
+
+				for (cell = tables.head; cell; cell = cell->next)
+				{
+					vacuum_one_database(dbname, full, verbose, and_analyze,
+										analyze_only, analyze_in_stages, -1,
+										freeze, cell->val,
+										host, port, username, prompt_password,
+										progname, echo, quiet);
+				}
+			}
+			else
 				vacuum_one_database(dbname, full, verbose, and_analyze,
 									analyze_only, analyze_in_stages, -1,
-									freeze, cell->val,
+									freeze, NULL,
 									host, port, username, prompt_password,
 									progname, echo, quiet);
-			}
 		}
-		else
-			vacuum_one_database(dbname, full, verbose, and_analyze,
-								analyze_only, analyze_in_stages, -1,
-								freeze, NULL,
-								host, port, username, prompt_password,
-								progname, echo, quiet);
 	}
 
 	exit(0);
@@ -268,56 +342,9 @@ vacuum_one_database(const char *dbname, bool full, bool verbose, bool and_analyz
 	conn = connectDatabase(dbname, host, port, username, prompt_password,
 						   progname, false);
 
-	if (analyze_only)
-	{
-		appendPQExpBufferStr(&sql, "ANALYZE");
-		if (verbose)
-			appendPQExpBufferStr(&sql, " VERBOSE");
-	}
-	else
-	{
-		appendPQExpBufferStr(&sql, "VACUUM");
-		if (PQserverVersion(conn) >= 90000)
-		{
-			const char *paren = " (";
-			const char *comma = ", ";
-			const char *sep = paren;
+	prepare_command(conn, full, verbose,
+					and_analyze, analyze_only, freeze, &sql);
 
-			if (full)
-			{
-				appendPQExpBuffer(&sql, "%sFULL", sep);
-				sep = comma;
-			}
-			if (freeze)
-			{
-				appendPQExpBuffer(&sql, "%sFREEZE", sep);
-				sep = comma;
-			}
-			if (verbose)
-			{
-				appendPQExpBuffer(&sql, "%sVERBOSE", sep);
-				sep = comma;
-			}
-			if (and_analyze)
-			{
-				appendPQExpBuffer(&sql, "%sANALYZE", sep);
-				sep = comma;
-			}
-			if (sep != paren)
-				appendPQExpBufferStr(&sql, ")");
-		}
-		else
-		{
-			if (full)
-				appendPQExpBufferStr(&sql, " FULL");
-			if (freeze)
-				appendPQExpBufferStr(&sql, " FREEZE");
-			if (verbose)
-				appendPQExpBufferStr(&sql, " VERBOSE");
-			if (and_analyze)
-				appendPQExpBufferStr(&sql, " ANALYZE");
-		}
-	}
 	if (table)
 		appendPQExpBuffer(&sql, " %s", table);
 	appendPQExpBufferStr(&sql, ";");
@@ -353,8 +380,10 @@ vacuum_one_database(const char *dbname, bool full, bool verbose, bool and_analyz
 		}
 		else
 		{
-			/* Otherwise, we got a stage from vacuum_all_databases(), so run
-			 * only that one. */
+			/*
+			 * Otherwise, we got a stage from vacuum_all_databases(), so run
+			 * only that one.
+			 */
 			if (!quiet)
 			{
 				puts(gettext(stage_messages[stage]));
@@ -374,11 +403,12 @@ vacuum_one_database(const char *dbname, bool full, bool verbose, bool and_analyz
 
 
 static void
-vacuum_all_databases(bool full, bool verbose, bool and_analyze, bool analyze_only,
-			 bool analyze_in_stages, bool freeze, const char *maintenance_db,
-					 const char *host, const char *port,
-					 const char *username, enum trivalue prompt_password,
-					 const char *progname, bool echo, bool quiet)
+vacuum_all_databases(bool full, bool verbose, bool and_analyze,
+					bool analyze_only, bool analyze_in_stages, bool freeze,
+					const char *maintenance_db, const char *host,
+					const char *port, const char *username,
+					enum trivalue prompt_password, const char *progname,
+					bool echo, bool quiet, int concurrentCons)
 {
 	PGconn	   *conn;
 	PGresult   *result;
@@ -390,7 +420,8 @@ vacuum_all_databases(bool full, bool verbose, bool and_analyze, bool analyze_onl
 	PQfinish(conn);
 
 	/* If analyzing in stages, then run through all stages.  Otherwise just
-	 * run once, passing -1 as the stage. */
+	 * run once, passing -1 as the stage.
+	 */
 	for (stage = (analyze_in_stages ? 0 : -1);
 		 stage < (analyze_in_stages ? 3 : 0);
 		 stage++)
@@ -407,6 +438,15 @@ vacuum_all_databases(bool full, bool verbose, bool and_analyze, bool analyze_onl
 				fflush(stdout);
 			}
 
+		if (concurrentCons > 1)
+		{
+			vacuum_parallel(dbname, full, verbose, and_analyze,
+								analyze_only, analyze_in_stages, stage,
+								freeze, host, port, username, prompt_password,
+								progname, echo, concurrentCons, NULL, quiet);
+
+		}
+		else
 			vacuum_one_database(dbname, full, verbose, and_analyze, analyze_only,
 								analyze_in_stages, stage,
 							freeze, NULL, host, port, username, prompt_password,
@@ -417,6 +457,507 @@ vacuum_all_databases(bool full, bool verbose, bool and_analyze, bool analyze_onl
 	PQclear(result);
 }
 
+/*
+ * run_parallel_vacuum
+ * Send one vacuum/analyze command per entry of "tables" to the server,
+ * spreading them over the concurrentCons connections in connSlot.  When
+ * analyze_stage >= 0, the matching stage_commands entry is first run on
+ * every connection.  On abort or failure all connections are closed,
+ * connSlot is freed and the program exits with status 1.
+ */
+static void
+run_parallel_vacuum(bool echo, const char *dbname, SimpleStringList *tables,
+					bool full, bool verbose, bool and_analyze,
+					bool analyze_only, bool freeze, int concurrentCons,
+					const char *progname, int analyze_stage,
+					ParallelSlot *connSlot, bool completedb)
+{
+	PQExpBufferData sql;
+	SimpleStringListCell *cell;
+	int max_slot = concurrentCons;
+	int i;
+	int free_slot;
+	PGconn *slotconn;
+	bool  error = false;
+	const char *stage_commands[] = {
+			"SET default_statistics_target=1; SET vacuum_cost_delay=0;",
+			"SET default_statistics_target=10; RESET vacuum_cost_delay;",
+			"RESET default_statistics_target;"};
+
+	initPQExpBuffer(&sql);
+
+	if (analyze_stage >= 0)
+	{
+		for (i = 0; i < max_slot; i++)
+			executeCommand(connSlot[i].connection,
+					stage_commands[analyze_stage], progname, echo);
+	}
+
+	for (cell = tables->head; cell; cell = cell->next)
+	{
+		if (in_abort())
+		{
+			error = true;
+			goto fail;
+		}
+
+		/*
+		 * This will give the free connection slot, if no slot is free it will
+		 * wait for at least one slot to get free.
+		 */
+		free_slot = GetIdleSlot(connSlot, max_slot, dbname, progname,
+								completedb);
+		if (free_slot == NO_SLOT)
+		{
+			error = true;
+			goto fail;
+		}
+
+		slotconn = connSlot[free_slot].connection;
+		prepare_command(slotconn, full, verbose,
+					and_analyze, analyze_only, freeze, &sql);
+		appendPQExpBuffer(&sql, " %s;", cell->val);
+
+		/* A command that could not be sent must not be silently skipped. */
+		if (!PQsendQuery(slotconn, sql.data))
+		{
+			fprintf(stderr, _("%s: vacuuming of database \"%s\" failed: %s"),
+					progname, dbname, PQerrorMessage(slotconn));
+			error = true;
+			goto fail;
+		}
+
+		connSlot[free_slot].isFree = false;
+		resetPQExpBuffer(&sql);
+	}
+
+	for (i = 0; i < max_slot; i++)
+	{
+		/* wait for all connections to return their results */
+		if (!GetQueryResult(connSlot[i].connection, dbname, progname,
+							completedb))
+		{
+			error = true;
+			goto fail;
+		}
+
+		connSlot[i].isFree = true;
+	}
+
+fail:
+	termPQExpBuffer(&sql);
+
+	if (error)
+	{
+		for (i = 0; i < max_slot; i++)
+			DisconnectDatabase(&connSlot[i]);
+		pfree(connSlot);
+		exit(1);
+	}
+}
+
+/*
+ * GetIdleSlot
+ * Return the index of a free slot.  If none is free, select() on all the
+ * sockets until at least one connection has finished its command; returns
+ * NO_SLOT if the wait was aborted or a finished command failed.
+ */
+static int
+GetIdleSlot(ParallelSlot *pSlot, int max_slot, const char *dbname,
+			const char *progname, bool completedb)
+{
+	int		i;
+	fd_set	slotset;
+	int 	firstFree = -1;
+	pgsocket maxFd;
+
+	for (i = 0; i < max_slot; i++)
+		if (pSlot[i].isFree)
+			return i;
+
+	maxFd = pSlot[0].sock;
+	for (i = 1; i < max_slot; i++)
+		if (pSlot[i].sock > maxFd)
+			maxFd = pSlot[i].sock;
+
+	/*
+	 * No free slot found, so wait until one of the connections
+	 * has finished its task and return the available slot.
+	 */
+	do
+	{
+		/*
+		 * Rebuild the set on every pass: select() leaves only the ready
+		 * sockets in it, and we must keep watching all connections.
+		 */
+		FD_ZERO(&slotset);
+		for (i = 0; i < max_slot; i++)
+			FD_SET(pSlot[i].sock, &slotset);
+
+		SetCancelConn(pSlot[0].connection);
+		i = select_loop(maxFd, &slotset);
+		ResetCancelConn();
+
+		if (i == ERROR_IN_ABORT || in_abort())
+		{
+			/*
+			 * This can only happen if user has sent the cancel request using
+			 * Ctrl+C, cancel is handled by 0th slot, so fetch the error result.
+			 */
+			GetQueryResult(pSlot[0].connection, dbname, progname,
+						   completedb);
+			return NO_SLOT;
+		}
+
+		Assert(i != 0);
+
+		for (i = 0; i < max_slot; i++)
+		{
+			if (!FD_ISSET(pSlot[i].sock, &slotset))
+				continue;
+
+			PQconsumeInput(pSlot[i].connection);
+			if (PQisBusy(pSlot[i].connection))
+				continue;
+
+			pSlot[i].isFree = true;
+
+			if (!GetQueryResult(pSlot[i].connection, dbname, progname,
+								completedb))
+				return NO_SLOT;
+
+			if (firstFree < 0)
+				firstFree = i;
+		}
+	} while (firstFree < 0);
+
+	return firstFree;
+}
+
+/*
+ * GetQueryResult
+ * Drain all results pending on conn and judge the last one (see below).
+ */
+static bool
+GetQueryResult(PGconn *conn, const char *dbname,
+			   const char *progname, bool completedb)
+{
+	PGresult   *result = NULL;
+	PGresult   *lastResult = NULL;
+	bool		r;
+
+	/* conn becomes the cancel target while we block in PQgetResult() */
+	SetCancelConn(conn);
+	while ((result = PQgetResult(conn)) != NULL)
+	{
+		PQclear(lastResult);
+		lastResult = result;
+	}
+	ResetCancelConn();
+
+	if (!lastResult)
+		return true;
+
+	r = (PQresultStatus(lastResult) == PGRES_COMMAND_OK);
+
+	/*
+	 * When vacuuming a complete database, an "undefined table" error
+	 * (presumably a table dropped after the list was built) is ignored so
+	 * that the remaining objects are still processed.  Any other failure,
+	 * including one that carries no SQLSTATE at all (for example a lost
+	 * connection), is reported and makes us return false.
+	 */
+	if (!r)
+	{
+		char *sqlState = PQresultErrorField(lastResult, PG_DIAG_SQLSTATE);
+
+		if (!completedb ||
+			!sqlState || strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0)
+		{
+			fprintf(stderr, _("%s: vacuuming of database \"%s\" failed: %s"),
+					progname, dbname, PQerrorMessage(conn));
+
+			PQclear(lastResult);
+			return false;
+		}
+	}
+
+	PQclear(lastResult);
+	return true;
+}
+
+/*
+ * vacuum_parallel
+ * Open concurrentCons connections to dbname and vacuum the given tables
+ * concurrently over them.  If no table list is given, the whole database
+ * is processed: every relation with relkind 'r' or 'm' is fetched from
+ * pg_class, ordered by relpages descending, and the number of
+ * connections is capped at the number of relations found.  "stage" is
+ * only consulted when analyze_in_stages is set; -1 runs all three stages.
+ */
+void
+vacuum_parallel(const char *dbname, bool full, bool verbose,
+			bool and_analyze, bool analyze_only, bool analyze_in_stages,
+			int stage, bool freeze, const char *host, const char *port,
+			const char *username, enum trivalue prompt_password,
+			const char *progname, bool echo, int concurrentCons,
+			SimpleStringList *tables, bool quiet)
+{
+	PGconn	   *conn;
+	int i;
+	ParallelSlot *connSlot;
+	SimpleStringList dbtables = {NULL, NULL};
+	bool	completeDb = false;
+
+	conn = connectDatabase(dbname, host, port, username,
+					  prompt_password, progname, false);
+
+	/*
+	 * If no table list is provided then we need to vacuum the whole DB:
+	 * get the list of all tables and prepare the list.
+	 */
+	if (!tables || !tables->head)
+	{
+		PGresult *res;
+		int ntuple;
+		PQExpBufferData sql;
+
+		initPQExpBuffer(&sql);
+
+		res = executeQuery(conn,
+				"SELECT c.relname, ns.nspname FROM pg_class c, pg_namespace ns"
+				" WHERE (relkind = \'r\' or relkind = \'m\')"
+				" and c.relnamespace = ns.oid ORDER BY c.relpages desc",
+				progname, echo);
+
+		ntuple = PQntuples(res);
+		for (i = 0; i < ntuple; i++)
+		{
+			appendPQExpBuffer(&sql, "%s",
+							fmtQualifiedId(PQserverVersion(conn),
+							PQgetvalue(res, i, 1),
+							PQgetvalue(res, i, 0)));
+
+			simple_string_list_append(&dbtables, sql.data);
+			resetPQExpBuffer(&sql);
+		}
+		PQclear(res);			/* values were already copied out above */
+		termPQExpBuffer(&sql);
+		tables = &dbtables;
+
+		/* remember that we are vacuuming the full database. */
+		completeDb = true;
+
+		if (concurrentCons > ntuple)
+			concurrentCons = ntuple;
+	}
+
+	connSlot = (ParallelSlot*)pg_malloc(concurrentCons * sizeof(ParallelSlot));
+
+	/* Slot 0 reuses the connection opened above; it starts out idle too. */
+	connSlot[0].connection = conn;
+	connSlot[0].isFree = true;
+	connSlot[0].sock = PQsocket(conn);
+	PQsetnonblocking(connSlot[0].connection, 1);
+
+	for (i = 1; i < concurrentCons; i++)
+	{
+		connSlot[i].connection = connectDatabase(dbname, host, port, username,
+								  prompt_password, progname, false);
+		PQsetnonblocking(connSlot[i].connection, 1);
+		connSlot[i].isFree = true;
+		connSlot[i].sock = PQsocket(connSlot[i].connection);
+	}
+
+	if (analyze_in_stages)
+	{
+		const char *stage_messages[] = {
+			gettext_noop("Generating minimal optimizer statistics (1 target)"),
+			gettext_noop("Generating medium optimizer statistics (10 targets)"),
+			gettext_noop("Generating default (full) optimizer statistics")
+			};
+
+		if (stage == -1)
+		{
+			for (i = 0; i < 3; i++)
+			{
+				if (!quiet)
+				{
+					puts(gettext(stage_messages[i]));
+					fflush(stdout);
+				}
+
+				run_parallel_vacuum(echo, dbname, tables, full, verbose,
+									and_analyze, analyze_only, freeze,
+									concurrentCons, progname, i, connSlot,
+									completeDb);
+			}
+		}
+		else
+		{
+			/*
+			 * Otherwise, we got a stage from vacuum_all_databases(), so run
+			 * only that one.
+			 */
+			if (!quiet)
+			{
+				puts(gettext(stage_messages[stage]));
+				fflush(stdout);
+			}
+
+			run_parallel_vacuum(echo, dbname, tables, full, verbose,
+								and_analyze, analyze_only, freeze,
+								concurrentCons, progname, stage,
+								connSlot, completeDb);
+		}
+	}
+	else
+		run_parallel_vacuum(echo, dbname, tables, full, verbose,
+							and_analyze, analyze_only, freeze,
+							concurrentCons, progname, -1, connSlot,
+							completeDb);
+
+	for (i = 0; i < concurrentCons; i++)
+		 PQfinish(connSlot[i].connection);
+
+	pfree(connSlot);
+}
+
+/*
+ * Call select() until a descriptor in workerset becomes readable and return
+ * its result, or ERROR_IN_ABORT once a cancel has been flagged.  On Windows
+ * we wake up periodically to check for that; on Unix we just block until
+ * select() returns or is interrupted.
+ */
+static int
+select_loop(int maxFd, fd_set *workerset)
+{
+	int			i;
+	fd_set		saveSet = *workerset;
+
+#ifdef WIN32
+	for (;;)
+	{
+		/*
+		 * sleep a quarter of a second before checking if we should terminate.
+		 */
+		struct timeval tv = {0, 250000};
+
+		*workerset = saveSet;
+		i = select(maxFd + 1, workerset, NULL, NULL, &tv);
+		if (in_abort())
+		{
+			return ERROR_IN_ABORT;
+		}
+
+		if (i == SOCKET_ERROR && WSAGetLastError() == WSAEINTR)
+			continue;
+		if (i)
+			break;
+	}
+#else							/* UNIX */
+
+	for (;;)
+	{
+		*workerset = saveSet;
+		i = select(maxFd + 1, workerset, NULL, NULL, NULL);
+		if (in_abort())
+		{
+			return ERROR_IN_ABORT;
+		}
+
+		if (i < 0 && errno == EINTR)
+			continue;
+		break;
+	}
+#endif
+
+	return i;
+}
+
+/*
+ * DisconnectDatabase
+ * Close one slot's connection, first cancelling a command still running.
+ */
+void
+DisconnectDatabase(ParallelSlot *slot)
+{
+	PGcancel   *cancel;
+	char		errbuf[1];
+
+	if (!slot->connection)
+		return;
+
+	if (PQtransactionStatus(slot->connection) == PQTRANS_ACTIVE)
+	{
+		if ((cancel = PQgetCancel(slot->connection)))
+		{
+			PQcancel(cancel, errbuf, sizeof(errbuf));
+			PQfreeCancel(cancel);
+		}
+	}
+
+	PQfinish(slot->connection);
+	slot->connection= NULL;
+}
+
+/* Append "ANALYZE" or "VACUUM" and the selected options to sql; the VACUUM
+ * option syntax depends on the server version reported for conn. */
+void prepare_command(PGconn *conn, bool full, bool verbose, bool and_analyze,
+					 bool analyze_only, bool freeze, PQExpBuffer sql)
+{
+	if (analyze_only)
+	{
+		appendPQExpBufferStr(sql, "ANALYZE");
+		if (verbose)
+			appendPQExpBufferStr(sql, " VERBOSE");
+	}
+	else
+	{
+		appendPQExpBufferStr(sql, "VACUUM");
+		if (PQserverVersion(conn) >= 90000)
+		{
+			const char *paren = " (";
+			const char *comma = ", ";
+			const char *sep = paren;
+
+			if (full)
+			{
+				appendPQExpBuffer(sql, "%sFULL", sep);
+				sep = comma;
+			}
+			if (freeze)
+			{
+				appendPQExpBuffer(sql, "%sFREEZE", sep);
+				sep = comma;
+			}
+			if (verbose)
+			{
+				appendPQExpBuffer(sql, "%sVERBOSE", sep);
+				sep = comma;
+			}
+			if (and_analyze)
+			{
+				appendPQExpBuffer(sql, "%sANALYZE", sep);
+				sep = comma;
+			}
+			if (sep != paren)
+				appendPQExpBufferStr(sql, ")");
+		}
+		else
+		{
+			if (full)
+				appendPQExpBufferStr(sql, " FULL");
+			if (freeze)
+				appendPQExpBufferStr(sql, " FREEZE");
+			if (verbose)
+				appendPQExpBufferStr(sql, " VERBOSE");
+			if (and_analyze)
+				appendPQExpBufferStr(sql, " ANALYZE");
+		}
+	}
+}
+
 
 static void
 help(const char *progname)
@@ -436,6 +977,7 @@ help(const char *progname)
 	printf(_("  -V, --version                   output version information, then exit\n"));
 	printf(_("  -z, --analyze                   update optimizer statistics\n"));
 	printf(_("  -Z, --analyze-only              only update optimizer statistics\n"));
+	printf(_("  -j, --jobs=NUM                  use this many concurrent connections to vacuum\n"));
 	printf(_("      --analyze-in-stages         only update optimizer statistics, in multiple\n"
 		   "                                  stages for faster results\n"));
 	printf(_("  -?, --help                      show this help, then exit\n"));


-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic