List:       pgsql-hackers
Subject:    Re: [HACKERS] libpq pipelining
From:       Matt Newell <newellm () blur ! com>
Date:       2014-12-10 21:29:59
Message-ID: 1499322.vmUVO5HYcT () obsidian

On Friday, December 05, 2014 12:22:38 PM Heikki Linnakangas wrote:
> Oh, that's what the PQgetLastQuery/PQgetNextQuery functions work! I
> didn't understand that before. I'd suggest renaming them to something
> like PQgetSentQuery() and PQgetResultQuery(). The first/last/next names
> made me think that they're used to iterate a list of queries, but in
> fact they're supposed to be used at very different stages.
> 
> - Heikki


Okay, I have renamed them as you suggested and added
PQsetPipelining/PQisPipelining, with pipelining off by default.  There should
be no behavior change unless pipelining is enabled.
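
As a rough illustration of the "no behavior change" claim, the send-time check
can be modeled in standalone C.  This is only a simplified sketch, not the
patch itself: AsyncStatus and can_send are hypothetical names that mirror the
asyncStatus switch added to PQsendQueryStart.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical mirror of libpq's PGAsyncStatusType values used by the check. */
typedef enum
{
	ASYNC_IDLE,
	ASYNC_BUSY,
	ASYNC_READY,
	ASYNC_COPY_IN,
	ASYNC_COPY_OUT,
	ASYNC_COPY_BOTH
} AsyncStatus;

/*
 * Decide whether a new async query may be sent.
 * With pipelining off, only an idle connection accepts a query, which is
 * the pre-patch behavior.  With pipelining on, busy and ready connections
 * accept one too.  COPY states always refuse.
 */
static bool
can_send(AsyncStatus status, bool pipelining)
{
	switch (status)
	{
		case ASYNC_IDLE:
			return true;
		case ASYNC_BUSY:
		case ASYNC_READY:
			return pipelining;
		default:
			return false;
	}
}
```

With pipelining disabled, can_send(ASYNC_BUSY, false) is false, which
corresponds to the existing "another command is already in progress" error.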

Documentation should be mostly complete, except for the possible addition of
an example and maybe a general pipelining overview paragraph.

I have implemented async query support (which takes advantage of pipelining)
in Qt, along with a couple of test cases.  This led me to discover a bug in my
last patch where a PGquery object could be reused twice in a row.  I have
fixed that.  I contemplated not reusing the PGquery objects at all, but that
wouldn't solve the problem, because malloc may well return a recently freed
block of the same size anyway.  Guaranteeing that a PGquery won't be reused
twice in a row should be sufficient.  The only alternative is to add a unique
id, but that would add complexity that I don't think is warranted.
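
The reuse rule can be sketched outside libpq as follows.  This is a simplified
model under stated assumptions, not the patch code: Query, Queue, queue_send
and queue_advance are hypothetical stand-ins for PGquery, the new PGconn queue
fields, PQsendQueryStart and pqQueryAdvance.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical stand-in for PGquery; only the link field matters here. */
typedef struct Query
{
	struct Query *next;
} Query;

/* Hypothetical stand-in for the queue fields the patch adds to PGconn. */
typedef struct Queue
{
	Query	   *head;			/* oldest query still producing results */
	Query	   *tail;			/* most recently sent query */
	Query	   *free;			/* stack of recycled objects */
	Query	   *last;			/* most recently retired object */
} Queue;

/* Append a query, recycling an object unless it was the last one retired. */
static Query *
queue_send(Queue *q)
{
	Query	   *n;

	if (q->free && q->free != q->last)
	{
		n = q->free;			/* first free entry is safe to reuse */
		q->free = n->next;
	}
	else if (q->free && q->free->next)
	{
		n = q->free->next;		/* skip the just-retired entry */
		q->free->next = n->next;
		q->last = NULL;			/* first entry may be reused next time */
	}
	else
	{
		n = malloc(sizeof *n);	/* nothing usable: allocate */
		if (n == NULL)
			return NULL;
	}
	n->next = NULL;
	if (q->tail)
		q->tail->next = n;
	else
		q->head = n;
	q->tail = n;
	return n;
}

/* Retire the head query and push its object onto the free stack. */
static Query *
queue_advance(Queue *q)
{
	Query	   *done = q->head;

	if (done == NULL)
		return NULL;
	q->last = done;
	q->head = done->next;
	done->next = q->free;
	q->free = done;
	if (q->head == NULL)
		q->tail = NULL;
	return done;
}
```

In this model, a pointer saved from queue_send is never handed out again by
the send that immediately follows its retirement, so a caller comparing
pointers cannot mistake the next query for the one that just finished.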

Feedback is very welcome and appreciated.

Thanks,
Matt Newell



["libpq.pipeline.docs.patch" (libpq.pipeline.docs.patch)]

diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index d829a4b..4e0431e 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -3947,9 +3947,14 @@ int PQsendQuery(PGconn *conn, const char *command);
 
        After successfully calling <function>PQsendQuery</function>, call
        <function>PQgetResult</function> one or more times to obtain the
-       results.  <function>PQsendQuery</function> cannot be called again
-       (on the same connection) until <function>PQgetResult</function>
-       has returned a null pointer, indicating that the command is done.
+       results.  If pipelining is enabled, <function>PQsendQuery</function>
+       may be called multiple times before reading the results. See
+       <function>PQsetPipelining</function> and <function>PQisPipelining</function>.
+       Call <function>PQgetSentQuery</function> to get a <structname>PGquery</structname>
+       which can be used to identify which results obtained from
+       <function>PQgetResult</function> belong to each pipelined query.
+       If only one query is dispatched at a time, you can call <function>PQgetResult</function>
+       until a NULL value is returned to indicate the end of the query.
       </para>
      </listitem>
     </varlistentry>
@@ -4133,8 +4138,8 @@ PGresult *PQgetResult(PGconn *conn);
 
       <para>
        <function>PQgetResult</function> must be called repeatedly until
-       it returns a null pointer, indicating that the command is done.
-       (If called when no command is active,
+       it returns a null pointer, indicating that all dispatched commands
+       are done. (If called when no command is active,
        <function>PQgetResult</function> will just return a null pointer
        at once.) Each non-null result from
        <function>PQgetResult</function> should be processed using the
@@ -4144,14 +4149,17 @@ PGresult *PQgetResult(PGconn *conn);
        <function>PQgetResult</function> will block only if a command is
        active and the necessary response data has not yet been read by
        <function>PQconsumeInput</function>.
+       If query pipelining is being used, <function>PQgetResultQuery</function>
+       can be called after <function>PQgetResult</function> to match the result to the query.
       </para>
 
       <note>
        <para>
         Even when <function>PQresultStatus</function> indicates a fatal
-        error, <function>PQgetResult</function> should be called until it
-        returns a null pointer, to allow <application>libpq</> to
-        process the error information completely.
+        error, <function>PQgetResult</function> should be called until the
+        query has no more results (null pointer return if not using query
+        pipelining, otherwise see <function>PQgetResultQuery</function>),
+        to allow <application>libpq</> to process the error information completely.
        </para>
       </note>
      </listitem>
@@ -4385,6 +4393,158 @@ int PQflush(PGconn *conn);
    read-ready and then read the response as described above.
   </para>
 
+ <variablelist>
+  <varlistentry id="libpq-pqsetpipelining">
+   <term>
+    <function>PQsetPipelining</function>
+    <indexterm>
+     <primary>PQsetPipelining</primary>
+    </indexterm>
+   </term>
+
+   <listitem>
+    <para>
+     Enables or disables query pipelining.
+<synopsis>
+int PQsetPipelining(PGconn *conn, int arg);
+</synopsis>
+    </para>
+
+    <para>
+     Enables pipelining for the connection if arg is 1, or disables it
+     if arg is 0.  When pipelining is enabled, multiple async queries can
+     be sent before processing the results of the first.  If pipelining
+     is disabled, an error will be raised if an async query is attempted
+     while another is active.
+    </para>
+   </listitem>
+  </varlistentry>
+
+  <varlistentry id="libpq-pqispipelining">
+   <term>
+    <function>PQisPipelining</function>
+    <indexterm>
+     <primary>PQisPipelining</primary>
+    </indexterm>
+   </term>
+
+   <listitem>
+    <para>
+     Returns the pipelining status of the connection.
+<synopsis>
+int PQisPipelining(PGconn *conn);
+</synopsis>
+    </para>
+
+    <para>
+     Returns 1 if pipelining is enabled, or 0 if pipelining is disabled.
+     Query pipelining is disabled unless enabled with a call to
+     <function>PQsetPipelining</function>.
+    </para>
+   </listitem>
+  </varlistentry>
+
+  <varlistentry id="libpq-pqgetquerycommand">
+   <term>
+    <function>PQgetQueryCommand</function>
+    <indexterm>
+     <primary>PQgetQueryCommand</primary>
+    </indexterm>
+   </term>
+
+   <listitem>
+    <para>
+     Returns the query string associated with the <structname>PGquery</structname>.
+<synopsis>
+const char * PQgetQueryCommand(PGquery *query);
+</synopsis>
+    </para>
+
+    <para>
+     When using query pipelining this function can be used to retrieve the command
+     that created the query object.
+    </para>
+   </listitem>
+  </varlistentry>
+
+  <varlistentry id="libpq-pqgetresultquery">
+   <term>
+    <function>PQgetResultQuery</function>
+    <indexterm>
+     <primary>PQgetResultQuery</primary>
+    </indexterm>
+   </term>
+
+   <listitem>
+    <para>
+     Returns the first async query to receive results, or NULL if no
+     async queries are active.
+<synopsis>
+PGquery * PQgetResultQuery(PGconn *conn);
+</synopsis>
+    </para>
+
+    <para>
+     When pipelining queries, this function identifies the query that
+     produced the result returned by <function>PQgetResult</function>.
+     Call this function immediately after calling
+     <function>PQgetResult</function>, or immediately before if a result
+     is ready to read, indicated by <function>PQisBusy</function>
+     returning 0.  The <structname>PGquery</structname> remains valid
+     until the next libpq call that consumes input.
+    </para>
+   </listitem>
+  </varlistentry>
+
+  <varlistentry id="libpq-pqgetnextquery">
+   <term>
+    <function>PQgetNextQuery</function>
+    <indexterm>
+     <primary>PQgetNextQuery</primary>
+    </indexterm>
+   </term>
+
+   <listitem>
+    <para>
+     Returns the next <structname>PGquery</structname> in the list of
+     pipelined queries.
+<synopsis>
+PGquery * PQgetNextQuery(PGquery *query);
+</synopsis>
+    </para>
+
+    <para>
+     This function can be used to iterate over each pending async query, starting
+     with <function>PQgetResultQuery</function> and ending with <function>PQgetSentQuery</function>.
+    </para>
+   </listitem>
+  </varlistentry>
+
+   <varlistentry>
+    <term>
+     <function>PQgetSentQuery</function>
+     <indexterm>
+      <primary>PQgetSentQuery</primary>
+     </indexterm>
+    </term>
+
+    <listitem>
+     <para>
+      Returns the last <structname>PGquery</structname> in the list of
+      dispatched async queries waiting for results.
+<synopsis>
+PGquery * PQgetSentQuery(PGconn *conn);
+</synopsis>
+     </para>
+
+     <para>
+      Call this function after dispatching an async query to get a
+      <structname>PGquery</structname> that can be used to identify the
+      originating query for each result obtained by <function>PQgetResult</function>.
+     </para>
+    </listitem>
+   </varlistentry>
+  </variablelist>
  </sect1>
 
  <sect1 id="libpq-single-row-mode">
@@ -4411,7 +4571,7 @@ int PQflush(PGconn *conn);
    immediately after a successful call of <function>PQsendQuery</function>
    (or a sibling function).  This mode selection is effective only for the
    currently executing query.  Then call <function>PQgetResult</function>
-   repeatedly, until it returns null, as documented in <xref
+   repeatedly, until the last query result is returned, as documented in <xref
    linkend="libpq-async">.  If the query returns any rows, they are returned
    as individual <structname>PGresult</structname> objects, which look like
    normal query results except for having status code
@@ -4420,8 +4580,8 @@ int PQflush(PGconn *conn);
    the query returns zero rows, a zero-row object with status
    <literal>PGRES_TUPLES_OK</literal> is returned; this is the signal that no
    more rows will arrive.  (But note that it is still necessary to continue
-   calling <function>PQgetResult</function> until it returns null.)  All of
-   these <structname>PGresult</structname> objects will contain the same row
+   calling <function>PQgetResult</function> until the last query result is returned.)
+   All of these <structname>PGresult</structname> objects will contain the same row
    description data (column names, types, etc) that an ordinary
    <structname>PGresult</structname> object for the query would have.
    Each object should be freed with <function>PQclear</function> as usual.

["libpq.pipeline.src.patch" (libpq.pipeline.src.patch)]

diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index 93da50d..050bf05 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -165,3 +165,9 @@ lo_lseek64                162
 lo_tell64                 163
 lo_truncate64             164
 PQconninfo                165
+PQgetResultQuery          166
+PQgetSentQuery            167
+PQgetNextQuery            168
+PQgetQueryCommand         169
+PQsetPipelining           170
+PQisPipelining            171
\ No newline at end of file
diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index 3af222b..fc72605 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -2893,8 +2893,6 @@ freePGconn(PGconn *conn)
 		free(conn->gsslib);
 #endif
 	/* Note that conn->Pfdebug is not ours to close or free */
-	if (conn->last_query)
-		free(conn->last_query);
 	if (conn->inBuffer)
 		free(conn->inBuffer);
 	if (conn->outBuffer)
@@ -2956,6 +2954,29 @@ closePGconn(PGconn *conn)
 										 * absent */
 	conn->asyncStatus = PGASYNC_IDLE;
 	pqClearAsyncResult(conn);	/* deallocate result */
+
+	/*
+	 * Link active queries into the free list so we can free them
+	 */
+	if (conn->queryTail)
+	{
+		conn->queryTail->next = conn->queryFree;
+		conn->queryFree = conn->queryHead;
+	}
+	conn->queryHead = conn->queryTail = NULL;
+
+	/*
+	 * Free all query objects
+	 */
+	while (conn->queryFree)
+	{
+		PGquery * prev = conn->queryFree;
+		conn->queryFree = prev->next;
+		if (prev->querycmd)
+			free(prev->querycmd);
+		free(prev);
+	}
+
 	resetPQExpBuffer(&conn->errorMessage);
 	pg_freeaddrinfo_all(conn->addrlist_family, conn->addrlist);
 	conn->addrlist = NULL;
@@ -3135,7 +3156,7 @@ PQresetPoll(PGconn *conn)
 }
 
 /*
- * PQcancelGet: get a PGcancel structure corresponding to a connection.
+ * PQgetCancel: get a PGcancel structure corresponding to a connection.
  *
  * A copy is needed to be able to cancel a running query from a different
  * thread. If the same structure is used all structure members would have
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index 4075e51..379c38c 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -1020,7 +1020,7 @@ pqRowProcessor(PGconn *conn, const char **errmsgp)
 	 * row; the original conn->result is left unchanged so that it can be used
 	 * again as the template for future rows.
 	 */
-	if (conn->singleRowMode)
+	if (conn->queryHead && conn->queryHead->singleRowMode)
 	{
 		/* Copy everything that should be in the result at this point */
 		res = PQcopyResult(res,
@@ -1080,7 +1080,7 @@ pqRowProcessor(PGconn *conn, const char **errmsgp)
 	 * Success.  In single-row mode, make the result available to the client
 	 * immediately.
 	 */
-	if (conn->singleRowMode)
+	if (conn->queryHead && conn->queryHead->singleRowMode)
 	{
 		/* Change result status to special single-row value */
 		res->resultStatus = PGRES_SINGLE_TUPLE;
@@ -1132,13 +1132,11 @@ PQsendQuery(PGconn *conn, const char *query)
 	}
 
 	/* remember we are using simple query protocol */
-	conn->queryclass = PGQUERY_SIMPLE;
+	conn->queryTail->queryclass = PGQUERY_SIMPLE;
 
 	/* and remember the query text too, if possible */
-	/* if insufficient memory, last_query just winds up NULL */
-	if (conn->last_query)
-		free(conn->last_query);
-	conn->last_query = strdup(query);
+	/* if insufficient memory, querycmd just winds up NULL */
+	conn->queryTail->querycmd = strdup(query);
 
 	/*
 	 * Give the data a push.  In nonblock mode, don't complain if we're unable
@@ -1151,7 +1149,9 @@ PQsendQuery(PGconn *conn, const char *query)
 	}
 
 	/* OK, it's launched! */
-	conn->asyncStatus = PGASYNC_BUSY;
+	if (conn->asyncStatus == PGASYNC_IDLE)
+		conn->asyncStatus = PGASYNC_BUSY;
+
 	return 1;
 }
 
@@ -1272,13 +1272,11 @@ PQsendPrepare(PGconn *conn,
 		goto sendFailed;
 
 	/* remember we are doing just a Parse */
-	conn->queryclass = PGQUERY_PREPARE;
+	conn->queryTail->queryclass = PGQUERY_PREPARE;
 
 	/* and remember the query text too, if possible */
-	/* if insufficient memory, last_query just winds up NULL */
-	if (conn->last_query)
-		free(conn->last_query);
-	conn->last_query = strdup(query);
+	/* if insufficient memory, querycmd just winds up NULL */
+	conn->queryTail->querycmd = strdup(query);
 
 	/*
 	 * Give the data a push.  In nonblock mode, don't complain if we're unable
@@ -1288,7 +1286,9 @@ PQsendPrepare(PGconn *conn,
 		goto sendFailed;
 
 	/* OK, it's launched! */
-	conn->asyncStatus = PGASYNC_BUSY;
+	if (conn->asyncStatus == PGASYNC_IDLE)
+		conn->asyncStatus = PGASYNC_BUSY;
+
 	return 1;
 
 sendFailed:
@@ -1344,6 +1344,8 @@ PQsendQueryPrepared(PGconn *conn,
 static bool
 PQsendQueryStart(PGconn *conn)
 {
+	PGquery * query;
+
 	if (!conn)
 		return false;
 
@@ -1357,20 +1359,59 @@ PQsendQueryStart(PGconn *conn)
 						  libpq_gettext("no connection to the server\n"));
 		return false;
 	}
-	/* Can't send while already busy, either. */
-	if (conn->asyncStatus != PGASYNC_IDLE)
+
+	/* Check if we are in a valid state to send an async query */
+	switch (conn->asyncStatus)
 	{
-		printfPQExpBuffer(&conn->errorMessage,
+		case PGASYNC_IDLE:
+			break;
+		/* Can only send a query during busy or ready state if
+		 * pipelining is enabled */
+		case PGASYNC_BUSY:
+		case PGASYNC_READY:
+			if (conn->pipelining)
+				break;
+			/* Fall through to error */
+		case PGASYNC_COPY_IN:
+		case PGASYNC_COPY_OUT:
+		case PGASYNC_COPY_BOTH:
+			printfPQExpBuffer(&conn->errorMessage,
 				  libpq_gettext("another command is already in progress\n"));
-		return false;
+			return false;
 	}
 
-	/* initialize async result-accumulation state */
-	conn->result = NULL;
-	conn->next_result = NULL;
+	query = 0;
 
-	/* reset single-row processing mode */
-	conn->singleRowMode = false;
+	/*
+	 * Reuse a free PGquery if we have one, but never the same PGquery
+	 * twice in a row: try the first free entry, then the second, and
+	 * allocate a new one if neither can be used.
+	 */
+	if (conn->queryFree && conn->queryFree != conn->queryLast)
+	{
+		query = conn->queryFree;
+		conn->queryFree = query->next;
+	}
+	else if (conn->queryFree && conn->queryFree->next)
+	{
+		query = conn->queryFree->next;
+		conn->queryFree->next = query->next;
+		conn->queryLast = NULL; /* First is fine to use again now */
+	}
+	else if ((query = (PGquery *) malloc(sizeof(PGquery))) == NULL)
+	{
+		printfPQExpBuffer(&conn->errorMessage, libpq_gettext("out of memory\n"));
+		return false;
+	}
+	/* Reset every field: a recycled PGquery may carry stale singleRowMode */
+	memset(query, 0, sizeof(PGquery));
+
+	if( conn->queryTail )
+		conn->queryTail->next = query;
+	else
+		conn->queryHead = query;
+
+	conn->queryTail = query;
 
 	/* ready to send command message */
 	return true;
@@ -1522,16 +1563,12 @@ PQsendQueryGuts(PGconn *conn,
 		goto sendFailed;
 
 	/* remember we are using extended query protocol */
-	conn->queryclass = PGQUERY_EXTENDED;
+	conn->queryTail->queryclass = PGQUERY_EXTENDED;
 
 	/* and remember the query text too, if possible */
-	/* if insufficient memory, last_query just winds up NULL */
-	if (conn->last_query)
-		free(conn->last_query);
+	/* if insufficient memory, querycmd just winds up NULL */
 	if (command)
-		conn->last_query = strdup(command);
-	else
-		conn->last_query = NULL;
+		conn->queryTail->querycmd = strdup(command);
 
 	/*
 	 * Give the data a push.  In nonblock mode, don't complain if we're unable
@@ -1541,7 +1578,9 @@ PQsendQueryGuts(PGconn *conn,
 		goto sendFailed;
 
 	/* OK, it's launched! */
-	conn->asyncStatus = PGASYNC_BUSY;
+	if (conn->asyncStatus == PGASYNC_IDLE)
+		conn->asyncStatus = PGASYNC_BUSY;
+
 	return 1;
 
 sendFailed:
@@ -1576,7 +1615,7 @@ pqHandleSendFailure(PGconn *conn)
 }
 
 /*
- * Select row-by-row processing mode
+ * Select row-by-row processing mode for the last sent query
  */
 int
 PQsetSingleRowMode(PGconn *conn)
@@ -1585,18 +1624,16 @@ PQsetSingleRowMode(PGconn *conn)
 	 * Only allow setting the flag when we have launched a query and not yet
 	 * received any results.
 	 */
-	if (!conn)
+	if (!conn || !conn->queryTail)
 		return 0;
-	if (conn->asyncStatus != PGASYNC_BUSY)
+	if (conn->asyncStatus != PGASYNC_BUSY && conn->queryTail == conn->queryHead)
 		return 0;
-	if (conn->queryclass != PGQUERY_SIMPLE &&
-		conn->queryclass != PGQUERY_EXTENDED)
-		return 0;
-	if (conn->result)
+	if (conn->queryTail->queryclass != PGQUERY_SIMPLE &&
+		conn->queryTail->queryclass != PGQUERY_EXTENDED)
 		return 0;
 
 	/* OK, set flag */
-	conn->singleRowMode = true;
+	conn->queryTail->singleRowMode = true;
 	return 1;
 }
 
@@ -1670,6 +1707,50 @@ PQisBusy(PGconn *conn)
 
 
 /*
+ * PQgetQueryCommand
+ */
+const char *
+PQgetQueryCommand(PGquery *query)
+{
+	if (!query)
+		return NULL;
+	return query->querycmd;
+}
+
+/*
+ * PQgetResultQuery
+ */
+PGquery *
+PQgetResultQuery(PGconn *conn)
+{
+	if (!conn)
+		return 0;
+	return conn->queryHead;
+}
+
+/*
+ * PQgetSentQuery
+ */
+PGquery *
+PQgetSentQuery(PGconn *conn)
+{
+	if (!conn)
+		return 0;
+	return conn->queryTail;
+}
+
+/*
+ * PQgetNextQuery
+ */
+PGquery *
+PQgetNextQuery(PGquery *query)
+{
+	if (!query)
+		return 0;
+	return query->next;
+}
+
+/*
  * PQgetResult
  *	  Get the next PGresult produced by a query.  Returns NULL if no
  *	  query work remains or an error has occurred (e.g. out of
@@ -2132,14 +2213,7 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
 		goto sendFailed;
 
 	/* remember we are doing a Describe */
-	conn->queryclass = PGQUERY_DESCRIBE;
-
-	/* reset last-query string (not relevant now) */
-	if (conn->last_query)
-	{
-		free(conn->last_query);
-		conn->last_query = NULL;
-	}
+	conn->queryTail->queryclass = PGQUERY_DESCRIBE;
 
 	/*
 	 * Give the data a push.  In nonblock mode, don't complain if we're unable
@@ -2301,7 +2375,7 @@ PQputCopyEnd(PGconn *conn, const char *errormsg)
 		 * If we sent the COPY command in extended-query mode, we must issue a
 		 * Sync as well.
 		 */
-		if (conn->queryclass != PGQUERY_SIMPLE)
+		if (conn->queryHead && conn->queryHead->queryclass != PGQUERY_SIMPLE)
 		{
 			if (pqPutMsgStart('S', false, conn) < 0 ||
 				pqPutMsgEnd(conn) < 0)
@@ -3112,6 +3186,31 @@ PQisnonblocking(const PGconn *conn)
 	return pqIsnonblocking(conn);
 }
 
+int
+PQsetPipelining(PGconn *conn, int arg)
+{
+	bool barg;
+
+	if (!conn)
+		return -1;
+
+	barg = (arg ? TRUE : FALSE);
+
+	/* Return error if they are trying to turn pipelining off and
+	 * multiple queries are pending */
+	if (!barg && conn->queryHead && conn->queryHead != conn->queryTail)
+		return -1;
+
+	conn->pipelining = barg;
+	return 0;
+}
+
+int
+PQisPipelining(PGconn *conn)
+{
+	return (conn && conn->pipelining) ? 1 : 0;
+}
+
 /* libpq is thread-safe? */
 int
 PQisthreadsafe(void)
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index c514ca5..d0c5110 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -55,7 +55,29 @@ static void reportErrorPosition(PQExpBuffer msg, const char *query,
 					int loc, int encoding);
 static int build_startup_packet(const PGconn *conn, char *packet,
 					 const PQEnvironmentOption *options);
+static void pqQueryAdvance(PGconn *conn);
 
+void
+pqQueryAdvance(PGconn *conn)
+{
+	PGquery * query;
+
+	query = conn->queryHead;
+	if (query == NULL)
+		return;
+
+	conn->queryLast = query;
+	/* Advance queryHead */
+	conn->queryHead = query->next;
+	/* Push last query onto free stack */
+	query->next = conn->queryFree;
+	conn->queryFree = query;
+	free(query->querycmd);
+	query->querycmd = NULL;
+
+	if (conn->queryHead == NULL)
+		conn->queryTail = NULL;
+}
 
 /*
  * parseInput: if appropriate, parse input data from backend
@@ -218,7 +240,15 @@ pqParseInput3(PGconn *conn)
 				case 'Z':		/* backend is ready for new query */
 					if (getReadyForQuery(conn))
 						return;
-					conn->asyncStatus = PGASYNC_IDLE;
+
+					pqQueryAdvance(conn);
+					/* initialize async result-accumulation state */
+					conn->result = NULL;
+					conn->next_result = NULL;
+					if (conn->queryHead != NULL)
+						conn->asyncStatus = PGASYNC_BUSY;
+					else
+						conn->asyncStatus = PGASYNC_IDLE;
 					break;
 				case 'I':		/* empty query */
 					if (conn->result == NULL)
@@ -232,7 +262,7 @@ pqParseInput3(PGconn *conn)
 					break;
 				case '1':		/* Parse Complete */
 					/* If we're doing PQprepare, we're done; else ignore */
-					if (conn->queryclass == PGQUERY_PREPARE)
+					if (conn->queryHead && conn->queryHead->queryclass == PGQUERY_PREPARE)
 					{
 						if (conn->result == NULL)
 						{
@@ -266,7 +296,7 @@ pqParseInput3(PGconn *conn)
 					break;
 				case 'T':		/* Row Description */
 					if (conn->result == NULL ||
-						conn->queryclass == PGQUERY_DESCRIBE)
+						(conn->queryHead && conn->queryHead->queryclass == PGQUERY_DESCRIBE))
 					{
 						/* First 'T' in a query sequence */
 						if (getRowDescriptions(conn, msgLength))
@@ -299,7 +329,7 @@ pqParseInput3(PGconn *conn)
 					 * instead of TUPLES_OK.  Otherwise we can just ignore
 					 * this message.
 					 */
-					if (conn->queryclass == PGQUERY_DESCRIBE)
+					if (conn->queryHead && conn->queryHead->queryclass == PGQUERY_DESCRIBE)
 					{
 						if (conn->result == NULL)
 						{
@@ -422,6 +452,8 @@ pqParseInput3(PGconn *conn)
 static void
 handleSyncLoss(PGconn *conn, char id, int msgLength)
 {
+	PGquery * query;
+
 	printfPQExpBuffer(&conn->errorMessage,
 					  libpq_gettext(
 	"lost synchronization with server: got message type \"%c\", length %d\n"),
@@ -430,6 +462,17 @@ handleSyncLoss(PGconn *conn, char id, int msgLength)
 	pqSaveErrorResult(conn);
 	conn->asyncStatus = PGASYNC_READY;	/* drop out of GetResult wait loop */
 
+	/* All queries are canceled, move them to the free list and free the query commands */
+	while ((query = conn->queryHead) != NULL)
+	{
+		free(query->querycmd);
+		query->querycmd = NULL;
+		conn->queryHead = query->next;
+		query->next = conn->queryFree;
+		conn->queryFree = query;
+	}
+	conn->queryTail = NULL;
+
 	pqDropConnection(conn);
 	conn->status = CONNECTION_BAD;		/* No more connection to backend */
 }
@@ -455,7 +496,7 @@ getRowDescriptions(PGconn *conn, int msgLength)
 	 * PGresult created by getParamDescriptions, and we should fill data into
 	 * that.  Otherwise, create a new, empty PGresult.
 	 */
-	if (conn->queryclass == PGQUERY_DESCRIBE)
+	if (conn->queryHead && conn->queryHead->queryclass == PGQUERY_DESCRIBE)
 	{
 		if (conn->result)
 			result = conn->result;
@@ -562,7 +603,7 @@ getRowDescriptions(PGconn *conn, int msgLength)
 	 * If we're doing a Describe, we're done, and ready to pass the result
 	 * back to the client.
 	 */
-	if (conn->queryclass == PGQUERY_DESCRIBE)
+	if (conn->queryHead && conn->queryHead->queryclass == PGQUERY_DESCRIBE)
 	{
 		conn->asyncStatus = PGASYNC_READY;
 		return 0;
@@ -865,10 +906,10 @@ pqGetErrorNotice3(PGconn *conn, bool isError)
 	val = PQresultErrorField(res, PG_DIAG_STATEMENT_POSITION);
 	if (val)
 	{
-		if (conn->verbosity != PQERRORS_TERSE && conn->last_query != NULL)
+		if (conn->verbosity != PQERRORS_TERSE && conn->queryHead && conn->queryHead->querycmd != NULL)
 		{
 			/* emit position as a syntax cursor display */
-			querytext = conn->last_query;
+			querytext = conn->queryHead->querycmd;
 			querypos = atoi(val);
 		}
 		else
@@ -1696,7 +1737,7 @@ pqEndcopy3(PGconn *conn)
 		 * If we sent the COPY command in extended-query mode, we must issue a
 		 * Sync as well.
 		 */
-		if (conn->queryclass != PGQUERY_SIMPLE)
+		if (conn->queryHead && conn->queryHead->queryclass != PGQUERY_SIMPLE)
 		{
 			if (pqPutMsgStart('S', false, conn) < 0 ||
 				pqPutMsgEnd(conn) < 0)
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index b81dc16..ca54116 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -141,6 +141,13 @@ typedef struct pg_result PGresult;
  */
 typedef struct pg_cancel PGcancel;
 
+/* PGquery encapsulates the progress of a single query command issued
+ * to the async api functions
+ * The contents of this struct are not supposed to be known to applications.
+ */
+typedef struct pg_query PGquery;
+
+
 /* PGnotify represents the occurrence of a NOTIFY message.
  * Ideally this would be an opaque typedef, but it's so simple that it's
  * unlikely to change.
@@ -404,6 +411,14 @@ extern PGresult *PQgetResult(PGconn *conn);
 extern int	PQisBusy(PGconn *conn);
 extern int	PQconsumeInput(PGconn *conn);
 
+extern int PQsetPipelining(PGconn *conn, int arg);
+extern int PQisPipelining(PGconn *conn);
+
+extern const char * PQgetQueryCommand(PGquery *query);
+extern PGquery *PQgetResultQuery(PGconn *conn);
+extern PGquery *PQgetSentQuery(PGconn *conn);
+extern PGquery *PQgetNextQuery(PGquery *query);
+
 /* LISTEN/NOTIFY support */
 extern PGnotify *PQnotifies(PGconn *conn);
 
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 4ef46ff..7d84d89 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -291,6 +291,16 @@ typedef struct pgDataValue
 	const char *value;			/* data value, without zero-termination */
 } PGdataValue;
 
+typedef struct pg_query
+{
+	PGQueryClass queryclass;
+	char	   *querycmd;		/* last SQL command, or NULL if unknown */
+	bool		singleRowMode;	/* return query result row-by-row? */
+	struct pg_query * next;
+	void	   *userptr;        /* convenience for the user */
+} PGquery;
+
+
 /*
  * PGconn stores all the state data associated with a single connection
  * to a backend.
@@ -350,13 +360,20 @@ struct pg_conn
 	ConnStatusType status;
 	PGAsyncStatusType asyncStatus;
 	PGTransactionStatusType xactStatus; /* never changes to ACTIVE */
-	PGQueryClass queryclass;
-	char	   *last_query;		/* last SQL command, or NULL if unknown */
+
+	/* queryHead and queryTail form a FIFO representing queries sent
+	 * to the backend.  queryHead is the first query sent, and is the
+	 * query we are receiving results from, or have received results from */
+	bool pipelining;
+	PGquery *queryHead;
+	PGquery *queryTail;
+	PGquery *queryFree; /* Reuse PGQuery allocations */
+	PGquery *queryLast; /* Ensure we never use a query twice in a row */
+
 	char		last_sqlstate[6];		/* last reported SQLSTATE */
 	bool		options_valid;	/* true if OK to attempt connection */
 	bool		nonblocking;	/* whether this connection is using nonblock
 								 * sending semantics */
-	bool		singleRowMode;	/* return current query result row-by-row? */
 	char		copy_is_binary; /* 1 = copy binary, 0 = copy text */
 	int			copy_already_done;		/* # bytes already returned in COPY
 										 * OUT */
diff --git a/src/test/examples/Makefile b/src/test/examples/Makefile
index aee5c04..3996760 100644
--- a/src/test/examples/Makefile
+++ b/src/test/examples/Makefile
@@ -14,7 +14,7 @@ override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS)
 override LDLIBS := $(libpq_pgport) $(LDLIBS)
 
 
-PROGS = testlibpq testlibpq2 testlibpq3 testlibpq4 testlo testlo64
+PROGS = testlibpq testlibpq2 testlibpq3 testlibpq4 testlo testlo64 testlibpqpipeline testlibpqpipeline2
 
 all: $(PROGS)
 

["testlibpqpipeline2.c" (testlibpqpipeline2.c)]

/*
 * src/test/examples/testlibpqpipeline2.c
 *
 *
 * testlibpqpipeline2.c
 *		this test program tests query pipelining.  It shows how to issue multiple
 *      pipelined queries, and identify from which query a result originated.  It 
 *      also demonstrates how failure of one query does not impact subsequent queries
 *      when they are not part of the same transaction.
 *
 *
 */
#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>

#include "libpq-fe.h"

static void checkResult(PGconn *conn, PGresult *result, PGquery *query, int expectedResultStatus)
{
	if (PQresultStatus(result) != expectedResultStatus)
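	{
		/* query is NULL for PQexec callers; never pass NULL to %s */
		const char *cmd = PQgetQueryCommand(query);

		printf( "Got unexpected result status '%s', expected '%s'\nQuery:%s\n", 
			PQresStatus(PQresultStatus(result)), PQresStatus(expectedResultStatus),
			cmd ? cmd : "(unknown)");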
		PQclear(result);
		PQclear(PQexec(conn,"DROP TABLE test"));
		PQfinish(conn);
		exit(1);
	}
	PQclear(result);
}

int
main(int argc, char **argv)
{
	PGconn * conn;
	PGquery * query1;
	PGquery * query2;
	PGquery * query3;
	PGquery * curQuery;
	PGresult * result;
	
	conn = NULL;
	query1 = query2 = query3 = curQuery = NULL;
	result = NULL;
	
	/* make a connection to the database */
	conn = PQsetdb(NULL, NULL, NULL, NULL, NULL);

	/* check to see that the backend connection was successfully made */
	if (PQstatus(conn) != CONNECTION_OK)
	{
		fprintf(stderr, "Connection to database failed: %s",
				PQerrorMessage(conn));
		exit(1);
	}

	PQsetPipelining(conn,1);
	
	checkResult(conn,PQexec(conn,"DROP TABLE IF EXISTS test"),NULL,PGRES_COMMAND_OK);
	checkResult(conn,PQexec(conn,"CREATE TABLE test ( id SERIAL PRIMARY KEY )"),NULL,PGRES_COMMAND_OK);
	
	PQsendQuery(conn, "INSERT INTO test(id) VALUES (DEFAULT),(DEFAULT) RETURNING id");
	query1 = PQgetSentQuery(conn);
	
	/* Duplicate primary key error */
	PQsendQuery(conn, "UPDATE test SET id=2 WHERE id=1");
	query2 = PQgetSentQuery(conn);
	
	PQsendQuery(conn, "SELECT * FROM test");
	query3 = PQgetSentQuery(conn);
	
	while( (result = PQgetResult(conn)) != NULL )
	{
		curQuery = PQgetResultQuery(conn);
		
		if (curQuery == query1)
			checkResult(conn,result,curQuery,PGRES_TUPLES_OK);
		if (curQuery == query2)
			checkResult(conn,result,curQuery,PGRES_FATAL_ERROR);
		if (curQuery == query3)
			checkResult(conn,result,curQuery,PGRES_TUPLES_OK);
	}

	PQclear(PQexec(conn,"DROP TABLE test"));
	
	PQfinish(conn);
	
	return 0;
} 

["testlibpqpipeline.c" (testlibpqpipeline.c)]

/*
 * src/test/examples/testlibpqpipeline.c
 *
 *
 * testlibpqpipeline.c
 *		this test program tests query pipelining and its performance impact
 *
 *
 */
#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>

#include "libpq-fe.h"

// If defined we won't issue more sql commands if the socket's
// write buffer is full
//#define MIN_LOCAL_Q

//#define PRINT_QUERY_PROGRESS

static int testPipelined( PGconn * conn, int totalQueries, int totalQueued, const char * sql );
static int testPipelinedSeries( PGconn * conn, int totalQueries, int totalQueued, int baseline_usecs );


int
testPipelined( PGconn * conn, int totalQueries, int totalQueued, const char * sql )
{
	int nQueriesQueued;
	int nQueriesTotal;
	PGresult * result;
	PGquery * firstQuery;
	PGquery * curQuery;
	
	nQueriesQueued = nQueriesTotal = 0;
	result = NULL;
	firstQuery = curQuery = NULL;
	
	while( nQueriesQueued > 0 || nQueriesTotal < totalQueries ) {
		
		if( PQconsumeInput(conn) == 0 ) {
			printf( "PQconsumeInput ERROR: %s\n", PQerrorMessage(conn) );
			return 1;
		}
		
		do {
			curQuery = PQgetResultQuery(conn);
			
			/* firstQuery is finished */
			if( firstQuery != curQuery )
			{
				//printf( "%p done, curQuery=%p\n", firstQuery, curQuery );
#ifdef PRINT_QUERY_PROGRESS
				printf("-");
#endif
				firstQuery = curQuery;
				nQueriesQueued--;
			}
			
			/* Break if no queries are ready */
			if( !firstQuery || PQisBusy(conn) )
				break;
			
			if( (result = PQgetResult(conn)) != 0 )
				PQclear(result);
		}
		while(1);
		
		if( nQueriesTotal < totalQueries && nQueriesQueued < totalQueued ) {
#ifdef MIN_LOCAL_Q
			int flushResult = PQflush(conn);
			if( flushResult == -1 ) {
				printf( "PQflush ERROR: %s\n", PQerrorMessage(conn) );
				return 1;
			} else if ( flushResult == 1 )
				continue;
#endif
			if( !PQsendQuery(conn,sql) ){
				printf( "PQsendQuery failed with error: %s\n", PQerrorMessage(conn) );
				return 1;
			}
			if( firstQuery == NULL )
				firstQuery = PQgetSentQuery(conn);
			nQueriesTotal++;
			nQueriesQueued++;
#ifdef PRINT_QUERY_PROGRESS
			printf( "+" );
#endif
		}
	}
#ifdef PRINT_QUERY_PROGRESS
	printf( "\n" );
#endif
	return 0;
}

int testPipelinedSeries( PGconn * conn, int totalQueries, int totalQueued, int baseline_usecs )
{
	int result;
	struct timeval tv1, tv2;
	int secs, usecs;
	
	gettimeofday(&tv1,NULL);
#define TEST_P(q) \
	if( (result = testPipelined(conn,totalQueries,totalQueued,q)) != 0 ) \
		return result;
	TEST_P("INSERT INTO test(id) VALUES (DEFAULT)");
	TEST_P("SELECT * FROM test LIMIT 1");
	TEST_P("SELECT * FROM test");
	TEST_P("DELETE FROM test");
	gettimeofday(&tv2,NULL);
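	secs = tv2.tv_sec - tv1.tv_sec;
	usecs = secs * 1000000 + tv2.tv_usec - tv1.tv_usec;
	/* usecs is the total elapsed time, so split it into seconds and remainder for display */
	printf("testPipelinedSeries(%i,%i) took %i.%06i",totalQueries,totalQueued,usecs / 1000000,usecs % 1000000);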
	if (baseline_usecs == 0)
		printf("\n");
	else
		printf(", speedup %.2f\n", (double)baseline_usecs / usecs );
	return usecs;
}

int
main(int argc, char **argv)
{
	PGconn * conn;
	int baseline;
	
	conn = NULL;
	
	/* make a connection to the database */
	conn = PQsetdb(NULL, NULL, NULL, NULL, NULL);

	/* check to see that the backend connection was successfully made */
	if (PQstatus(conn) != CONNECTION_OK)
	{
		fprintf(stderr, "Connection to database failed: %s",
				PQerrorMessage(conn));
		exit(1);
	}

	PQsetPipelining(conn,1);
	PQsetnonblocking(conn,1);
	
	PQclear(PQexec(conn,"CREATE TABLE test ( id SERIAL PRIMARY KEY )"));

	baseline = testPipelinedSeries(conn,10,1,0);
	testPipelinedSeries(conn,10,3,baseline);
	testPipelinedSeries(conn,10,10,baseline);
	
	baseline = testPipelinedSeries(conn,100,1,0);
	testPipelinedSeries(conn,100,3,baseline);
	testPipelinedSeries(conn,100,10,baseline);
	testPipelinedSeries(conn,100,25,baseline);
	testPipelinedSeries(conn,100,50,baseline);
	testPipelinedSeries(conn,100,100,baseline);
	
	PQclear(PQexec(conn,"DROP TABLE test"));
	
	PQfinish(conn);
	
	return 0;
}

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

