
List:       postgresql-general
Subject:    ParallelFinish-hook of FDW/CSP (Re: [HACKERS] Steps inside ExecEndGather)
From:       Kouhei Kaigai <kaigai () ak ! jp ! nec ! com>
Date:       2016-10-31 14:33:04
Message-ID: 9A28C8860F777E439AA12E8AEA7694F801250675 () BPXM15GP ! gisp ! nec ! co ! jp


Hello,

The attached patch implements Amit's earlier suggestion.

My motivation is to collect extra run-time statistics specific to a
particular ForeignScan/CustomScan, beyond the standard
Instrumentation; in my case, things like DMA transfer rate or
execution time of GPU kernels.

The per-node DSM toc is one of the best ways to return run-time
statistics to the master backend, because an FDW/CSP can request a
region of arbitrary length according to its needs, and requesting it
is quite easy.
However, one problem is that the per-node DSM toc is already released
when ExecEndNode() is called on the child node of Gather.

This patch allows extensions to get control in the master backend's
context after all the workers have finished but prior to the release
of the DSM segment. If an FDW/CSP keeps its own statistics in the
segment, it can move them to a private memory area for EXPLAIN output
or some other purpose.
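
As a rough illustration of "move to the private memory area", here is
a minimal standalone sketch. It does not use the PostgreSQL headers:
every type and function name in it (MockWorkerStats, MockSharedStats,
MockScanState, mock_parallel_finish_scan) is hypothetical, and a
calloc'd buffer merely stands in for the DSM segment.

```c
#include <assert.h>
#include <stdlib.h>

#define MOCK_MAX_WORKERS 8

/* Hypothetical per-worker statistics an extension might keep in DSM. */
typedef struct MockWorkerStats
{
	double		dma_bytes;		/* bytes moved by DMA */
	double		kernel_msec;	/* time spent in GPU kernels */
} MockWorkerStats;

/* Hypothetical layout of the per-node shared region. */
typedef struct MockSharedStats
{
	int			nworkers;
	MockWorkerStats workers[MOCK_MAX_WORKERS];
} MockSharedStats;

/* Backend-private state; it outlives the shared region. */
typedef struct MockScanState
{
	MockSharedStats *shared;	/* points into the mock segment */
	MockWorkerStats total;		/* private copy, e.g. for EXPLAIN */
} MockScanState;

/*
 * Stand-in for what a ParallelFinish callback could do: sum the
 * workers' numbers into private memory while the region still exists.
 */
static void
mock_parallel_finish_scan(MockScanState *state)
{
	int			i;

	state->total.dma_bytes = 0.0;
	state->total.kernel_msec = 0.0;
	for (i = 0; i < state->shared->nworkers; i++)
	{
		state->total.dma_bytes += state->shared->workers[i].dma_bytes;
		state->total.kernel_msec += state->shared->workers[i].kernel_msec;
	}
	state->shared = NULL;		/* must not be used after release */
}

/* Simulates the leader: workers done, hook runs, region released. */
static MockWorkerStats
demo_finish_then_release(void)
{
	MockSharedStats *seg;
	MockScanState state;

	seg = (MockSharedStats *) calloc(1, sizeof(MockSharedStats));
	assert(seg != NULL);
	seg->nworkers = 2;
	seg->workers[0].dma_bytes = 100.0;
	seg->workers[0].kernel_msec = 2.5;
	seg->workers[1].dma_bytes = 200.0;
	seg->workers[1].kernel_msec = 5.0;

	state.shared = seg;
	mock_parallel_finish_scan(&state);
	free(seg);					/* stands in for releasing the DSM segment */

	return state.total;			/* still valid: it is a private copy */
}
```

The point of the sketch is only the ordering: the copy happens before
the free(), so nothing dereferences the shared region afterwards.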

One design consideration is whether the hook should be called from
ExecParallelRetrieveInstrumentation() or ExecParallelFinish().
The former is the function that retrieves the standard Instrumentation
information, so it is called only under EXPLAIN ANALYZE.
On the other hand, if we put the entry point in ExecParallelFinish(),
the extension gets control regardless of EXPLAIN ANALYZE; however,
it also needs an extra planstate_tree_walker().

Right now, we don't assume anything about what an FDW/CSP requires.
It may want run-time statistics regardless of EXPLAIN ANALYZE, so the
hook is always invoked once the Gather node has confirmed termination
of the worker processes.
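
The trade-off can be sketched with a small standalone mock (again, not
PostgreSQL code: MockNode, FinishHook, mock_finish_recursive and
mock_exec_parallel_finish are hypothetical names, and a two-child
struct stands in for the plan-state tree). The instrumentation step
runs only when requested, while the extra walk that calls the optional
hook runs every time.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef struct MockNode MockNode;
typedef void (*FinishHook) (MockNode *node, void *pcxt);

struct MockNode
{
	FinishHook	finish;			/* optional; NULL if no hook supplied */
	MockNode   *left;
	MockNode   *right;
	int			finish_calls;	/* how often the hook ran on this node */
};

/* Example hook: just records that it was invoked. */
static void
count_finish(MockNode *node, void *pcxt)
{
	(void) pcxt;
	node->finish_calls++;
}

/* The extra tree walk: call the hook wherever one is supplied. */
static void
mock_finish_recursive(MockNode *node, void *pcxt)
{
	if (node == NULL)
		return;
	if (node->finish)
		node->finish(node, pcxt);
	mock_finish_recursive(node->left, pcxt);
	mock_finish_recursive(node->right, pcxt);
}

/* Instrumentation is conditional; the finish walk is not. */
static void
mock_exec_parallel_finish(MockNode *root, void *pcxt,
						  bool instrument, int *instr_walks)
{
	if (instrument)
		(*instr_walks)++;		/* stands in for the instrumentation pass */
	mock_finish_recursive(root, pcxt);
}

/* Returns how many times the scan node's hook ran. */
static int
demo_hook_calls(bool instrument)
{
	MockNode	scan = {count_finish, NULL, NULL, 0};
	MockNode	join = {NULL, &scan, NULL, 0};
	int			instr_walks = 0;

	mock_exec_parallel_finish(&join, NULL, instrument, &instr_walks);
	assert(instr_walks == (instrument ? 1 : 0));
	return scan.finish_calls;
}
```

In this mock, demo_hook_calls() reports one hook invocation whether or
not instrumentation was requested, at the cost of one additional walk
over the tree.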

Thanks,
--
NEC OSS Promotion Center / PG-Strom Project
KaiGai Kohei <kaigai@ak.jp.nec.com>


> -----Original Message-----
> From: Amit Kapila [mailto:amit.kapila16@gmail.com]
> Sent: Monday, October 17, 2016 11:22 AM
> To: Kaigai Kouhei(海外 浩平)
> Cc: Robert Haas; pgsql-hackers
> Subject: Re: [HACKERS] Steps inside ExecEndGather
> 
> On Mon, Oct 17, 2016 at 6:22 AM, Kouhei Kaigai <kaigai@ak.jp.nec.com> wrote:
> > Hello,
> >
> > I'm now trying to carry extra performance statistics on CustomScan
> > (like DMA transfer rate, execution time of GPU kernels, etc...)
> > from parallel workers to the leader process using the DSM segment
> > attached by the parallel-context.
> > We can require an arbitrary length of DSM using ExecCustomScanEstimate
> > hook by extension, then it looks leader/worker can share the DSM area.
> > However, we have a problem on this design.
> >
> > Below is the implementation of ExecEndGather().
> >
> >   void
> >   ExecEndGather(GatherState *node)
> >   {
> >       ExecShutdownGather(node);
> >       ExecFreeExprContext(&node->ps);
> >       ExecClearTuple(node->ps.ps_ResultTupleSlot);
> >       ExecEndNode(outerPlanState(node));
> >   }
> >
> > It calls ExecShutdownGather() prior to the recursive call of ExecEndNode().
> > The DSM segment shall be released on this call, so child node cannot
> > reference the DSM at the time of ExecEndNode().
> >
> 
> Before releasing DSM, we do collect all the statistics or
> instrumentation information of each node.  Refer
> ExecParallelFinish()->ExecParallelRetrieveInstrumentation(), so I am
> wondering why can't you collect the additional information in the same
> way?
> 
> 
> --
> With Regards,
> Amit Kapila.
> EnterpriseDB: http://www.enterprisedb.com

["parallel-finish-fdw_csp.v1.patch" (application/octet-stream)]

 doc/src/sgml/custom-scan.sgml          | 12 ++++++++++++
 doc/src/sgml/fdwhandler.sgml           | 13 +++++++++++++
 src/backend/executor/execParallel.c    | 31 +++++++++++++++++++++++++++++--
 src/backend/executor/nodeCustom.c      |  9 +++++++++
 src/backend/executor/nodeForeignscan.c | 16 ++++++++++++++++
 src/include/executor/nodeCustom.h      |  2 ++
 src/include/executor/nodeForeignscan.h |  2 ++
 src/include/foreign/fdwapi.h           |  3 +++
 src/include/nodes/extensible.h         |  2 ++
 9 files changed, 88 insertions(+), 2 deletions(-)

diff --git a/doc/src/sgml/custom-scan.sgml b/doc/src/sgml/custom-scan.sgml
index 1ca9247..4bd20dd 100644
--- a/doc/src/sgml/custom-scan.sgml
+++ b/doc/src/sgml/custom-scan.sgml
@@ -340,6 +340,18 @@ void (*InitializeWorkerCustomScan) (CustomScanState *node,
 
    <para>
 <programlisting>
+void (*ParallelFinishCustomScan) (CustomScanState *node,
+                                  ParallelContext *pcxt);
+</programlisting>
+    Retrieve the custom state after all the workers have finished but prior
+    to the release of the DSM segment.
+    This callback is optional, and needs only be supplied if this custom
+    path wants to reference the DSM segment in the master process's context
+    after the workers' exit.
+   </para>
+
+   <para>
+<programlisting>
 void (*ExplainCustomScan) (CustomScanState *node,
                            List *ancestors,
                            ExplainState *es);
diff --git a/doc/src/sgml/fdwhandler.sgml b/doc/src/sgml/fdwhandler.sgml
index 0c1db07..9ae006a 100644
--- a/doc/src/sgml/fdwhandler.sgml
+++ b/doc/src/sgml/fdwhandler.sgml
@@ -1254,6 +1254,19 @@ InitializeWorkerForeignScan(ForeignScanState *node, shm_toc *toc,
     This callback is optional, and needs only be supplied if this
     custom path supports parallel execution.
    </para>
+
+   <para>
+<programlisting>
+void
+ParallelFinishForeignScan(ForeignScanState *node, ParallelContext *pcxt);
+</programlisting>
+    Retrieve the custom state after all the workers have finished but prior
+    to the release of the DSM segment.
+    This callback is optional, and needs only be supplied if this custom
+    path wants to reference the DSM segment in the master process's context
+    after the workers' exit.
+   </para>
+
    </sect2>
 
    </sect1>
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index 5aa6f02..3be3090 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -536,8 +536,33 @@ ExecParallelRetrieveInstrumentation(PlanState *planstate,
 }
 
 /*
+ * ExecParallelFinishRecursive - allows FDW/CSP to retrieve its own run-time
+ * statistics stored in the shared memory segment.
+ */
+static bool
+ExecParallelFinishRecursive(PlanState *planstate, ParallelContext *pcxt)
+{
+	switch (nodeTag(planstate))
+	{
+		case T_ForeignScanState:
+			ExecForeignScanParallelFinish((ForeignScanState *) planstate,
+										  pcxt);
+			break;
+		case T_CustomScanState:
+			ExecCustomScanParallelFinish((CustomScanState *) planstate,
+										 pcxt);
+			break;
+		default:
+			break;
+	}
+	return planstate_tree_walker(planstate,
+								 ExecParallelFinishRecursive,
+								 pcxt);
+}
+
+/*
  * Finish parallel execution.  We wait for parallel workers to finish, and
- * accumulate their buffer usage and instrumentation.
+ * accumulate their buffer usage, instrumentation and others.
  */
 void
 ExecParallelFinish(ParallelExecutorInfo *pei)
@@ -554,10 +579,12 @@ ExecParallelFinish(ParallelExecutorInfo *pei)
 	for (i = 0; i < pei->pcxt->nworkers_launched; ++i)
 		InstrAccumParallelQuery(&pei->buffer_usage[i]);
 
-	/* Finally, accumulate instrumentation, if any. */
+	/* Next, accumulate instrumentation, if any. */
 	if (pei->instrumentation)
 		ExecParallelRetrieveInstrumentation(pei->planstate,
 											pei->instrumentation);
+	/* Finally, accumulate extension's stuff, if any */
+	ExecParallelFinishRecursive(pei->planstate, pei->pcxt);
 
 	pei->finished = true;
 }
diff --git a/src/backend/executor/nodeCustom.c b/src/backend/executor/nodeCustom.c
index 322abca..b56e80b 100644
--- a/src/backend/executor/nodeCustom.c
+++ b/src/backend/executor/nodeCustom.c
@@ -204,3 +204,12 @@ ExecCustomScanInitializeWorker(CustomScanState *node, shm_toc *toc)
 		methods->InitializeWorkerCustomScan(node, toc, coordinate);
 	}
 }
+
+void
+ExecCustomScanParallelFinish(CustomScanState *node, ParallelContext *pcxt)
+{
+	const CustomExecMethods *methods = node->methods;
+
+	if (methods->ParallelFinishCustomScan)
+		methods->ParallelFinishCustomScan(node, pcxt);
+}
diff --git a/src/backend/executor/nodeForeignscan.c b/src/backend/executor/nodeForeignscan.c
index d886aaf..d2cb1f7 100644
--- a/src/backend/executor/nodeForeignscan.c
+++ b/src/backend/executor/nodeForeignscan.c
@@ -355,3 +355,19 @@ ExecForeignScanInitializeWorker(ForeignScanState *node, shm_toc *toc)
 		fdwroutine->InitializeWorkerForeignScan(node, toc, coordinate);
 	}
 }
+
+/* ----------------------------------------------------------------
+ *		ExecForeignScanParallelFinish
+ *
+ *		Retrieve FDW's own run-time statistics on the parallel coordination
+ *		information prior to its release.
+ * ----------------------------------------------------------------
+ */
+void
+ExecForeignScanParallelFinish(ForeignScanState *node, ParallelContext *pcxt)
+{
+	FdwRoutine *fdwroutine = node->fdwroutine;
+
+	if (fdwroutine->ParallelFinishForeignScan)
+		fdwroutine->ParallelFinishForeignScan(node, pcxt);
+}
diff --git a/src/include/executor/nodeCustom.h b/src/include/executor/nodeCustom.h
index 7d16c2b..92ba894 100644
--- a/src/include/executor/nodeCustom.h
+++ b/src/include/executor/nodeCustom.h
@@ -37,5 +37,7 @@ extern void ExecCustomScanInitializeDSM(CustomScanState *node,
 							ParallelContext *pcxt);
 extern void ExecCustomScanInitializeWorker(CustomScanState *node,
 							   shm_toc *toc);
+extern void ExecCustomScanParallelFinish(CustomScanState *node,
+										 ParallelContext *pcxt);
 
 #endif   /* NODECUSTOM_H */
diff --git a/src/include/executor/nodeForeignscan.h b/src/include/executor/nodeForeignscan.h
index 0cdec4e..3384a01 100644
--- a/src/include/executor/nodeForeignscan.h
+++ b/src/include/executor/nodeForeignscan.h
@@ -28,5 +28,7 @@ extern void ExecForeignScanInitializeDSM(ForeignScanState *node,
 							 ParallelContext *pcxt);
 extern void ExecForeignScanInitializeWorker(ForeignScanState *node,
 								shm_toc *toc);
+extern void ExecForeignScanParallelFinish(ForeignScanState *node,
+										  ParallelContext *pcxt);
 
 #endif   /* NODEFOREIGNSCAN_H */
diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h
index e1b0d0d..1100f6d 100644
--- a/src/include/foreign/fdwapi.h
+++ b/src/include/foreign/fdwapi.h
@@ -151,6 +151,8 @@ typedef void (*InitializeDSMForeignScan_function) (ForeignScanState *node,
 typedef void (*InitializeWorkerForeignScan_function) (ForeignScanState *node,
 																shm_toc *toc,
 														   void *coordinate);
+typedef void (*ParallelFinishForeignScan_function) (ForeignScanState *node,
+													ParallelContext *pcxt);
 typedef bool (*IsForeignScanParallelSafe_function) (PlannerInfo *root,
 															 RelOptInfo *rel,
 														 RangeTblEntry *rte);
@@ -224,6 +226,7 @@ typedef struct FdwRoutine
 	EstimateDSMForeignScan_function EstimateDSMForeignScan;
 	InitializeDSMForeignScan_function InitializeDSMForeignScan;
 	InitializeWorkerForeignScan_function InitializeWorkerForeignScan;
+	ParallelFinishForeignScan_function ParallelFinishForeignScan;
 } FdwRoutine;
 
 
diff --git a/src/include/nodes/extensible.h b/src/include/nodes/extensible.h
index 17afe58..9decbf6 100644
--- a/src/include/nodes/extensible.h
+++ b/src/include/nodes/extensible.h
@@ -139,6 +139,8 @@ typedef struct CustomExecMethods
 	void		(*InitializeWorkerCustomScan) (CustomScanState *node,
 														   shm_toc *toc,
 														   void *coordinate);
+	void		(*ParallelFinishCustomScan) (CustomScanState *node,
+											 ParallelContext *pcxt);
 
 	/* Optional: print additional information in EXPLAIN */
 	void		(*ExplainCustomScan) (CustomScanState *node,



