[prev in list] [next in list] [prev in thread] [next in thread]
List: postgresql-general
Subject: Re: [HACKERS] TODO : Allow parallel cores to be used by vacuumdb [ WIP ]
From: Amit Kapila <amit.kapila16 () gmail ! com>
Date: 2014-12-31 13:17:38
Message-ID: CAA4eK1Jp+u2+46wE8Nmg_gWiO_b7_L1afAwS9awgRkE=DfW_nA () mail ! gmail ! com
[Download RAW message or body]
[Attachment #2 (multipart/alternative)]
On Mon, Dec 29, 2014 at 11:10 AM, Dilip kumar <dilip.kumar@huawei.com>
wrote:
>
> On 29 December 2014 10:22 Amit Kapila Wrote,
>
>
> I think nothing more to be handled from my side, you can go ahead with
review..
>
The patch looks good to me. I have made a couple of
cosmetic changes (fixed spelling mistakes, improved some comments,
etc.); please check them once, and if you are okay with them we can
move ahead.
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com
[Attachment #5 (text/html)]
<div dir="ltr">On Mon, Dec 29, 2014 at 11:10 AM, Dilip kumar <<a \
href="mailto:dilip.kumar@huawei.com">dilip.kumar@huawei.com</a>> \
wrote:<br>><br>> On 29 December 2014 10:22 Amit Kapila \
Wrote,<br>><br>><br>> I think nothing more to be handled from my side, you \
can go ahead with review..<br>><br><br><div>The patch looks good to me. I have \
done couple of</div><div style>cosmetic changes (spelling mistakes, improve some \
comments,</div><div style>etc.), check the same once and if you are okay, we can \
move</div><div style>ahead. </div><div><br><div><br clear="all"><div><div \
class="gmail_signature"><br>With Regards,<br>Amit Kapila.<br>EnterpriseDB: <a \
href="http://www.enterprisedb.com/" \
target="_blank">http://www.enterprisedb.com</a></div></div></div></div></div>
["vacuumdb_parallel_v21.patch" (application/octet-stream)]
diff --git a/doc/src/sgml/ref/vacuumdb.sgml b/doc/src/sgml/ref/vacuumdb.sgml
index 3ecd999..e4a971f 100644
--- a/doc/src/sgml/ref/vacuumdb.sgml
+++ b/doc/src/sgml/ref/vacuumdb.sgml
@@ -204,6 +204,27 @@ PostgreSQL documentation
</varlistentry>
<varlistentry>
+ <term><option>-j <replaceable class="parameter">njobs</replaceable></option></term>
+ <term><option>--jobs=<replaceable class="parameter">njobs</replaceable></option></term>
+ <listitem>
+ <para>
+ Number of concurrent connections to use for the operation.
+ With this option the vacuum commands are run over asynchronous
+ connections, each connection processing one table at a time, so up to
+ <replaceable class="parameter">njobs</replaceable> tables are vacuumed
+ in parallel. If the number of jobs given is greater than the number of
+ tables, it is reduced to the number of tables.
+ </para>
+ <para>
+ <application>vacuumdb</application> will open
+ <replaceable class="parameter">njobs</replaceable> connections to the
+ database, so make sure your <xref linkend="guc-max-connections">
+ setting is high enough to accommodate all connections.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
<term><option>--analyze-in-stages</option></term>
<listitem>
<para>
diff --git a/src/bin/scripts/common.c b/src/bin/scripts/common.c
index 311fed5..2220a4e 100644
--- a/src/bin/scripts/common.c
+++ b/src/bin/scripts/common.c
@@ -19,10 +19,9 @@
#include "common.h"
-static void SetCancelConn(PGconn *conn);
-static void ResetCancelConn(void);
static PGcancel *volatile cancelConn = NULL;
+static bool inAbort = false;
#ifdef WIN32
static CRITICAL_SECTION cancelConnLock;
@@ -291,7 +290,7 @@ yesno_prompt(const char *question)
*
* Set cancelConn to point to the current database connection.
*/
-static void
+void
SetCancelConn(PGconn *conn)
{
PGcancel *oldCancelConn;
@@ -321,7 +320,7 @@ SetCancelConn(PGconn *conn)
*
* Free the current cancel connection, if any, and set to NULL.
*/
-static void
+void
ResetCancelConn(void)
{
PGcancel *oldCancelConn;
@@ -359,10 +358,15 @@ handle_sigint(SIGNAL_ARGS)
if (cancelConn != NULL)
{
if (PQcancel(cancelConn, errbuf, sizeof(errbuf)))
+ {
+ inAbort = true;
fprintf(stderr, _("Cancel request sent\n"));
+ }
else
fprintf(stderr, _("Could not send cancel request: %s"), errbuf);
}
+ else
+ inAbort = true;
errno = save_errno; /* just in case the write changed it */
}
@@ -392,10 +396,16 @@ consoleHandler(DWORD dwCtrlType)
if (cancelConn != NULL)
{
if (PQcancel(cancelConn, errbuf, sizeof(errbuf)))
+ {
fprintf(stderr, _("Cancel request sent\n"));
+ inAbort = true;
+ }
else
fprintf(stderr, _("Could not send cancel request: %s"), errbuf);
}
+ else
+ inAbort = true;
+
LeaveCriticalSection(&cancelConnLock);
return TRUE;
@@ -414,3 +424,8 @@ setup_cancel_handler(void)
}
#endif /* WIN32 */
+
+/*
+ * in_abort
+ *
+ * Report whether the cancel handler has flagged an abort.  Returns the
+ * current value of inAbort, which handle_sigint() / consoleHandler() set
+ * to true once a cancel request has been sent successfully, or when there
+ * was no cancel connection to send one to.
+ */
+bool
+in_abort(void)
+{
+    return inAbort;
+}
diff --git a/src/bin/scripts/common.h b/src/bin/scripts/common.h
index 691f6c6..3bafde3 100644
--- a/src/bin/scripts/common.h
+++ b/src/bin/scripts/common.h
@@ -49,4 +49,9 @@ extern bool yesno_prompt(const char *question);
extern void setup_cancel_handler(void);
+extern void SetCancelConn(PGconn *conn);
+extern void ResetCancelConn(void);
+extern bool in_abort(void);
+
+
#endif /* COMMON_H */
diff --git a/src/bin/scripts/vacuumdb.c b/src/bin/scripts/vacuumdb.c
index 86e6ab3..05cd74f 100644
--- a/src/bin/scripts/vacuumdb.c
+++ b/src/bin/scripts/vacuumdb.c
@@ -14,6 +14,18 @@
#include "common.h"
#include "dumputils.h"
+#define NO_SLOT (-1)
+
+/*
+ * Per-connection bookkeeping for parallel vacuum.  One ParallelSlot is
+ * kept for each connection opened by vacuum_parallel().
+ */
+typedef struct ParallelSlot
+{
+ PGconn *connection;	/* connection this slot sends its commands on */
+ bool isFree;		/* set to false when a command is sent on the
+				 * connection, back to true once its result has
+				 * been collected */
+ pgsocket sock;		/* PQsocket() of connection, used for select() */
+} ParallelSlot;
+
+#define ERRCODE_UNDEFINED_TABLE "42P01"
+#define ERROR_IN_ABORT -2
static void vacuum_one_database(const char *dbname, bool full, bool verbose,
bool and_analyze, bool analyze_only, bool analyze_in_stages, int stage, bool freeze,
@@ -25,10 +37,40 @@ static void vacuum_all_databases(bool full, bool verbose, bool and_analyze,
const char *maintenance_db,
const char *host, const char *port,
const char *username, enum trivalue prompt_password,
- const char *progname, bool echo, bool quiet);
+ const char *progname, bool echo, bool quiet,
+ int concurrentCons);
static void help(const char *progname);
+/*
+ * Forward declarations of the parallel-vacuum helpers.  All of them are
+ * used only inside this file, so they are given internal linkage.
+ */
+static void vacuum_parallel(const char *dbname, bool full, bool verbose,
+                bool and_analyze, bool analyze_only,
+                bool analyze_in_stages, int stage, bool freeze,
+                const char *host, const char *port,
+                const char *username, enum trivalue prompt_password,
+                const char *progname, bool echo, int concurrentCons,
+                SimpleStringList *tables, bool quiet);
+
+static void prepare_command(PGconn *conn, bool full, bool verbose,
+                bool and_analyze, bool analyze_only, bool freeze,
+                PQExpBuffer sql);
+
+static void run_parallel_vacuum(bool echo, const char *dbname,
+                SimpleStringList *tables,
+                bool full, bool verbose, bool and_analyze,
+                bool analyze_only, bool freeze, int concurrentCons,
+                const char *progname, int analyze_stage,
+                ParallelSlot *connSlot, bool completedb);
+
+static int GetIdleSlot(ParallelSlot *pSlot, int max_slot, const char *dbname,
+                const char *progname, bool completedb);
+
+static bool GetQueryResult(PGconn *conn, const char *dbname,
+                const char *progname, bool completedb);
+
+static int select_loop(int maxFd, fd_set *workerset);
+
+static void DisconnectDatabase(ParallelSlot *slot);
+
+
int
main(int argc, char *argv[])
@@ -49,6 +91,7 @@ main(int argc, char *argv[])
{"table", required_argument, NULL, 't'},
{"full", no_argument, NULL, 'f'},
{"verbose", no_argument, NULL, 'v'},
+ {"jobs", required_argument, NULL, 'j'},
{"maintenance-db", required_argument, NULL, 2},
{"analyze-in-stages", no_argument, NULL, 3},
{NULL, 0, NULL, 0}
@@ -74,13 +117,15 @@ main(int argc, char *argv[])
bool full = false;
bool verbose = false;
SimpleStringList tables = {NULL, NULL};
+ int concurrentCons = 0;
+ int tbl_count = 0;
progname = get_progname(argv[0]);
set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pgscripts"));
handle_help_version_opts(argc, argv, "vacuumdb", help);
- while ((c = getopt_long(argc, argv, "h:p:U:wWeqd:zZFat:fv", long_options, &optindex)) != -1)
+ while ((c = getopt_long(argc, argv, "h:p:U:wWeqd:zZFat:fv:j:", long_options, &optindex)) != -1)
{
switch (c)
{
@@ -121,14 +166,27 @@ main(int argc, char *argv[])
alldb = true;
break;
case 't':
+ {
simple_string_list_append(&tables, optarg);
+ tbl_count++;
break;
+ }
case 'f':
full = true;
break;
case 'v':
verbose = true;
break;
+ case 'j':
+ concurrentCons = atoi(optarg);
+ if (concurrentCons <= 0)
+ {
+ fprintf(stderr, _("%s: Number of parallel \"jobs\" should be at least 1\n"),
+ progname);
+ exit(1);
+ }
+
+ break;
case 2:
maintenance_db = pg_strdup(optarg);
break;
@@ -141,6 +199,7 @@ main(int argc, char *argv[])
}
}
+ optind++;
/*
* Non-option argument specifies database name as long as it wasn't
@@ -179,6 +238,10 @@ main(int argc, char *argv[])
setup_cancel_handler();
+ /* Avoid opening extra connections. */
+ if (tbl_count && (concurrentCons > tbl_count))
+ concurrentCons = tbl_count;
+
if (alldb)
{
if (dbname)
@@ -196,7 +259,7 @@ main(int argc, char *argv[])
vacuum_all_databases(full, verbose, and_analyze, analyze_only, analyze_in_stages, freeze,
maintenance_db, host, port, username,
- prompt_password, progname, echo, quiet);
+ prompt_password, progname, echo, quiet, concurrentCons);
}
else
{
@@ -210,25 +273,36 @@ main(int argc, char *argv[])
dbname = get_user_name_or_exit(progname);
}
- if (tables.head != NULL)
+ if (concurrentCons > 1)
{
- SimpleStringListCell *cell;
+ vacuum_parallel(dbname, full, verbose, and_analyze,
+ analyze_only, analyze_in_stages, -1,
+ freeze, host, port, username, prompt_password,
+ progname, echo, concurrentCons, &tables, quiet);
- for (cell = tables.head; cell; cell = cell->next)
+ }
+ else
+ {
+ if (tables.head != NULL)
{
+ SimpleStringListCell *cell;
+
+ for (cell = tables.head; cell; cell = cell->next)
+ {
+ vacuum_one_database(dbname, full, verbose, and_analyze,
+ analyze_only, analyze_in_stages, -1,
+ freeze, cell->val,
+ host, port, username, prompt_password,
+ progname, echo, quiet);
+ }
+ }
+ else
vacuum_one_database(dbname, full, verbose, and_analyze,
analyze_only, analyze_in_stages, -1,
- freeze, cell->val,
+ freeze, NULL,
host, port, username, prompt_password,
progname, echo, quiet);
- }
}
- else
- vacuum_one_database(dbname, full, verbose, and_analyze,
- analyze_only, analyze_in_stages, -1,
- freeze, NULL,
- host, port, username, prompt_password,
- progname, echo, quiet);
}
exit(0);
@@ -268,56 +342,9 @@ vacuum_one_database(const char *dbname, bool full, bool verbose, bool and_analyz
conn = connectDatabase(dbname, host, port, username, prompt_password,
progname, false);
- if (analyze_only)
- {
- appendPQExpBufferStr(&sql, "ANALYZE");
- if (verbose)
- appendPQExpBufferStr(&sql, " VERBOSE");
- }
- else
- {
- appendPQExpBufferStr(&sql, "VACUUM");
- if (PQserverVersion(conn) >= 90000)
- {
- const char *paren = " (";
- const char *comma = ", ";
- const char *sep = paren;
+ prepare_command(conn, full, verbose,
+ and_analyze, analyze_only, freeze, &sql);
- if (full)
- {
- appendPQExpBuffer(&sql, "%sFULL", sep);
- sep = comma;
- }
- if (freeze)
- {
- appendPQExpBuffer(&sql, "%sFREEZE", sep);
- sep = comma;
- }
- if (verbose)
- {
- appendPQExpBuffer(&sql, "%sVERBOSE", sep);
- sep = comma;
- }
- if (and_analyze)
- {
- appendPQExpBuffer(&sql, "%sANALYZE", sep);
- sep = comma;
- }
- if (sep != paren)
- appendPQExpBufferStr(&sql, ")");
- }
- else
- {
- if (full)
- appendPQExpBufferStr(&sql, " FULL");
- if (freeze)
- appendPQExpBufferStr(&sql, " FREEZE");
- if (verbose)
- appendPQExpBufferStr(&sql, " VERBOSE");
- if (and_analyze)
- appendPQExpBufferStr(&sql, " ANALYZE");
- }
- }
if (table)
appendPQExpBuffer(&sql, " %s", table);
appendPQExpBufferStr(&sql, ";");
@@ -353,8 +380,10 @@ vacuum_one_database(const char *dbname, bool full, bool verbose, bool and_analyz
}
else
{
- /* Otherwise, we got a stage from vacuum_all_databases(), so run
- * only that one. */
+ /*
+ * Otherwise, we got a stage from vacuum_all_databases(), so run
+ * only that one.
+ */
if (!quiet)
{
puts(gettext(stage_messages[stage]));
@@ -374,11 +403,12 @@ vacuum_one_database(const char *dbname, bool full, bool verbose, bool and_analyz
static void
-vacuum_all_databases(bool full, bool verbose, bool and_analyze, bool analyze_only,
- bool analyze_in_stages, bool freeze, const char *maintenance_db,
- const char *host, const char *port,
- const char *username, enum trivalue prompt_password,
- const char *progname, bool echo, bool quiet)
+vacuum_all_databases(bool full, bool verbose, bool and_analyze,
+ bool analyze_only, bool analyze_in_stages, bool freeze,
+ const char *maintenance_db, const char *host,
+ const char *port, const char *username,
+ enum trivalue prompt_password, const char *progname,
+ bool echo, bool quiet, int concurrentCons)
{
PGconn *conn;
PGresult *result;
@@ -390,7 +420,8 @@ vacuum_all_databases(bool full, bool verbose, bool and_analyze, bool analyze_onl
PQfinish(conn);
/* If analyzing in stages, then run through all stages. Otherwise just
- * run once, passing -1 as the stage. */
+ * run once, passing -1 as the stage.
+ */
for (stage = (analyze_in_stages ? 0 : -1);
stage < (analyze_in_stages ? 3 : 0);
stage++)
@@ -407,6 +438,15 @@ vacuum_all_databases(bool full, bool verbose, bool and_analyze, bool analyze_onl
fflush(stdout);
}
+ if (concurrentCons > 1)
+ {
+ vacuum_parallel(dbname, full, verbose, and_analyze,
+ analyze_only, analyze_in_stages, stage,
+ freeze, host, port, username, prompt_password,
+ progname, echo, concurrentCons, NULL, quiet);
+
+ }
+ else
vacuum_one_database(dbname, full, verbose, and_analyze, analyze_only,
analyze_in_stages, stage,
freeze, NULL, host, port, username, prompt_password,
@@ -417,6 +457,507 @@ vacuum_all_databases(bool full, bool verbose, bool and_analyze, bool analyze_onl
PQclear(result);
}
+/*
+ * run_parallel_vacuum
+ *
+ * Send one VACUUM/ANALYZE command per entry of "tables", spreading the
+ * commands over the already-open connections in connSlot[], then wait
+ * until every connection has returned its result.
+ *
+ * concurrentCons is the number of entries in connSlot[].  If analyze_stage
+ * is >= 0, the matching stage_commands[] entry is executed on every
+ * connection before any table is processed.  completedb is handed on to
+ * GetIdleSlot()/GetQueryResult(), which use it when deciding whether a
+ * failed command may be ignored.
+ *
+ * On any failure, or when an abort has been flagged, every connection in
+ * connSlot[] is closed, connSlot is freed and the program exits with
+ * status 1; the function returns only on success.
+ */
+static void
+run_parallel_vacuum(bool echo, const char *dbname, SimpleStringList *tables,
+                    bool full, bool verbose, bool and_analyze,
+                    bool analyze_only, bool freeze, int concurrentCons,
+                    const char *progname, int analyze_stage,
+                    ParallelSlot *connSlot, bool completedb)
+{
+    PQExpBufferData sql;
+    SimpleStringListCell *cell;
+    int         max_slot = concurrentCons;
+    int         i;
+    bool        error = false;
+    const char *stage_commands[] = {
+        "SET default_statistics_target=1; SET vacuum_cost_delay=0;",
+        "SET default_statistics_target=10; RESET vacuum_cost_delay;",
+        "RESET default_statistics_target;"
+    };
+
+    initPQExpBuffer(&sql);
+
+    /* Apply the per-stage settings on every connection first. */
+    if (analyze_stage >= 0)
+    {
+        for (i = 0; i < max_slot; i++)
+            executeCommand(connSlot[i].connection,
+                           stage_commands[analyze_stage], progname, echo);
+    }
+
+    for (cell = tables->head; cell; cell = cell->next)
+    {
+        int         free_slot;
+        PGconn     *slotconn;
+
+        if (in_abort())
+        {
+            error = true;
+            goto done;
+        }
+
+        /*
+         * Get a free connection slot; if none is free this waits until at
+         * least one connection has finished its current command.
+         */
+        free_slot = GetIdleSlot(connSlot, max_slot, dbname, progname,
+                                completedb);
+        if (free_slot == NO_SLOT)
+        {
+            error = true;
+            goto done;
+        }
+
+        slotconn = connSlot[free_slot].connection;
+
+        resetPQExpBuffer(&sql);
+        prepare_command(slotconn, full, verbose,
+                        and_analyze, analyze_only, freeze, &sql);
+        appendPQExpBuffer(&sql, " %s;", cell->val);
+
+        /* Honour --echo for the commands sent asynchronously as well. */
+        if (echo)
+            printf("%s\n", sql.data);
+
+        /*
+         * PQsendQuery() returns 0 if the command could not be dispatched.
+         * Treat that as a failure instead of marking the slot busy and
+         * waiting for a result that was never requested.
+         */
+        if (!PQsendQuery(slotconn, sql.data))
+        {
+            fprintf(stderr, _("%s: vacuuming of database \"%s\" failed: %s"),
+                    progname, dbname, PQerrorMessage(slotconn));
+            error = true;
+            goto done;
+        }
+
+        connSlot[free_slot].isFree = false;
+    }
+
+    /* Wait for all connections to return their results. */
+    for (i = 0; i < max_slot; i++)
+    {
+        if (!GetQueryResult(connSlot[i].connection, dbname, progname,
+                            completedb))
+        {
+            error = true;
+            goto done;
+        }
+
+        connSlot[i].isFree = true;
+    }
+
+done:
+
+    termPQExpBuffer(&sql);
+
+    if (error)
+    {
+        for (i = 0; i < max_slot; i++)
+            DisconnectDatabase(&connSlot[i]);
+
+        pfree(connSlot);
+
+        exit(1);
+    }
+}
+
+/*
+ * GetIdleSlot
+ *
+ * Return the index of a free slot in pSlot[0 .. max_slot-1].  If a slot is
+ * already marked free it is returned at once; otherwise wait in select()
+ * on all the slots' sockets until at least one connection has finished
+ * its command, collect that command's result and return the lowest-numbered
+ * slot that became free.
+ *
+ * Returns NO_SLOT if an abort was flagged while waiting, if select()
+ * failed, or if GetQueryResult() reports a failed command.
+ */
+static int
+GetIdleSlot(ParallelSlot *pSlot, int max_slot, const char *dbname,
+            const char *progname, bool completedb)
+{
+    int         i;
+    int         firstFree = NO_SLOT;
+
+    for (i = 0; i < max_slot; i++)
+        if (pSlot[i].isFree)
+            return i;
+
+    /*
+     * No free slot found, so wait until one of the connections has
+     * finished its task and return the available slot.
+     */
+    do
+    {
+        fd_set      slotset;
+        pgsocket    maxFd = pSlot[0].sock;
+        int         nready;
+
+        /*
+         * select_loop() overwrites the set with the sockets that are
+         * ready, so it must be rebuilt on every iteration; otherwise
+         * later waits would watch only the sockets that happened to be
+         * ready the previous time.
+         */
+        FD_ZERO(&slotset);
+        for (i = 0; i < max_slot; i++)
+        {
+            FD_SET(pSlot[i].sock, &slotset);
+            if (pSlot[i].sock > maxFd)
+                maxFd = pSlot[i].sock;
+        }
+
+        /* While waiting, a cancel request is directed at slot 0. */
+        SetCancelConn(pSlot[0].connection);
+
+        nready = select_loop(maxFd, &slotset);
+
+        ResetCancelConn();
+
+        if (nready < 0)
+        {
+            /*
+             * Either the user cancelled (Ctrl+C) or select() failed.  A
+             * cancel is handled by the 0th slot, so fetch its error
+             * result in that case.  After a select() failure the set
+             * contents are not usable, so give up as well.
+             */
+            if (in_abort())
+                GetQueryResult(pSlot[0].connection, dbname, progname,
+                               completedb);
+            return NO_SLOT;
+        }
+
+        Assert(nready != 0);
+
+        for (i = 0; i < max_slot; i++)
+        {
+            if (!FD_ISSET(pSlot[i].sock, &slotset))
+                continue;
+
+            PQconsumeInput(pSlot[i].connection);
+            if (PQisBusy(pSlot[i].connection))
+                continue;
+
+            pSlot[i].isFree = true;
+
+            if (!GetQueryResult(pSlot[i].connection, dbname, progname,
+                                completedb))
+                return NO_SLOT;
+
+            if (firstFree == NO_SLOT)
+                firstFree = i;
+        }
+    } while (firstFree == NO_SLOT);
+
+    return firstFree;
+}
+
+/*
+ * GetQueryResult
+ *
+ * Drain all pending results from conn and judge the outcome by the last
+ * one.  While the results are being fetched, conn is registered as the
+ * cancel connection.
+ *
+ * Returns true if there was no result at all or the last result is
+ * PGRES_COMMAND_OK.  When completedb is true, a failure whose SQLSTATE is
+ * ERRCODE_UNDEFINED_TABLE is ignored as well (true is returned without
+ * printing anything).  Any other failure is reported on stderr and false
+ * is returned.
+ */
+static bool
+GetQueryResult(PGconn *conn, const char *dbname,
+               const char *progname, bool completedb)
+{
+    PGresult   *result;
+    PGresult   *lastResult = NULL;
+    bool        ok;
+
+    SetCancelConn(conn);
+    while ((result = PQgetResult(conn)) != NULL)
+    {
+        PQclear(lastResult);
+        lastResult = result;
+    }
+    ResetCancelConn();
+
+    if (lastResult == NULL)
+        return true;
+
+    ok = (PQresultStatus(lastResult) == PGRES_COMMAND_OK);
+
+    if (!ok)
+    {
+        const char *sqlState = PQresultErrorField(lastResult,
+                                                  PG_DIAG_SQLSTATE);
+
+        /*
+         * When vacuuming a complete database, a table that no longer
+         * exists can be skipped so the remaining objects are still
+         * processed.  Only that specific error is tolerated: a result
+         * that carries no SQLSTATE at all must not be mistaken for it.
+         */
+        if (completedb && sqlState != NULL &&
+            strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) == 0)
+            ok = true;
+        else
+            fprintf(stderr, _("%s: vacuuming of database \"%s\" failed: %s"),
+                    progname, dbname, PQerrorMessage(conn));
+    }
+
+    PQclear(lastResult);
+    return ok;
+}
+
+/*
+ * vacuum_parallel
+ *
+ * Open concurrentCons connections to dbname and vacuum/analyze the given
+ * tables over them concurrently.  If no table list is supplied (tables is
+ * NULL or empty), the list of tables and materialized views is read from
+ * the catalogs, largest first, and the whole database is processed; in
+ * that case the number of connections is capped at the number of tables.
+ *
+ * With analyze_in_stages, stage == -1 means "run all three stages";
+ * otherwise only the given stage is run.  Unless quiet is set, a message
+ * is printed before each stage.
+ *
+ * The connections are closed before returning.
+ */
+void
+vacuum_parallel(const char *dbname, bool full, bool verbose,
+                bool and_analyze, bool analyze_only, bool analyze_in_stages,
+                int stage, bool freeze, const char *host, const char *port,
+                const char *username, enum trivalue prompt_password,
+                const char *progname, bool echo, int concurrentCons,
+                SimpleStringList *tables, bool quiet)
+{
+    PGconn     *conn;
+    int         i;
+    ParallelSlot *connSlot;
+    SimpleStringList dbtables = {NULL, NULL};
+    bool        completeDb = false;
+
+    conn = connectDatabase(dbname, host, port, username,
+                           prompt_password, progname, false);
+
+    /*
+     * If no table list was provided we need to vacuum the whole database:
+     * get the list of all tables and prepare the list.
+     */
+    if (!tables || !tables->head)
+    {
+        PGresult   *res;
+        int         ntuple;
+
+        res = executeQuery(conn,
+            "SELECT c.relname, ns.nspname FROM pg_class c, pg_namespace ns"
+            " WHERE (relkind = \'r\' or relkind = \'m\')"
+            " and c.relnamespace = ns.oid ORDER BY c.relpages desc",
+            progname, echo);
+
+        ntuple = PQntuples(res);
+        for (i = 0; i < ntuple; i++)
+        {
+            /* The list stores its own copy of the qualified name. */
+            simple_string_list_append(&dbtables,
+                                      fmtQualifiedId(PQserverVersion(conn),
+                                                     PQgetvalue(res, i, 1),
+                                                     PQgetvalue(res, i, 0)));
+        }
+
+        PQclear(res);
+
+        /*
+         * Nothing to do.  Bail out here: capping the connection count to
+         * zero below would leave no room for even the first slot.
+         */
+        if (ntuple == 0)
+        {
+            PQfinish(conn);
+            return;
+        }
+
+        tables = &dbtables;
+
+        /* Remember that we are vacuuming the full database. */
+        completeDb = true;
+
+        if (concurrentCons > ntuple)
+            concurrentCons = ntuple;
+    }
+
+    connSlot = (ParallelSlot *) pg_malloc(concurrentCons * sizeof(ParallelSlot));
+
+    /* Slot 0 reuses the connection opened above; open the others. */
+    connSlot[0].connection = conn;
+    for (i = 1; i < concurrentCons; i++)
+        connSlot[i].connection = connectDatabase(dbname, host, port, username,
+                                                 prompt_password, progname,
+                                                 false);
+
+    /* Initialise every slot, including slot 0, as free. */
+    for (i = 0; i < concurrentCons; i++)
+    {
+        PQsetnonblocking(connSlot[i].connection, 1);
+        connSlot[i].isFree = true;
+        connSlot[i].sock = PQsocket(connSlot[i].connection);
+    }
+
+    if (analyze_in_stages)
+    {
+        const char *stage_messages[] = {
+            gettext_noop("Generating minimal optimizer statistics (1 target)"),
+            gettext_noop("Generating medium optimizer statistics (10 targets)"),
+            gettext_noop("Generating default (full) optimizer statistics")
+        };
+
+        /*
+         * stage == -1 means run all three stages; otherwise we got a
+         * stage from vacuum_all_databases(), so run only that one.
+         */
+        int         first_stage = (stage == -1) ? 0 : stage;
+        int         last_stage = (stage == -1) ? 2 : stage;
+        int         cur;
+
+        for (cur = first_stage; cur <= last_stage; cur++)
+        {
+            if (!quiet)
+            {
+                puts(gettext(stage_messages[cur]));
+                fflush(stdout);
+            }
+
+            run_parallel_vacuum(echo, dbname, tables, full, verbose,
+                                and_analyze, analyze_only, freeze,
+                                concurrentCons, progname, cur, connSlot,
+                                completeDb);
+        }
+    }
+    else
+        run_parallel_vacuum(echo, dbname, tables, full, verbose,
+                            and_analyze, analyze_only, freeze,
+                            concurrentCons, progname, -1, connSlot,
+                            completeDb);
+
+    for (i = 0; i < concurrentCons; i++)
+        PQfinish(connSlot[i].connection);
+
+    pfree(connSlot);
+}
+
+/*
+ * select_loop
+ *
+ * Repeat select() until a descriptor in the read set becomes readable.
+ * On Windows we use a timeout so that we can check for an abort from time
+ * to time; on Unix we just block, and retry when interrupted.
+ *
+ * On entry *workerset holds the descriptors to watch and maxFd the highest
+ * of them; on return *workerset holds what select() left in it.
+ *
+ * Returns ERROR_IN_ABORT if an abort has been flagged, otherwise the
+ * return value of select().
+ */
+static int
+select_loop(int maxFd, fd_set *workerset)
+{
+    int         i;
+    fd_set      saveSet = *workerset;
+
+#ifdef WIN32
+    for (;;)
+    {
+        /*
+         * Sleep a quarter of a second before checking if we should
+         * terminate.
+         */
+        struct timeval tv = {0, 250000};
+
+        *workerset = saveSet;
+        i = select(maxFd + 1, workerset, NULL, NULL, &tv);
+        if (in_abort())
+            return ERROR_IN_ABORT;
+
+        if (i == SOCKET_ERROR && WSAGetLastError() == WSAEINTR)
+            continue;
+        if (i)
+            break;
+    }
+#else                           /* UNIX */
+
+    for (;;)
+    {
+        *workerset = saveSet;
+        i = select(maxFd + 1, workerset, NULL, NULL, NULL);
+
+        /*
+         * Report an abort with the same code as the Windows branch; the
+         * caller tests for ERROR_IN_ABORT, not for -1.
+         */
+        if (in_abort())
+            return ERROR_IN_ABORT;
+
+        if (i < 0 && errno == EINTR)
+            continue;
+        break;
+    }
+#endif
+
+    return i;
+}
+
+/*
+ * DisconnectDatabase
+ *
+ * Close the connection held by one slot.  If a command is still active
+ * on the connection (PQTRANS_ACTIVE), a cancel request is sent first.
+ * Does nothing if the slot has no connection; afterwards the slot's
+ * connection pointer is reset to NULL.
+ */
+static void
+DisconnectDatabase(ParallelSlot *slot)
+{
+    PGcancel   *cancel;
+    char        errbuf[256];    /* size recommended for PQcancel() */
+
+    if (!slot->connection)
+        return;
+
+    if (PQtransactionStatus(slot->connection) == PQTRANS_ACTIVE)
+    {
+        if ((cancel = PQgetCancel(slot->connection)))
+        {
+            /* Best effort: we are closing the connection in any case. */
+            PQcancel(cancel, errbuf, sizeof(errbuf));
+            PQfreeCancel(cancel);
+        }
+    }
+
+    PQfinish(slot->connection);
+    slot->connection = NULL;
+}
+
+
+
+/*
+ * prepare_command
+ *
+ * Append the leading part of a VACUUM or ANALYZE command to sql, without
+ * a table name or terminating semicolon.
+ *
+ * With analyze_only the command is ANALYZE, optionally followed by
+ * VERBOSE.  Otherwise it is VACUUM followed by the requested options:
+ * as a parenthesized, comma-separated list when the server version is
+ * 90000 or later, or as space-separated keywords for older servers.
+ */
+void
+prepare_command(PGconn *conn, bool full, bool verbose, bool and_analyze,
+                bool analyze_only, bool freeze, PQExpBuffer sql)
+{
+    const char *keywords[4];
+    int         nkeywords = 0;
+    int         k;
+
+    if (analyze_only)
+    {
+        appendPQExpBufferStr(sql, "ANALYZE");
+        if (verbose)
+            appendPQExpBufferStr(sql, " VERBOSE");
+        return;
+    }
+
+    appendPQExpBufferStr(sql, "VACUUM");
+
+    /* Collect the requested options in the order they are emitted. */
+    if (full)
+        keywords[nkeywords++] = "FULL";
+    if (freeze)
+        keywords[nkeywords++] = "FREEZE";
+    if (verbose)
+        keywords[nkeywords++] = "VERBOSE";
+    if (and_analyze)
+        keywords[nkeywords++] = "ANALYZE";
+
+    if (PQserverVersion(conn) >= 90000)
+    {
+        /* Parenthesized list: " (A, B, C)"; nothing if no option is set. */
+        for (k = 0; k < nkeywords; k++)
+            appendPQExpBuffer(sql, "%s%s", (k == 0) ? " (" : ", ",
+                              keywords[k]);
+        if (nkeywords > 0)
+            appendPQExpBufferStr(sql, ")");
+    }
+    else
+    {
+        /* Older syntax: plain keywords separated by spaces. */
+        for (k = 0; k < nkeywords; k++)
+            appendPQExpBuffer(sql, " %s", keywords[k]);
+    }
+}
+
static void
help(const char *progname)
@@ -436,6 +977,7 @@ help(const char *progname)
printf(_(" -V, --version output version information, then exit\n"));
printf(_(" -z, --analyze update optimizer statistics\n"));
printf(_(" -Z, --analyze-only only update optimizer statistics\n"));
+ printf(_(" -j, --jobs=NUM use this many concurrent connections to vacuum\n"));
printf(_(" --analyze-in-stages only update optimizer statistics, in multiple\n"
" stages for faster results\n"));
printf(_(" -?, --help show this help, then exit\n"));
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic