List: pgsql-hackers
Subject: Re: [HACKERS] libpq pipelining
From: Matt Newell <newellm () blur ! com>
Date: 2014-12-10 21:29:59
Message-ID: 1499322.vmUVO5HYcT () obsidian
On Friday, December 05, 2014 12:22:38 PM Heikki Linnakangas wrote:
> Oh, that's how the PQgetLastQuery/PQgetNextQuery functions work! I
> didn't understand that before. I'd suggest renaming them to something
> like PQgetSentQuery() and PQgetResultQuery(). The first/last/next names
> made me think that they're used to iterate a list of queries, but in
> fact they're supposed to be used at very different stages.
>
> - Heikki
Okay, I have renamed them as you suggested and added
PQsetPipelining/PQisPipelining, with pipelining off by default. There should be
no behavior change unless pipelining is enabled.
Documentation should be mostly complete, except for the possible addition of an
example and perhaps a general pipelining overview paragraph.
I have implemented async query support (which takes advantage of pipelining) in
Qt, along with a couple of test cases. This led me to discover a bug in my
last patch where a PGquery object could be reused twice in a row. I have fixed
that. I contemplated not reusing the PGquery objects at all, but that
wouldn't solve the problem, because malloc may well return a recently freed
block of the same size anyway. Guaranteeing that a PGquery won't be reused
twice in a row should be sufficient. The only alternative is to add a unique
id, but that would add further complexity that I don't think is warranted.
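To make the reuse rule above concrete, here is a minimal standalone sketch of a
free list that never hands out the entry it retired most recently. The names
node, pool, pool_get and pool_put are hypothetical stand-ins chosen for
illustration, not libpq identifiers, and the sketch assumes single-threaded use.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical stand-in for PGquery; only the link field matters here. */
typedef struct node
{
	struct node *next;
} node;

/* Hypothetical pool: a free stack plus the most recently retired node. */
typedef struct pool
{
	node	   *free_list;	/* stack of recycled nodes */
	node	   *last;		/* retired most recently; not handed out next */
} pool;

/* Hand out a node, skipping the one that was retired most recently. */
node *
pool_get(pool *p)
{
	node	   *n;

	if (p->free_list && p->free_list != p->last)
	{
		/* Top of the stack is safe to reuse. */
		n = p->free_list;
		p->free_list = n->next;
	}
	else if (p->free_list && p->free_list->next)
	{
		/* Top is the just-retired node: take the second entry instead. */
		n = p->free_list->next;
		p->free_list->next = n->next;
		p->last = NULL;		/* the top entry may be reused next time */
	}
	else
	{
		/* Nothing suitable to recycle: allocate a fresh node. */
		n = malloc(sizeof(node));
		if (n == NULL)
			return NULL;
	}
	n->next = NULL;
	return n;
}

/* Retire a node: push it on the free stack and remember it as "last". */
void
pool_put(pool *p, node *n)
{
	assert(n != NULL);
	p->last = n;
	n->next = p->free_list;
	p->free_list = n;
}
```

Because the retired node stays allocated on the free stack, a fresh malloc
cannot return the same address, which is the property a pointer comparison
between two consecutive queries relies on.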
Feedback is very welcome and appreciated.
Thanks,
Matt Newell
["libpq.pipeline.docs.patch" (libpq.pipeline.docs.patch)]
diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index d829a4b..4e0431e 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -3947,9 +3947,14 @@ int PQsendQuery(PGconn *conn, const char *command);
After successfully calling <function>PQsendQuery</function>, call
<function>PQgetResult</function> one or more times to obtain the
- results. <function>PQsendQuery</function> cannot be called again
- (on the same connection) until <function>PQgetResult</function>
- has returned a null pointer, indicating that the command is done.
+ results. If pipelining is enabled, <function>PQsendQuery</function>
+ may be called multiple times before reading the results. See
+ <function>PQsetPipelining</function> and <function>PQisPipelining</function>.
+ Call <function>PQgetSentQuery</function> to get a <structname>PGquery</structname>
+ which can be used to identify which results obtained from
+ <function>PQgetResult</function> belong to each pipelined query.
+ If only one query is dispatched at a time, you can call <function>PQgetResult</function>
+ until a NULL value is returned to indicate the end of the query.
</para>
</listitem>
</varlistentry>
@@ -4133,8 +4138,8 @@ PGresult *PQgetResult(PGconn *conn);
<para>
<function>PQgetResult</function> must be called repeatedly until
- it returns a null pointer, indicating that the command is done.
- (If called when no command is active,
+ it returns a null pointer, indicating that all dispatched commands
+ are done. (If called when no command is active,
<function>PQgetResult</function> will just return a null pointer
at once.) Each non-null result from
<function>PQgetResult</function> should be processed using the
@@ -4144,14 +4149,17 @@ PGresult *PQgetResult(PGconn *conn);
<function>PQgetResult</function> will block only if a command is
active and the necessary response data has not yet been read by
<function>PQconsumeInput</function>.
+ If query pipelining is being used, <function>PQgetResultQuery</function>
+ can be called after <function>PQgetResult</function> to match the result to the query.
</para>
<note>
<para>
Even when <function>PQresultStatus</function> indicates a fatal
- error, <function>PQgetResult</function> should be called until it
- returns a null pointer, to allow <application>libpq</> to
- process the error information completely.
+ error, <function>PQgetResult</function> should be called until the
+ query has no more results (null pointer return if not using query
+ pipelining, otherwise see <function>PQgetResultQuery</function>),
+ to allow <application>libpq</> to process the error information completely.
</para>
</note>
</listitem>
@@ -4385,6 +4393,158 @@ int PQflush(PGconn *conn);
read-ready and then read the response as described above.
</para>
+ <variablelist>
+ <varlistentry id="libpq-pqsetpipelining">
+ <term>
+ <function>PQsetPipelining</function>
+ <indexterm>
+ <primary>PQsetPipelining</primary>
+ </indexterm>
+ </term>
+
+ <listitem>
+ <para>
+ Enables or disables query pipelining.
+<synopsis>
+int PQsetPipelining(PGconn *conn, int arg);
+</synopsis>
+ </para>
+
+ <para>
+ Enables pipelining for the connection if arg is 1, or disables it
+ if arg is 0. When pipelining is enabled, multiple async queries can
+ be sent before processing the results of the first. If pipelining
+ is disabled, an error will be raised if an async query is attempted
+ while another is active. Returns 0 on success, or -1 if pipelining
+ cannot be disabled because multiple queries are still pending.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry id="libpq-pqispipelining">
+ <term>
+ <function>PQisPipelining</function>
+ <indexterm>
+ <primary>PQisPipelining</primary>
+ </indexterm>
+ </term>
+
+ <listitem>
+ <para>
+ Returns the pipelining status of the connection.
+<synopsis>
+int PQisPipelining(PGconn *conn);
+</synopsis>
+ </para>
+
+ <para>
+ Returns 1 if pipelining is enabled, or 0 if pipelining is disabled.
+ Query pipelining is disabled unless enabled with a call to
+ <function>PQsetPipelining</function>.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry id="libpq-pqgetquerycommand">
+ <term>
+ <function>PQgetQueryCommand</function>
+ <indexterm>
+ <primary>PQgetQueryCommand</primary>
+ </indexterm>
+ </term>
+
+ <listitem>
+ <para>
+ Returns the query string associated with the <structname>PGquery</structname>.
+<synopsis>
+const char * PQgetQueryCommand(PGquery *query);
+</synopsis>
+ </para>
+
+ <para>
+ When using query pipelining, this function can be used to retrieve the command
+ that created the query object.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry id="libpq-pqgetresultquery">
+ <term>
+ <function>PQgetResultQuery</function>
+ <indexterm>
+ <primary>PQgetResultQuery</primary>
+ </indexterm>
+ </term>
+
+ <listitem>
+ <para>
+ Returns the first async query to receive results, or NULL if no
+ async queries are active.
+<synopsis>
+PGquery * PQgetResultQuery(PGconn *conn);
+</synopsis>
+ </para>
+
+ <para>
+ When pipelining queries, this function indicates which query
+ produced the result returned by <function>PQgetResult</function>.
+ Call this function immediately after calling
+ <function>PQgetResult</function>, or immediately before if a result
+ is ready to read, indicated by <function>PQisBusy</function>
+ returning false. The <structname>PGquery</structname> remains valid
+ until the next libpq call that consumes input.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry id="libpq-pqgetnextquery">
+ <term>
+ <function>PQgetNextQuery</function>
+ <indexterm>
+ <primary>PQgetNextQuery</primary>
+ </indexterm>
+ </term>
+
+ <listitem>
+ <para>
+ Returns the next <structname>PGquery</structname> in the list of
+ pipelined queries.
+<synopsis>
+PGquery * PQgetNextQuery(PGquery *query);
+</synopsis>
+ </para>
+
+ <para>
+ This function can be used to iterate over each pending async query,
+ starting with <function>PQgetResultQuery</function>
+ and ending with <function>PQgetSentQuery</function>.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term>
+ <function>PQgetSentQuery</function>
+ <indexterm>
+ <primary>PQgetSentQuery</primary>
+ </indexterm>
+ </term>
+
+ <listitem>
+ <para>
+ Returns the last <structname>PGquery</structname> in the list of
+ dispatched async queries waiting for results.
+<synopsis>
+PGquery * PQgetSentQuery(PGconn *conn);
+</synopsis>
+ </para>
+
+ <para>
+ Call this function after dispatching an async query to get
+ a <structname>PGquery</structname> that can be used to identify
+ the originating query for each result obtained by
+ <function>PQgetResult</function>.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
</sect1>
<sect1 id="libpq-single-row-mode">
@@ -4411,7 +4571,7 @@ int PQflush(PGconn *conn);
immediately after a successful call of <function>PQsendQuery</function>
(or a sibling function). This mode selection is effective only for the
currently executing query. Then call <function>PQgetResult</function>
- repeatedly, until it returns null, as documented in <xref
+ repeatedly, until the last query result is returned, as documented in <xref
linkend="libpq-async">. If the query returns any rows, they are returned
as individual <structname>PGresult</structname> objects, which look like
normal query results except for having status code
@@ -4420,8 +4580,8 @@ int PQflush(PGconn *conn);
the query returns zero rows, a zero-row object with status
<literal>PGRES_TUPLES_OK</literal> is returned; this is the signal that no
more rows will arrive. (But note that it is still necessary to continue
- calling <function>PQgetResult</function> until it returns null.) All of
- these <structname>PGresult</structname> objects will contain the same row
+ calling <function>PQgetResult</function> until the last query result is returned.)
+ All of these <structname>PGresult</structname> objects will contain the same row
description data (column names, types, etc) that an ordinary
<structname>PGresult</structname> object for the query would have.
Each object should be freed with <function>PQclear</function> as usual.
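The relationship between the "sent" query, the "result" query and next-query
iteration described in the documentation patch above can be modeled with a plain
FIFO. The following is a minimal standalone sketch, not libpq code: the types
query and fifo and the functions fifo_send, fifo_advance and fifo_pending are
hypothetical names used only to illustrate the head/tail bookkeeping.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for a PGquery: just a label and a link. */
typedef struct query
{
	const char *cmd;
	struct query *next;
} query;

/* Hypothetical stand-in for the per-connection FIFO of in-flight queries. */
typedef struct fifo
{
	query	   *head;		/* oldest query: results are arriving for it */
	query	   *tail;		/* newest query: the one most recently sent */
} fifo;

/* Append a query, as a send call would; it becomes the "sent" query. */
void
fifo_send(fifo *f, query *q)
{
	assert(q != NULL);
	q->next = NULL;
	if (f->tail)
		f->tail->next = q;
	else
		f->head = q;
	f->tail = q;
}

/* Drop the head once the server reports that this query has finished. */
void
fifo_advance(fifo *f)
{
	if (f->head == NULL)
		return;
	f->head = f->head->next;
	if (f->head == NULL)
		f->tail = NULL;
}

/* Count pending queries by walking from the head to the tail. */
int
fifo_pending(const fifo *f)
{
	int			n = 0;
	const query *q;

	for (q = f->head; q != NULL; q = q->next)
		n++;
	return n;
}
```

In this model the head plays the role of the value PQgetResultQuery would
return, the tail plays the role of PQgetSentQuery, and following next
corresponds to PQgetNextQuery.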
["libpq.pipeline.src.patch" (libpq.pipeline.src.patch)]
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index 93da50d..050bf05 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -165,3 +165,9 @@ lo_lseek64 162
lo_tell64 163
lo_truncate64 164
PQconninfo 165
+PQgetResultQuery 166
+PQgetSentQuery 167
+PQgetNextQuery 168
+PQgetQueryCommand 169
+PQsetPipelining 170
+PQisPipelining 171
\ No newline at end of file
diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index 3af222b..fc72605 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -2893,8 +2893,6 @@ freePGconn(PGconn *conn)
free(conn->gsslib);
#endif
/* Note that conn->Pfdebug is not ours to close or free */
- if (conn->last_query)
- free(conn->last_query);
if (conn->inBuffer)
free(conn->inBuffer);
if (conn->outBuffer)
@@ -2956,6 +2954,29 @@ closePGconn(PGconn *conn)
* absent */
conn->asyncStatus = PGASYNC_IDLE;
pqClearAsyncResult(conn); /* deallocate result */
+
+ /*
+ * Link active queries into the free list so we can free them
+ */
+ if (conn->queryTail)
+ {
+ conn->queryTail->next = conn->queryFree;
+ conn->queryFree = conn->queryHead;
+ }
+ conn->queryHead = conn->queryTail = NULL;
+
+ /*
+ * Free all query objects
+ */
+ while (conn->queryFree)
+ {
+ PGquery * prev = conn->queryFree;
+ conn->queryFree = prev->next;
+ if (prev->querycmd)
+ free(prev->querycmd);
+ free(prev);
+ }
+
resetPQExpBuffer(&conn->errorMessage);
pg_freeaddrinfo_all(conn->addrlist_family, conn->addrlist);
conn->addrlist = NULL;
@@ -3135,7 +3156,7 @@ PQresetPoll(PGconn *conn)
}
/*
- * PQcancelGet: get a PGcancel structure corresponding to a connection.
+ * PQgetCancel: get a PGcancel structure corresponding to a connection.
*
* A copy is needed to be able to cancel a running query from a different
* thread. If the same structure is used all structure members would have
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index 4075e51..379c38c 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -1020,7 +1020,7 @@ pqRowProcessor(PGconn *conn, const char **errmsgp)
* row; the original conn->result is left unchanged so that it can be used
* again as the template for future rows.
*/
- if (conn->singleRowMode)
+ if (conn->queryHead && conn->queryHead->singleRowMode)
{
/* Copy everything that should be in the result at this point */
res = PQcopyResult(res,
@@ -1080,7 +1080,7 @@ pqRowProcessor(PGconn *conn, const char **errmsgp)
* Success. In single-row mode, make the result available to the client
* immediately.
*/
- if (conn->singleRowMode)
+ if (conn->queryHead && conn->queryHead->singleRowMode)
{
/* Change result status to special single-row value */
res->resultStatus = PGRES_SINGLE_TUPLE;
@@ -1132,13 +1132,11 @@ PQsendQuery(PGconn *conn, const char *query)
}
/* remember we are using simple query protocol */
- conn->queryclass = PGQUERY_SIMPLE;
+ conn->queryTail->queryclass = PGQUERY_SIMPLE;
/* and remember the query text too, if possible */
- /* if insufficient memory, last_query just winds up NULL */
- if (conn->last_query)
- free(conn->last_query);
- conn->last_query = strdup(query);
+ /* if insufficient memory, querycmd just winds up NULL */
+ conn->queryTail->querycmd = strdup(query);
/*
* Give the data a push. In nonblock mode, don't complain if we're unable
@@ -1151,7 +1149,9 @@ PQsendQuery(PGconn *conn, const char *query)
}
/* OK, it's launched! */
- conn->asyncStatus = PGASYNC_BUSY;
+ if (conn->asyncStatus == PGASYNC_IDLE)
+ conn->asyncStatus = PGASYNC_BUSY;
+
return 1;
}
@@ -1272,13 +1272,11 @@ PQsendPrepare(PGconn *conn,
goto sendFailed;
/* remember we are doing just a Parse */
- conn->queryclass = PGQUERY_PREPARE;
+ conn->queryTail->queryclass = PGQUERY_PREPARE;
/* and remember the query text too, if possible */
- /* if insufficient memory, last_query just winds up NULL */
- if (conn->last_query)
- free(conn->last_query);
- conn->last_query = strdup(query);
+ /* if insufficient memory, querycmd just winds up NULL */
+ conn->queryTail->querycmd = strdup(query);
/*
* Give the data a push. In nonblock mode, don't complain if we're unable
@@ -1288,7 +1286,9 @@ PQsendPrepare(PGconn *conn,
goto sendFailed;
/* OK, it's launched! */
- conn->asyncStatus = PGASYNC_BUSY;
+ if (conn->asyncStatus == PGASYNC_IDLE)
+ conn->asyncStatus = PGASYNC_BUSY;
+
return 1;
sendFailed:
@@ -1344,6 +1344,8 @@ PQsendQueryPrepared(PGconn *conn,
static bool
PQsendQueryStart(PGconn *conn)
{
+ PGquery * query;
+
if (!conn)
return false;
@@ -1357,20 +1359,59 @@ PQsendQueryStart(PGconn *conn)
libpq_gettext("no connection to the server\n"));
return false;
}
- /* Can't send while already busy, either. */
- if (conn->asyncStatus != PGASYNC_IDLE)
+
+ /* Check if we are in a valid state to send an async query */
+ switch (conn->asyncStatus)
{
- printfPQExpBuffer(&conn->errorMessage,
+ case PGASYNC_IDLE:
+ break;
+ /* Can only send a query during busy or ready state if
+ * pipelining is enabled */
+ case PGASYNC_BUSY:
+ case PGASYNC_READY:
+ if (conn->pipelining)
+ break;
+ /* Fall through to error */
+ case PGASYNC_COPY_IN:
+ case PGASYNC_COPY_OUT:
+ case PGASYNC_COPY_BOTH:
+ printfPQExpBuffer(&conn->errorMessage,
libpq_gettext("another command is already in progress\n"));
- return false;
+ return false;
}
- /* initialize async result-accumulation state */
- conn->result = NULL;
- conn->next_result = NULL;
+ query = 0;
- /* reset single-row processing mode */
- conn->singleRowMode = false;
+ /* Check if we have a free PGquery to use; if not, create one.
+ * We have to make sure we don't use the same PGquery twice
+ * in a row, so we try both the first and second free
+ * entries, and otherwise allocate a new one. */
+ if (conn->queryFree && conn->queryFree != conn->queryLast)
+ {
+ query = conn->queryFree;
+ conn->queryFree = query->next;
+ }
+ else if (conn->queryFree && conn->queryFree->next)
+ {
+ query = conn->queryFree->next;
+ conn->queryFree->next = query->next;
+ conn->queryLast = NULL; /* First is fine to use again now */
+ }
+ else
+ {
+ query = (PGquery *) malloc(sizeof(PGquery));
+ if (query == NULL)
+ {
+ printfPQExpBuffer(&conn->errorMessage,
+ libpq_gettext("out of memory\n"));
+ return false;
+ }
+ }
+
+ /* (Re)initialize every field; a recycled PGquery may hold stale state */
+ query->queryclass = PGQUERY_SIMPLE;
+ query->querycmd = NULL;
+ query->singleRowMode = false;
+ query->next = NULL;
+ query->userptr = NULL;
+
+ if( conn->queryTail )
+ conn->queryTail->next = query;
+ else
+ conn->queryHead = query;
+
+ conn->queryTail = query;
/* ready to send command message */
return true;
@@ -1522,16 +1563,12 @@ PQsendQueryGuts(PGconn *conn,
goto sendFailed;
/* remember we are using extended query protocol */
- conn->queryclass = PGQUERY_EXTENDED;
+ conn->queryTail->queryclass = PGQUERY_EXTENDED;
/* and remember the query text too, if possible */
- /* if insufficient memory, last_query just winds up NULL */
- if (conn->last_query)
- free(conn->last_query);
+ /* if insufficient memory, querycmd just winds up NULL */
if (command)
- conn->last_query = strdup(command);
- else
- conn->last_query = NULL;
+ conn->queryTail->querycmd = strdup(command);
/*
* Give the data a push. In nonblock mode, don't complain if we're unable
@@ -1541,7 +1578,9 @@ PQsendQueryGuts(PGconn *conn,
goto sendFailed;
/* OK, it's launched! */
- conn->asyncStatus = PGASYNC_BUSY;
+ if (conn->asyncStatus == PGASYNC_IDLE)
+ conn->asyncStatus = PGASYNC_BUSY;
+
return 1;
sendFailed:
@@ -1576,7 +1615,7 @@ pqHandleSendFailure(PGconn *conn)
}
/*
- * Select row-by-row processing mode
+ * Select row-by-row processing mode for the last sent query
*/
int
PQsetSingleRowMode(PGconn *conn)
@@ -1585,18 +1624,16 @@ PQsetSingleRowMode(PGconn *conn)
* Only allow setting the flag when we have launched a query and not yet
* received any results.
*/
- if (!conn)
+ if (!conn || !conn->queryTail)
return 0;
- if (conn->asyncStatus != PGASYNC_BUSY)
+ if (conn->asyncStatus != PGASYNC_BUSY && conn->queryTail == conn->queryHead)
return 0;
- if (conn->queryclass != PGQUERY_SIMPLE &&
- conn->queryclass != PGQUERY_EXTENDED)
- return 0;
- if (conn->result)
+ if (conn->queryTail->queryclass != PGQUERY_SIMPLE &&
+ conn->queryTail->queryclass != PGQUERY_EXTENDED)
return 0;
/* OK, set flag */
- conn->singleRowMode = true;
+ conn->queryTail->singleRowMode = true;
return 1;
}
@@ -1670,6 +1707,50 @@ PQisBusy(PGconn *conn)
/*
+ * PQgetQueryCommand
+ */
+const char *
+PQgetQueryCommand(PGquery *query)
+{
+ if (!query)
+ return NULL;
+ return query->querycmd;
+}
+
+/*
+ * PQgetResultQuery
+ */
+PGquery *
+PQgetResultQuery(PGconn *conn)
+{
+ if (!conn)
+ return 0;
+ return conn->queryHead;
+}
+
+/*
+ * PQgetSentQuery
+ */
+PGquery *
+PQgetSentQuery(PGconn *conn)
+{
+ if (!conn)
+ return 0;
+ return conn->queryTail;
+}
+
+/*
+ * PQgetNextQuery
+ */
+PGquery *
+PQgetNextQuery(PGquery *query)
+{
+ if (!query)
+ return 0;
+ return query->next;
+}
+
+/*
* PQgetResult
* Get the next PGresult produced by a query. Returns NULL if no
* query work remains or an error has occurred (e.g. out of
@@ -2132,14 +2213,7 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
goto sendFailed;
/* remember we are doing a Describe */
- conn->queryclass = PGQUERY_DESCRIBE;
-
- /* reset last-query string (not relevant now) */
- if (conn->last_query)
- {
- free(conn->last_query);
- conn->last_query = NULL;
- }
+ conn->queryTail->queryclass = PGQUERY_DESCRIBE;
/*
* Give the data a push. In nonblock mode, don't complain if we're unable
@@ -2301,7 +2375,7 @@ PQputCopyEnd(PGconn *conn, const char *errormsg)
* If we sent the COPY command in extended-query mode, we must issue a
* Sync as well.
*/
- if (conn->queryclass != PGQUERY_SIMPLE)
+ if (conn->queryHead && conn->queryHead->queryclass != PGQUERY_SIMPLE)
{
if (pqPutMsgStart('S', false, conn) < 0 ||
pqPutMsgEnd(conn) < 0)
@@ -3112,6 +3186,31 @@ PQisnonblocking(const PGconn *conn)
return pqIsnonblocking(conn);
}
+int
+PQsetPipelining(PGconn *conn, int arg)
+{
+ bool barg;
+
+ if (!conn)
+ return -1;
+
+ barg = (arg ? TRUE : FALSE);
+
+ /* Return error if they are trying to turn pipelining off and
+ * multiple queries are pending */
+ if (!barg && conn->queryHead && conn->queryHead != conn->queryTail)
+ return -1;
+
+ conn->pipelining = barg;
+ return 0;
+}
+
+int
+PQisPipelining(PGconn *conn)
+{
+ if (!conn)
+ return 0;
+ return conn->pipelining ? 1 : 0;
+}
+
/* libpq is thread-safe? */
int
PQisthreadsafe(void)
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index c514ca5..d0c5110 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -55,7 +55,29 @@ static void reportErrorPosition(PQExpBuffer msg, const char *query,
int loc, int encoding);
static int build_startup_packet(const PGconn *conn, char *packet,
const PQEnvironmentOption *options);
+static void pqQueryAdvance(PGconn *conn);
+void
+pqQueryAdvance(PGconn *conn)
+{
+ PGquery * query;
+
+ query = conn->queryHead;
+ if (query == NULL)
+ return;
+
+ conn->queryLast = query;
+ /* Advance queryHead */
+ conn->queryHead = query->next;
+ /* Push last query onto free stack */
+ query->next = conn->queryFree;
+ conn->queryFree = query;
+ free(query->querycmd);
+ query->querycmd = NULL;
+
+ if (conn->queryHead == NULL)
+ conn->queryTail = NULL;
+}
/*
* parseInput: if appropriate, parse input data from backend
@@ -218,7 +240,15 @@ pqParseInput3(PGconn *conn)
case 'Z': /* backend is ready for new query */
if (getReadyForQuery(conn))
return;
- conn->asyncStatus = PGASYNC_IDLE;
+
+ pqQueryAdvance(conn);
+ /* initialize async result-accumulation state */
+ conn->result = NULL;
+ conn->next_result = NULL;
+ if (conn->queryHead != NULL)
+ conn->asyncStatus = PGASYNC_BUSY;
+ else
+ conn->asyncStatus = PGASYNC_IDLE;
break;
case 'I': /* empty query */
if (conn->result == NULL)
@@ -232,7 +262,7 @@ pqParseInput3(PGconn *conn)
break;
case '1': /* Parse Complete */
/* If we're doing PQprepare, we're done; else ignore */
- if (conn->queryclass == PGQUERY_PREPARE)
+ if (conn->queryHead && conn->queryHead->queryclass == PGQUERY_PREPARE)
{
if (conn->result == NULL)
{
@@ -266,7 +296,7 @@ pqParseInput3(PGconn *conn)
break;
case 'T': /* Row Description */
if (conn->result == NULL ||
- conn->queryclass == PGQUERY_DESCRIBE)
+ (conn->queryHead && conn->queryHead->queryclass == PGQUERY_DESCRIBE))
{
/* First 'T' in a query sequence */
if (getRowDescriptions(conn, msgLength))
@@ -299,7 +329,7 @@ pqParseInput3(PGconn *conn)
* instead of TUPLES_OK. Otherwise we can just ignore
* this message.
*/
- if (conn->queryclass == PGQUERY_DESCRIBE)
+ if (conn->queryHead && conn->queryHead->queryclass == PGQUERY_DESCRIBE)
{
if (conn->result == NULL)
{
@@ -422,6 +452,8 @@ pqParseInput3(PGconn *conn)
static void
handleSyncLoss(PGconn *conn, char id, int msgLength)
{
+ PGquery * query;
+
printfPQExpBuffer(&conn->errorMessage,
libpq_gettext(
"lost synchronization with server: got message type \"%c\", length %d\n"),
@@ -430,6 +462,15 @@ handleSyncLoss(PGconn *conn, char id, int msgLength)
pqSaveErrorResult(conn);
conn->asyncStatus = PGASYNC_READY; /* drop out of GetResult wait loop */
+ /* All queries are canceled; move them to the free list and free the query commands */
+ while ((query = conn->queryHead) != NULL)
+ {
+ free(query->querycmd);
+ query->querycmd = NULL;
+ conn->queryHead = query->next;
+ query->next = conn->queryFree;
+ conn->queryFree = query;
+ }
+ conn->queryTail = NULL;
+
pqDropConnection(conn);
conn->status = CONNECTION_BAD; /* No more connection to backend */
}
@@ -455,7 +496,7 @@ getRowDescriptions(PGconn *conn, int msgLength)
* PGresult created by getParamDescriptions, and we should fill data into
* that. Otherwise, create a new, empty PGresult.
*/
- if (conn->queryclass == PGQUERY_DESCRIBE)
+ if (conn->queryHead && conn->queryHead->queryclass == PGQUERY_DESCRIBE)
{
if (conn->result)
result = conn->result;
@@ -562,7 +603,7 @@ getRowDescriptions(PGconn *conn, int msgLength)
* If we're doing a Describe, we're done, and ready to pass the result
* back to the client.
*/
- if (conn->queryclass == PGQUERY_DESCRIBE)
+ if (conn->queryHead && conn->queryHead->queryclass == PGQUERY_DESCRIBE)
{
conn->asyncStatus = PGASYNC_READY;
return 0;
@@ -865,10 +906,10 @@ pqGetErrorNotice3(PGconn *conn, bool isError)
val = PQresultErrorField(res, PG_DIAG_STATEMENT_POSITION);
if (val)
{
- if (conn->verbosity != PQERRORS_TERSE && conn->last_query != NULL)
+ if (conn->verbosity != PQERRORS_TERSE && conn->queryHead && conn->queryHead->querycmd != NULL)
{
/* emit position as a syntax cursor display */
- querytext = conn->last_query;
+ querytext = conn->queryHead->querycmd;
querypos = atoi(val);
}
else
@@ -1696,7 +1737,7 @@ pqEndcopy3(PGconn *conn)
* If we sent the COPY command in extended-query mode, we must issue a
* Sync as well.
*/
- if (conn->queryclass != PGQUERY_SIMPLE)
+ if (conn->queryHead && conn->queryHead->queryclass != PGQUERY_SIMPLE)
{
if (pqPutMsgStart('S', false, conn) < 0 ||
pqPutMsgEnd(conn) < 0)
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index b81dc16..ca54116 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -141,6 +141,13 @@ typedef struct pg_result PGresult;
*/
typedef struct pg_cancel PGcancel;
+/* PGquery encapsulates the progress of a single query command issued
+ * to the async api functions
+ * The contents of this struct are not supposed to be known to applications.
+ */
+typedef struct pg_query PGquery;
+
+
/* PGnotify represents the occurrence of a NOTIFY message.
* Ideally this would be an opaque typedef, but it's so simple that it's
* unlikely to change.
@@ -404,6 +411,14 @@ extern PGresult *PQgetResult(PGconn *conn);
extern int PQisBusy(PGconn *conn);
extern int PQconsumeInput(PGconn *conn);
+extern int PQsetPipelining(PGconn *conn, int arg);
+extern int PQisPipelining(PGconn *conn);
+
+extern const char * PQgetQueryCommand(PGquery *query);
+extern PGquery *PQgetResultQuery(PGconn *conn);
+extern PGquery *PQgetSentQuery(PGconn *conn);
+extern PGquery *PQgetNextQuery(PGquery *query);
+
/* LISTEN/NOTIFY support */
extern PGnotify *PQnotifies(PGconn *conn);
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 4ef46ff..7d84d89 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -291,6 +291,16 @@ typedef struct pgDataValue
const char *value; /* data value, without zero-termination */
} PGdataValue;
+typedef struct pg_query
+{
+ PGQueryClass queryclass;
+ char *querycmd; /* last SQL command, or NULL if unknown */
+ bool singleRowMode; /* return query result row-by-row? */
+ struct pg_query * next;
+ void *userptr; /* convenience for the user */
+} PGquery;
+
+
/*
* PGconn stores all the state data associated with a single connection
* to a backend.
@@ -350,13 +360,20 @@ struct pg_conn
ConnStatusType status;
PGAsyncStatusType asyncStatus;
PGTransactionStatusType xactStatus; /* never changes to ACTIVE */
- PGQueryClass queryclass;
- char *last_query; /* last SQL command, or NULL if unknown */
+
+ /* queryHead and queryTail form a FIFO representing queries sent
+ * to the backend. queryHead is the first query sent, and is the
+ * query we are receiving results from, or have received results from */
+ bool pipelining;
+ PGquery *queryHead;
+ PGquery *queryTail;
+ PGquery *queryFree; /* Reuse PGquery allocations */
+ PGquery *queryLast; /* Ensure we never use a query twice in a row */
+
char last_sqlstate[6]; /* last reported SQLSTATE */
bool options_valid; /* true if OK to attempt connection */
bool nonblocking; /* whether this connection is using nonblock
* sending semantics */
- bool singleRowMode; /* return current query result row-by-row? */
char copy_is_binary; /* 1 = copy binary, 0 = copy text */
int copy_already_done; /* # bytes already returned in COPY
* OUT */
diff --git a/src/test/examples/Makefile b/src/test/examples/Makefile
index aee5c04..3996760 100644
--- a/src/test/examples/Makefile
+++ b/src/test/examples/Makefile
@@ -14,7 +14,7 @@ override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS)
override LDLIBS := $(libpq_pgport) $(LDLIBS)
-PROGS = testlibpq testlibpq2 testlibpq3 testlibpq4 testlo testlo64
+PROGS = testlibpq testlibpq2 testlibpq3 testlibpq4 testlo testlo64 testlibpqpipeline testlibpqpipeline2
all: $(PROGS)
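As a rough illustration of two rules in the source patch above (when a new async
query may be sent, and which state the connection enters once a query
completes), here is a minimal standalone sketch. The enum conn_state and the
functions can_send and state_after_completion are hypothetical; they only mirror
the PGASYNC_* states and are not part of libpq.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical mirror of the connection states the check distinguishes. */
typedef enum
{
	ST_IDLE,
	ST_BUSY,
	ST_READY,
	ST_COPY_IN,
	ST_COPY_OUT,
	ST_COPY_BOTH
} conn_state;

/* May a new async query be sent in this state? */
bool
can_send(conn_state st, bool pipelining)
{
	switch (st)
	{
		case ST_IDLE:
			return true;
		case ST_BUSY:
		case ST_READY:
			/* Another query is in flight: allowed only when pipelining. */
			return pipelining;
		case ST_COPY_IN:
		case ST_COPY_OUT:
		case ST_COPY_BOTH:
			/* COPY states never accept a new query. */
			return false;
	}
	return false;
}

/* State after a query completes: stay busy while more queries are queued. */
conn_state
state_after_completion(int queries_still_queued)
{
	assert(queries_still_queued >= 0);
	return queries_still_queued > 0 ? ST_BUSY : ST_IDLE;
}
```

With pipelining off, can_send reduces to the old "idle only" rule, which matches
the stated goal of no behavior change unless pipelining is enabled.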
["testlibpqpipeline2.c" (testlibpqpipeline2.c)]
/*
* src/test/examples/testlibpqpipeline2.c
*
*
* testlibpqpipeline2.c
* This test program tests query pipelining. It shows how to issue multiple
* pipelined queries and identify which query a result originated from. It
* also demonstrates that the failure of one query does not affect subsequent
* queries when they are not part of the same transaction.
*
*
*/
#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>
#include "libpq-fe.h"
static void checkResult(PGconn *conn, PGresult *result, PGquery *query, int expectedResultStatus)
{
if (PQresultStatus(result) != expectedResultStatus)
{
/* PQgetQueryCommand returns NULL when no PGquery is supplied */
const char *cmd = PQgetQueryCommand(query);
printf( "Got unexpected result status '%s', expected '%s'\nQuery:%s\n",
PQresStatus(PQresultStatus(result)), PQresStatus(expectedResultStatus),
cmd ? cmd : "(unknown)");
PQclear(result);
PQclear(PQexec(conn,"DROP TABLE test"));
PQfinish(conn);
exit(1);
}
PQclear(result);
}
int
main(int argc, char **argv)
{
PGconn * conn;
PGquery * query1;
PGquery * query2;
PGquery * query3;
PGquery * curQuery;
PGresult * result;
conn = NULL;
query1 = query2 = query3 = curQuery = NULL;
result = NULL;
/* make a connection to the database */
conn = PQsetdb(NULL, NULL, NULL, NULL, NULL);
/* check to see that the backend connection was successfully made */
if (PQstatus(conn) != CONNECTION_OK)
{
fprintf(stderr, "Connection to database failed: %s",
PQerrorMessage(conn));
exit(1);
}
PQsetPipelining(conn,1);
checkResult(conn,PQexec(conn,"DROP TABLE IF EXISTS test"),NULL,PGRES_COMMAND_OK);
checkResult(conn,PQexec(conn,"CREATE TABLE test ( id SERIAL PRIMARY KEY )"),NULL,PGRES_COMMAND_OK);
PQsendQuery(conn, "INSERT INTO test(id) VALUES (DEFAULT),(DEFAULT) RETURNING id");
query1 = PQgetSentQuery(conn);
/* Duplicate primary key error */
PQsendQuery(conn, "UPDATE test SET id=2 WHERE id=1");
query2 = PQgetSentQuery(conn);
PQsendQuery(conn, "SELECT * FROM test");
query3 = PQgetSentQuery(conn);
while( (result = PQgetResult(conn)) != NULL )
{
curQuery = PQgetResultQuery(conn);
if (curQuery == query1)
checkResult(conn,result,curQuery,PGRES_TUPLES_OK);
if (curQuery == query2)
checkResult(conn,result,curQuery,PGRES_FATAL_ERROR);
if (curQuery == query3)
checkResult(conn,result,curQuery,PGRES_TUPLES_OK);
}
PQclear(PQexec(conn,"DROP TABLE test"));
PQfinish(conn);
return 0;
}
["testlibpqpipeline.c" (testlibpqpipeline.c)]
/*
* src/test/examples/testlibpqpipeline.c
*
*
* testlibpqpipeline.c
* This test program tests query pipelining and its performance impact.
*
*
*/
#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>
#include "libpq-fe.h"
// If defined we won't issue more sql commands if the socket's
// write buffer is full
//#define MIN_LOCAL_Q
//#define PRINT_QUERY_PROGRESS
static int testPipelined( PGconn * conn, int totalQueries, int totalQueued, const char * sql );
static int testPipelinedSeries( PGconn * conn, int totalQueries, int totalQueued, int baseline_usecs );
int
testPipelined( PGconn * conn, int totalQueries, int totalQueued, const char * sql )
{
int nQueriesQueued;
int nQueriesTotal;
PGresult * result;
PGquery * firstQuery;
PGquery * curQuery;
nQueriesQueued = nQueriesTotal = 0;
result = NULL;
firstQuery = curQuery = NULL;
while( nQueriesQueued > 0 || nQueriesTotal < totalQueries ) {
if( PQconsumeInput(conn) == 0 ) {
printf( "PQconsumeInput ERROR: %s\n", PQerrorMessage(conn) );
return 1;
}
do {
curQuery = PQgetResultQuery(conn);
/* firstQuery is finished */
if( firstQuery != curQuery )
{
//printf( "%p done, curQuery=%p\n", firstQuery, curQuery );
#ifdef PRINT_QUERY_PROGRESS
printf("-");
#endif
firstQuery = curQuery;
nQueriesQueued--;
}
/* Break if no queries are ready */
if( !firstQuery || PQisBusy(conn) )
break;
if( (result = PQgetResult(conn)) != 0 )
PQclear(result);
}
while(1);
if( nQueriesTotal < totalQueries && nQueriesQueued < totalQueued ) {
#ifdef MIN_LOCAL_Q
int flushResult = PQflush(conn);
if( flushResult == -1 ) {
printf( "PQflush ERROR: %s\n", PQerrorMessage(conn) );
return 1;
} else if ( flushResult == 1 )
continue;
#endif
if( !PQsendQuery(conn,sql) ){
printf( "PQsendQuery failed with error: %s\n", PQerrorMessage(conn) );
return 1;
}
if( firstQuery == NULL )
firstQuery = PQgetSentQuery(conn);
nQueriesTotal++;
nQueriesQueued++;
#ifdef PRINT_QUERY_PROGRESS
printf( "+" );
#endif
}
}
#ifdef PRINT_QUERY_PROGRESS
printf( "\n" );
#endif
return 0;
}
int testPipelinedSeries( PGconn * conn, int totalQueries, int totalQueued, int baseline_usecs )
{
int result;
struct timeval tv1, tv2;
int secs, usecs;
gettimeofday(&tv1,NULL);
#define TEST_P(q) \
if( (result = testPipelined(conn,totalQueries,totalQueued,q)) != 0 ) \
return result;
TEST_P("INSERT INTO test(id) VALUES (DEFAULT)");
TEST_P("SELECT * FROM test LIMIT 1");
TEST_P("SELECT * FROM test");
TEST_P("DELETE FROM test");
gettimeofday(&tv2,NULL);
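secs = tv2.tv_sec - tv1.tv_sec;
usecs = secs * 1000000 + tv2.tv_usec - tv1.tv_usec;
/* usecs holds the total elapsed time, so split it into seconds and the remainder */
printf("testPipelinedSeries(%i,%i) took %i.%06i",totalQueries,totalQueued,usecs / 1000000,usecs % 1000000);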
if (baseline_usecs == 0)
printf("\n");
else
printf(", speedup %.2f\n", (double)baseline_usecs / usecs );
return usecs;
}
int
main(int argc, char **argv)
{
PGconn * conn;
int baseline;
conn = NULL;
/* make a connection to the database */
conn = PQsetdb(NULL, NULL, NULL, NULL, NULL);
/* check to see that the backend connection was successfully made */
if (PQstatus(conn) != CONNECTION_OK)
{
fprintf(stderr, "Connection to database failed: %s",
PQerrorMessage(conn));
exit(1);
}
PQsetPipelining(conn,1);
PQsetnonblocking(conn,1);
PQclear(PQexec(conn,"CREATE TABLE test ( id SERIAL PRIMARY KEY )"));
baseline = testPipelinedSeries(conn,10,1,0);
testPipelinedSeries(conn,10,3,baseline);
testPipelinedSeries(conn,10,10,baseline);
baseline = testPipelinedSeries(conn,100,1,0);
testPipelinedSeries(conn,100,3,baseline);
testPipelinedSeries(conn,100,10,baseline);
testPipelinedSeries(conn,100,25,baseline);
testPipelinedSeries(conn,100,50,baseline);
testPipelinedSeries(conn,100,100,baseline);
PQclear(PQexec(conn,"DROP TABLE test"));
PQfinish(conn);
return 0;
}
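The timing arithmetic used by testPipelinedSeries can be isolated as a small
standalone sketch. The helper names elapsed_usecs and speedup are hypothetical
and appear nowhere in the patch; the sketch assumes the end time is not earlier
than the start time.

```c
#include <assert.h>
#include <sys/time.h>

/* Microseconds elapsed from start to end; assumes end is not before start. */
long
elapsed_usecs(struct timeval start, struct timeval end)
{
	long		secs = (long) (end.tv_sec - start.tv_sec);
	long		usecs = (long) (end.tv_usec - start.tv_usec);

	/* usecs may be negative here; the seconds term compensates. */
	return secs * 1000000L + usecs;
}

/* Ratio of baseline time to measured time; above 1.0 means faster. */
double
speedup(long baseline_usecs, long measured_usecs)
{
	assert(measured_usecs > 0);
	return (double) baseline_usecs / (double) measured_usecs;
}
```

To print the elapsed time as seconds with a fractional part, split the total
with usecs / 1000000 and usecs % 1000000 rather than printing the total
microsecond count after the decimal point.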