List:       postgresql-hackers
Subject:    RE: Parallel INSERT SELECT take 2
From:       "houzj.fnst () fujitsu ! com" <houzj ! fnst () fujitsu ! com>
Date:       2021-05-31 5:34:09
Message-ID: OS0PR01MB5716F50BCC5722D17F721990943F9 () OS0PR01MB5716 ! jpnprd01 ! prod ! outlook ! com

[Attachment #2 (text/plain)]

From: Greg Nancarrow <gregn4422@gmail.com>
Sent: Friday, May 28, 2021 4:42 PM
> On Mon, May 24, 2021 at 3:15 PM houzj.fnst@fujitsu.com
> <houzj.fnst@fujitsu.com> wrote:
> >
> >
> > Thanks for the comments and your descriptions looks good.
> > Attaching v5 patchset with all these changes.
> >
> 
> A few other minor things I noticed:
> 
> (1) error message wording when declaring a table SAFE for parallel DML
> 
> src/backend/commands/tablecmds.c
> 
> Since data modification for the RELKIND_FOREIGN_TABLE and
> RELPERSISTENCE_TEMP types are allowed in the parallel-restricted case (i.e.
> leader may modify in parallel mode) I'm thinking it may be better to use
> wording like:
> 
>     "cannot support foreign or temporary table data modification by parallel
> workers"
> 
> instead of
> 
>     "cannot support parallel data modification on a foreign or temporary table"
> 
> There are TWO places where this error message is used.
> 
> (What do you think?)

I agree, your wording is better.
I have used your message in the latest patchset.

> (2) Minor formatting issue
> 
> src/backend/optimizer/util/clauses.c
> 
>     static safety_object *make_safety_object(Oid objid, Oid classid, char
> proparallel)
> 
> should be:
> 
>     static safety_object *
>     make_safety_object(Oid objid, Oid classid, char proparallel)

Changed.
 
> (3) Minor formatting issue
>
> src/backend/utils/cache/typcache.c
> 
> 
>     List *GetDomainConstraints(Oid type_id)
> 
> should be:
> 
>     List *
>     GetDomainConstraints(Oid type_id)

Changed.

Attached is the v6 patchset.
I have also registered it in the commitfest: https://commitfest.postgresql.org/33/3143/
Comments are welcome.
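
For anyone who wants to try the patchset quickly, here is a minimal sketch of
the new syntax and the two helper functions. It assumes the v6 patches are
applied; the table names src_tbl and dst_tbl are hypothetical and only for
illustration, and the plan you get depends on your cost settings.

```sql
-- Hypothetical tables; requires a server built with the v6 patchset.
create table src_tbl (a int);
create table dst_tbl (a int);
insert into src_tbl select * from generate_series(1, 10000);

-- Declare the target table safe for parallel DML (the default is UNSAFE).
alter table dst_tbl parallel dml safe;

-- List the objects, if any, that make the table parallel unsafe/restricted.
select classid::regclass, objid, proparallel
from pg_get_parallel_safety('dst_tbl');

-- Quick check of the worst hazard found for the table.
select pg_get_max_parallel_hazard('dst_tbl');

-- With a SAFE target table, the SELECT part may be planned under a Gather node.
explain (costs off) insert into dst_tbl select a from src_tbl;
```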

Best regards,
houzj

["v6-0004-regression-test-and-doc-updates.patch" (application/octet-stream)]

From 847432ca595a2e9b91b42cd7be3aa892b496a733 Mon Sep 17 00:00:00 2001
From: houzj <houzj.fnst@fujitsu.com>
Date: Mon, 31 May 2021 12:51:00 +0800
Subject: [PATCH] regression-test-and doc updates

---
 doc/src/sgml/func.sgml                        |  41 ++
 doc/src/sgml/ref/create_foreign_table.sgml    |  24 ++
 doc/src/sgml/ref/create_table.sgml            |  25 ++
 doc/src/sgml/ref/create_table_as.sgml         |  23 +
 src/test/regress/expected/insert_parallel.out | 580 ++++++++++++++++++++++++++
 src/test/regress/parallel_schedule            |   1 +
 src/test/regress/sql/insert_parallel.sql      | 346 +++++++++++++++
 7 files changed, 1040 insertions(+)
 create mode 100644 src/test/regress/expected/insert_parallel.out
 create mode 100644 src/test/regress/sql/insert_parallel.sql

diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index 08b07f5..e0a9a90 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -23909,6 +23909,47 @@ SELECT collation for ('foo' COLLATE "de_DE");
         Undefined objects are identified with <literal>NULL</literal> values.
        </para></entry>
       </row>
+
+      <row>
+       <entry role="func_table_entry"><para role="func_signature">
+        <indexterm>
+         <primary>pg_get_parallel_safety</primary>
+        </indexterm>
+        <function>pg_get_parallel_safety</function> ( <parameter>table_name</parameter> <type>regclass</type> )
+        <returnvalue>record</returnvalue>
+        ( <parameter>objid</parameter> <type>oid</type>,
+        <parameter>classid</parameter> <type>oid</type>,
+        <parameter>proparallel</parameter> <type>char</type> )
+       </para>
+       <para>
+        Returns a row containing enough information to uniquely identify the
+        parallel unsafe/restricted table-related objects from which the
+        table's parallel DML safety is determined. The user can use this
+        information during development in order to accurately declare a
+        table's parallel DML safety, or to identify any problematic objects
+        if a parallel DML fails or behaves unexpectedly. Note that when the
+        use of an object-related parallel unsafe/restricted function is
+        detected, both the function OID and the object OID are returned.
+        <parameter>classid</parameter> is the OID of the system catalog
+        containing the object;
+        <parameter>objid</parameter> is the OID of the object itself.
+       </para></entry>
+      </row>
+
+      <row>
+       <entry role="func_table_entry"><para role="func_signature">
+        <indexterm>
+         <primary>pg_get_max_parallel_hazard</primary>
+        </indexterm>
+        <function>pg_get_max_parallel_hazard</function> ( <type>regclass</type> )
+        <returnvalue>char</returnvalue>
+       </para>
+       <para>
+        Returns the worst parallel DML safety hazard that can be found in the
+        given relation. Users can use this function to do a quick check without
+        caring about specific parallel-related objects.
+       </para></entry>
+      </row>
      </tbody>
     </tgroup>
    </table>
diff --git a/doc/src/sgml/ref/create_foreign_table.sgml b/doc/src/sgml/ref/create_foreign_table.sgml
index f9477ef..3f089f2 100644
--- a/doc/src/sgml/ref/create_foreign_table.sgml
+++ b/doc/src/sgml/ref/create_foreign_table.sgml
@@ -27,6 +27,7 @@ CREATE FOREIGN TABLE [ IF NOT EXISTS ] <replaceable class="parameter">table_name
     [, ... ]
 ] )
 [ INHERITS ( <replaceable>parent_table</replaceable> [, ... ] ) ]
+[ PARALLEL DML { UNSAFE | RESTRICTED | SAFE } ]
   SERVER <replaceable class="parameter">server_name</replaceable>
 [ OPTIONS ( <replaceable class="parameter">option</replaceable> '<replaceable class="parameter">value</replaceable>' [, ... ] ) ]
 
@@ -36,6 +37,7 @@ CREATE FOREIGN TABLE [ IF NOT EXISTS ] <replaceable class="parameter">table_name
     | <replaceable>table_constraint</replaceable> }
     [, ... ]
 ) ] <replaceable class="parameter">partition_bound_spec</replaceable>
+[ PARALLEL DML { UNSAFE | RESTRICTED | SAFE } ]
   SERVER <replaceable class="parameter">server_name</replaceable>
 [ OPTIONS ( <replaceable class="parameter">option</replaceable> '<replaceable class="parameter">value</replaceable>' [, ... ] ) ]
 
@@ -291,6 +293,28 @@ CHECK ( <replaceable class="parameter">expression</replaceable> ) [ NO INHERIT ]
    </varlistentry>
 
    <varlistentry>
+    <term><literal>PARALLEL DML { UNSAFE | RESTRICTED | SAFE } </literal></term>
+    <listitem>
+     <para>
+      <literal>PARALLEL DML UNSAFE</literal> indicates that the data in the
+      table can't be modified in parallel mode. This is the default.
+      <literal>PARALLEL DML RESTRICTED</literal> indicates that the data in
+      the table can be modified in parallel mode, but the modification is
+      restricted to the parallel group leader. <literal>PARALLEL DML SAFE</literal>
+      indicates that the table is safe to be modified in parallel mode without
+      restriction. Note, however, that <productname>PostgreSQL</productname>
+      does not currently support data modification by parallel workers.
+     </para>
+
+     <para>
+      Tables should be labeled parallel DML unsafe/restricted if any parallel
+      unsafe/restricted function could be executed when modifying the data in
+      the table (e.g., functions in triggers, index expressions, or constraints).
+     </para>
+    </listitem>
+   </varlistentry>
+
+   <varlistentry>
     <term><replaceable class="parameter">server_name</replaceable></term>
     <listitem>
      <para>
diff --git a/doc/src/sgml/ref/create_table.sgml b/doc/src/sgml/ref/create_table.sgml
index c6d0a35..07182b9 100644
--- a/doc/src/sgml/ref/create_table.sgml
+++ b/doc/src/sgml/ref/create_table.sgml
@@ -33,6 +33,7 @@ CREATE [ [ GLOBAL | LOCAL ] { TEMPORARY | TEMP } | UNLOGGED ] TABLE [ IF NOT EXI
 [ WITH ( <replaceable class="parameter">storage_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) | WITHOUT OIDS ]
 [ ON COMMIT { PRESERVE ROWS | DELETE ROWS | DROP } ]
 [ TABLESPACE <replaceable class="parameter">tablespace_name</replaceable> ]
+[ PARALLEL DML { UNSAFE | RESTRICTED | SAFE } ]
 
 CREATE [ [ GLOBAL | LOCAL ] { TEMPORARY | TEMP } | UNLOGGED ] TABLE [ IF NOT EXISTS ] <replaceable class="parameter">table_name</replaceable>
     OF <replaceable class="parameter">type_name</replaceable> [ (
@@ -45,6 +46,7 @@ CREATE [ [ GLOBAL | LOCAL ] { TEMPORARY | TEMP } | UNLOGGED ] TABLE [ IF NOT EXI
 [ WITH ( <replaceable class="parameter">storage_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) | WITHOUT OIDS ]
 [ ON COMMIT { PRESERVE ROWS | DELETE ROWS | DROP } ]
 [ TABLESPACE <replaceable class="parameter">tablespace_name</replaceable> ]
+[ PARALLEL DML { UNSAFE | RESTRICTED | SAFE } ]
 
 CREATE [ [ GLOBAL | LOCAL ] { TEMPORARY | TEMP } | UNLOGGED ] TABLE [ IF NOT EXISTS ] <replaceable class="parameter">table_name</replaceable>
     PARTITION OF <replaceable class="parameter">parent_table</replaceable> [ (
@@ -57,6 +59,7 @@ CREATE [ [ GLOBAL | LOCAL ] { TEMPORARY | TEMP } | UNLOGGED ] TABLE [ IF NOT EXI
 [ WITH ( <replaceable class="parameter">storage_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) | WITHOUT OIDS ]
 [ ON COMMIT { PRESERVE ROWS | DELETE ROWS | DROP } ]
 [ TABLESPACE <replaceable class="parameter">tablespace_name</replaceable> ]
+[ PARALLEL DML { UNSAFE | RESTRICTED | SAFE } ]
 
 <phrase>where <replaceable class="parameter">column_constraint</replaceable> is:</phrase>
 
@@ -1336,6 +1339,28 @@ WITH ( MODULUS <replaceable class="parameter">numeric_literal</replaceable>, REM
     </listitem>
    </varlistentry>
 
+   <varlistentry id="sql-createtable-paralleldmlsafety">
+    <term><literal>PARALLEL DML { UNSAFE | RESTRICTED | SAFE } </literal></term>
+    <listitem>
+     <para>
+      <literal>PARALLEL DML UNSAFE</literal> indicates that the data in the
+      table can't be modified in parallel mode. This is the default.
+      <literal>PARALLEL DML RESTRICTED</literal> indicates that the data in
+      the table can be modified in parallel mode, but the modification is
+      restricted to the parallel group leader. <literal>PARALLEL DML SAFE</literal>
+      indicates that the table is safe to be modified in parallel mode without
+      restriction. Note, however, that <productname>PostgreSQL</productname>
+      does not currently support data modification by parallel workers.
+     </para>
+
+     <para>
+      Tables should be labeled parallel DML unsafe/restricted if any parallel
+      unsafe/restricted function could be executed when modifying the data in
+      the table (e.g., functions in triggers, index expressions, or constraints).
+     </para>
+    </listitem>
+   </varlistentry>
+
    <varlistentry>
     <term><literal>USING INDEX TABLESPACE <replaceable class="parameter">tablespace_name</replaceable></literal></term>
     <listitem>
diff --git a/doc/src/sgml/ref/create_table_as.sgml b/doc/src/sgml/ref/create_table_as.sgml
index 07558ab..71c932b 100644
--- a/doc/src/sgml/ref/create_table_as.sgml
+++ b/doc/src/sgml/ref/create_table_as.sgml
@@ -27,6 +27,7 @@ CREATE [ [ GLOBAL | LOCAL ] { TEMPORARY | TEMP } | UNLOGGED ] TABLE [ IF NOT EXI
     [ WITH ( <replaceable class="parameter">storage_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) | WITHOUT OIDS ]
     [ ON COMMIT { PRESERVE ROWS | DELETE ROWS | DROP } ]
     [ TABLESPACE <replaceable class="parameter">tablespace_name</replaceable> ]
+    [ PARALLEL DML { UNSAFE | RESTRICTED | SAFE } ]
     AS <replaceable>query</replaceable>
     [ WITH [ NO ] DATA ]
 </synopsis>
@@ -224,6 +225,28 @@ CREATE [ [ GLOBAL | LOCAL ] { TEMPORARY | TEMP } | UNLOGGED ] TABLE [ IF NOT EXI
    </varlistentry>
 
    <varlistentry>
+    <term><literal>PARALLEL DML { UNSAFE | RESTRICTED | SAFE } </literal></term>
+    <listitem>
+     <para>
+      <literal>PARALLEL DML UNSAFE</literal> indicates that the data in the
+      table can't be modified in parallel mode. This is the default.
+      <literal>PARALLEL DML RESTRICTED</literal> indicates that the data in
+      the table can be modified in parallel mode, but the modification is
+      restricted to the parallel group leader. <literal>PARALLEL DML SAFE</literal>
+      indicates that the table is safe to be modified in parallel mode without
+      restriction. Note, however, that <productname>PostgreSQL</productname>
+      does not currently support data modification by parallel workers.
+     </para>
+
+     <para>
+      Tables should be labeled parallel DML unsafe/restricted if any parallel
+      unsafe/restricted function could be executed when modifying the data in
+      the table (e.g., functions in triggers, index expressions, or constraints).
+     </para>
+    </listitem>
+   </varlistentry>
+
+   <varlistentry>
     <term><replaceable>query</replaceable></term>
     <listitem>
      <para>
diff --git a/src/test/regress/expected/insert_parallel.out b/src/test/regress/expected/insert_parallel.out
new file mode 100644
index 0000000..ca486c6
--- /dev/null
+++ b/src/test/regress/expected/insert_parallel.out
@@ -0,0 +1,580 @@
+--
+-- PARALLEL
+--
+--
+-- START: setup some tables and data needed by the tests.
+--
+-- Setup - index expressions test
+create function pg_class_relname(Oid)
+returns name language sql parallel unsafe
+as 'select relname from pg_class where $1 = oid';
+-- For testing purposes, we'll mark this function as parallel-unsafe
+create or replace function fullname_parallel_unsafe(f text, l text) returns text as $$
+    begin
+        return f || l;
+    end;
+$$ language plpgsql immutable parallel unsafe;
+create or replace function fullname_parallel_restricted(f text, l text) returns text as $$
+    begin
+        return f || l;
+    end;
+$$ language plpgsql immutable parallel restricted;
+create table names(index int, first_name text, last_name text);
+create table names2(index int, first_name text, last_name text);
+create index names2_fullname_idx on names2 (fullname_parallel_unsafe(first_name, last_name));
+create table names4(index int, first_name text, last_name text);
+create index names4_fullname_idx on names4 (fullname_parallel_restricted(first_name, last_name));
+alter table names2 parallel dml safe;
+alter table names4 parallel dml safe;
+insert into names values
+    (1, 'albert', 'einstein'),
+    (2, 'niels', 'bohr'),
+    (3, 'erwin', 'schrodinger'),
+    (4, 'leonhard', 'euler'),
+    (5, 'stephen', 'hawking'),
+    (6, 'isaac', 'newton'),
+    (7, 'alan', 'turing'),
+    (8, 'richard', 'feynman');
+-- Setup - column default tests
+create or replace function bdefault_unsafe ()
+returns int language plpgsql parallel unsafe as $$
+begin
+	RETURN 5;
+end $$;
+create or replace function cdefault_restricted ()
+returns int language plpgsql parallel restricted as $$
+begin
+	RETURN 10;
+end $$;
+create or replace function ddefault_safe ()
+returns int language plpgsql parallel safe as $$
+begin
+	RETURN 20;
+end $$;
+create table testdef(a int, b int default bdefault_unsafe(), c int default cdefault_restricted(), d int default ddefault_safe());
+create table test_data(a int);
+insert into test_data select * from generate_series(1,10);
+alter table testdef parallel dml safe;
+--
+-- END: setup some tables and data needed by the tests.
+--
+-- encourage use of parallel plans
+set parallel_setup_cost=0;
+set parallel_tuple_cost=0;
+set min_parallel_table_scan_size=0;
+set max_parallel_workers_per_gather=4;
+create table para_insert_p1 (
+    unique1    int4 PRIMARY KEY,
+    stringu1    name
+);
+create table para_insert_f1 (
+    unique1    int4 REFERENCES para_insert_p1(unique1),
+    stringu1    name
+);
+alter table para_insert_p1 parallel dml safe;
+alter table para_insert_f1 parallel dml safe;
+-- Check FK trigger
+select pg_class_relname(classid), proparallel from pg_get_parallel_safety('para_insert_f1');
+ pg_class_relname | proparallel 
+------------------+-------------
+ pg_proc          | r
+ pg_trigger       | r
+ pg_proc          | r
+ pg_trigger       | r
+(4 rows)
+
+select pg_get_max_parallel_hazard('para_insert_f1');
+ pg_get_max_parallel_hazard 
+----------------------------
+ r
+(1 row)
+
+--
+-- Test INSERT with underlying query.
+-- (should create plan with parallel SELECT, Gather parent node)
+--
+explain (costs off) insert into para_insert_p1 select unique1, stringu1 from tenk1;
+               QUERY PLAN               
+----------------------------------------
+ Insert on para_insert_p1
+   ->  Gather
+         Workers Planned: 4
+         ->  Parallel Seq Scan on tenk1
+(4 rows)
+
+insert into para_insert_p1 select unique1, stringu1 from tenk1;
+-- select some values to verify that the parallel insert worked
+select count(*), sum(unique1) from para_insert_p1;
+ count |   sum    
+-------+----------
+ 10000 | 49995000
+(1 row)
+
+-- verify that the same transaction has been used by all parallel workers
+select count(*) from (select distinct cmin,xmin from para_insert_p1) as dt;
+ count 
+-------
+     1
+(1 row)
+
+--
+-- Test INSERT with ordered underlying query.
+-- (should create plan with parallel SELECT, GatherMerge parent node)
+--
+truncate para_insert_p1 cascade;
+NOTICE:  truncate cascades to table "para_insert_f1"
+explain (costs off) insert into para_insert_p1 select unique1, stringu1 from tenk1 order by unique1;
+                  QUERY PLAN                  
+----------------------------------------------
+ Insert on para_insert_p1
+   ->  Gather Merge
+         Workers Planned: 4
+         ->  Sort
+               Sort Key: tenk1.unique1
+               ->  Parallel Seq Scan on tenk1
+(6 rows)
+
+insert into para_insert_p1 select unique1, stringu1 from tenk1 order by unique1;
+-- select some values to verify that the parallel insert worked
+select count(*), sum(unique1) from para_insert_p1;
+ count |   sum    
+-------+----------
+ 10000 | 49995000
+(1 row)
+
+-- verify that the same transaction has been used by all parallel workers
+select count(*) from (select distinct cmin,xmin from para_insert_p1) as dt;
+ count 
+-------
+     1
+(1 row)
+
+--
+-- Test INSERT with RETURNING clause.
+-- (should create plan with parallel SELECT, Gather parent node)
+--
+create table test_data1(like test_data);
+alter table test_data1 parallel dml safe;
+explain (costs off) insert into test_data1 select * from test_data where a = 10 returning a as data;
+                 QUERY PLAN                 
+--------------------------------------------
+ Insert on test_data1
+   ->  Gather
+         Workers Planned: 3
+         ->  Parallel Seq Scan on test_data
+               Filter: (a = 10)
+(5 rows)
+
+insert into test_data1 select * from test_data where a = 10 returning a as data;
+ data 
+------
+   10
+(1 row)
+
+--
+-- Test INSERT into a table with a foreign key.
+-- (Insert into a table with a foreign key is parallel-restricted,
+--  as doing this in a parallel worker would create a new commandId
+--  and within a worker this is not currently supported)
+--
+explain (costs off) insert into para_insert_f1 select unique1, stringu1 from tenk1;
+               QUERY PLAN               
+----------------------------------------
+ Insert on para_insert_f1
+   ->  Gather
+         Workers Planned: 4
+         ->  Parallel Seq Scan on tenk1
+(4 rows)
+
+insert into para_insert_f1 select unique1, stringu1 from tenk1;
+-- select some values to verify that the insert worked
+select count(*), sum(unique1) from para_insert_f1;
+ count |   sum    
+-------+----------
+ 10000 | 49995000
+(1 row)
+
+--
+-- Test INSERT with ON CONFLICT ... DO UPDATE ...
+-- (should not create a parallel plan)
+--
+create table test_conflict_table(id serial primary key, somedata int);
+alter table test_conflict_table parallel dml safe;
+explain (costs off) insert into test_conflict_table(id, somedata) select a, a from test_data;
+                 QUERY PLAN                 
+--------------------------------------------
+ Insert on test_conflict_table
+   ->  Gather
+         Workers Planned: 3
+         ->  Parallel Seq Scan on test_data
+(4 rows)
+
+insert into test_conflict_table(id, somedata) select a, a from test_data;
+explain (costs off) insert into test_conflict_table(id, somedata) select a, a from test_data ON CONFLICT(id) DO UPDATE SET somedata = EXCLUDED.somedata + 1;
+                      QUERY PLAN                      
+------------------------------------------------------
+ Insert on test_conflict_table
+   Conflict Resolution: UPDATE
+   Conflict Arbiter Indexes: test_conflict_table_pkey
+   ->  Seq Scan on test_data
+(4 rows)
+
+--
+-- Test INSERT with parallel-unsafe index expression
+--
+select pg_class_relname(classid), proparallel from pg_get_parallel_safety('names2');
+ pg_class_relname | proparallel 
+------------------+-------------
+ pg_proc          | u
+ pg_index         | u
+(2 rows)
+
+select pg_get_max_parallel_hazard('names2');
+ pg_get_max_parallel_hazard 
+----------------------------
+ u
+(1 row)
+
+alter table names2 parallel dml safe;
+-- insert into names2 select * from names returning *;
+--
+-- Test INSERT with parallel-restricted index expression
+--
+select pg_class_relname(classid), proparallel from pg_get_parallel_safety('names4');
+ pg_class_relname | proparallel 
+------------------+-------------
+ pg_proc          | r
+ pg_index         | r
+(2 rows)
+
+select pg_get_max_parallel_hazard('names4');
+ pg_get_max_parallel_hazard 
+----------------------------
+ r
+(1 row)
+
+--
+-- Test INSERT with underlying query - and RETURNING (no projection)
+-- (should create a parallel plan; parallel SELECT)
+--
+create table names5 (like names);
+explain (costs off) insert into names5 select * from names returning *;
+       QUERY PLAN        
+-------------------------
+ Insert on names5
+   ->  Seq Scan on names
+(2 rows)
+
+--
+-- Test INSERT with underlying ordered query - and RETURNING (no projection)
+-- (should create a parallel plan; parallel SELECT)
+--
+create table names6 (like names);
+alter table names6 parallel dml safe;
+explain (costs off) insert into names6 select * from names order by last_name returning *;
+                  QUERY PLAN                  
+----------------------------------------------
+ Insert on names6
+   ->  Gather Merge
+         Workers Planned: 3
+         ->  Sort
+               Sort Key: names.last_name
+               ->  Parallel Seq Scan on names
+(6 rows)
+
+insert into names6 select * from names order by last_name returning *;
+ index | first_name |  last_name  
+-------+------------+-------------
+     2 | niels      | bohr
+     1 | albert     | einstein
+     4 | leonhard   | euler
+     8 | richard    | feynman
+     5 | stephen    | hawking
+     6 | isaac      | newton
+     3 | erwin      | schrodinger
+     7 | alan       | turing
+(8 rows)
+
+--
+-- Test INSERT with underlying ordered query - and RETURNING (with projection)
+-- (should create a parallel plan; parallel SELECT)
+--
+create table names7 (like names);
+alter table names7 parallel dml safe;
+explain (costs off) insert into names7 select * from names order by last_name returning last_name || ', ' || first_name as last_name_then_first_name;
+                  QUERY PLAN                  
+----------------------------------------------
+ Insert on names7
+   ->  Gather Merge
+         Workers Planned: 3
+         ->  Sort
+               Sort Key: names.last_name
+               ->  Parallel Seq Scan on names
+(6 rows)
+
+insert into names7 select * from names order by last_name returning last_name || ', ' || first_name as last_name_then_first_name;
+ last_name_then_first_name 
+---------------------------
+ bohr, niels
+ einstein, albert
+ euler, leonhard
+ feynman, richard
+ hawking, stephen
+ newton, isaac
+ schrodinger, erwin
+ turing, alan
+(8 rows)
+
+--
+-- Test INSERT into temporary table with underlying query.
+-- (Insert into a temp table is parallel-restricted;
+-- should create a parallel plan; parallel SELECT)
+--
+create temporary table temp_names (like names);
+alter table temp_names parallel dml restricted;
+select pg_class_relname(classid), proparallel from pg_get_parallel_safety('temp_names');
+ pg_class_relname | proparallel 
+------------------+-------------
+ pg_class         | r
+(1 row)
+
+select pg_get_max_parallel_hazard('temp_names');
+ pg_get_max_parallel_hazard 
+----------------------------
+ r
+(1 row)
+
+explain (costs off) insert into temp_names select * from names;
+               QUERY PLAN               
+----------------------------------------
+ Insert on temp_names
+   ->  Gather
+         Workers Planned: 3
+         ->  Parallel Seq Scan on names
+(4 rows)
+
+insert into temp_names select * from names;
+--
+-- Test INSERT with column defaults
+--
+--
+--
+-- Parallel INSERT with unsafe column default, should not use a parallel plan
+--
+explain (costs off) insert into testdef(a,c,d) select a,a*4,a*8 from test_data;
+         QUERY PLAN          
+-----------------------------
+ Insert on testdef
+   ->  Seq Scan on test_data
+(2 rows)
+
+--
+-- Parallel INSERT with restricted column default, should use parallel SELECT
+--
+explain (costs off) insert into testdef(a,b,d) select a,a*2,a*8 from test_data;
+                 QUERY PLAN                 
+--------------------------------------------
+ Insert on testdef
+   ->  Gather
+         Workers Planned: 3
+         ->  Parallel Seq Scan on test_data
+(4 rows)
+
+insert into testdef(a,b,d) select a,a*2,a*8 from test_data;
+select * from testdef order by a;
+ a  | b  | c  | d  
+----+----+----+----
+  1 |  2 | 10 |  8
+  2 |  4 | 10 | 16
+  3 |  6 | 10 | 24
+  4 |  8 | 10 | 32
+  5 | 10 | 10 | 40
+  6 | 12 | 10 | 48
+  7 | 14 | 10 | 56
+  8 | 16 | 10 | 64
+  9 | 18 | 10 | 72
+ 10 | 20 | 10 | 80
+(10 rows)
+
+truncate testdef;
+--
+-- Parallel INSERT with restricted and unsafe column defaults, should not use a parallel plan
+--
+explain (costs off) insert into testdef(a,d) select a,a*8 from test_data;
+         QUERY PLAN          
+-----------------------------
+ Insert on testdef
+   ->  Seq Scan on test_data
+(2 rows)
+
+--
+-- Test INSERT into partition with underlying query.
+--
+create table parttable1 (a int, b name) partition by range (a);
+create table parttable1_1 partition of parttable1 for values from (0) to (5000);
+create table parttable1_2 partition of parttable1 for values from (5000) to (10000);
+alter table parttable1 parallel dml safe;
+explain (costs off) insert into parttable1 select unique1,stringu1 from tenk1;
+               QUERY PLAN               
+----------------------------------------
+ Insert on parttable1
+   ->  Gather
+         Workers Planned: 4
+         ->  Parallel Seq Scan on tenk1
+(4 rows)
+
+insert into parttable1 select unique1,stringu1 from tenk1;
+select count(*) from parttable1_1;
+ count 
+-------
+  5000
+(1 row)
+
+select count(*) from parttable1_2;
+ count 
+-------
+  5000
+(1 row)
+
+--
+-- Test table with parallel-unsafe check constraint
+-- (should not create a parallel plan)
+--
+create or replace function check_b_unsafe(b name) returns boolean as $$
+    begin
+        return (b <> 'XXXXXX');
+    end;
+$$ language plpgsql parallel unsafe;
+create table table_check_b(a int4, b name check (check_b_unsafe(b)), c name);
+alter table table_check_b parallel dml safe;
+select pg_class_relname(classid), proparallel from pg_get_parallel_safety('table_check_b');
+ pg_class_relname | proparallel 
+------------------+-------------
+ pg_proc          | u
+ pg_constraint    | u
+(2 rows)
+
+select pg_get_max_parallel_hazard('table_check_b');
+ pg_get_max_parallel_hazard 
+----------------------------
+ u
+(1 row)
+
+-- insert into table_check_b select * from names;
+--
+-- Test table with parallel-safe before stmt-level triggers
+-- (should create a parallel SELECT plan; triggers should fire)
+--
+create table names_with_safe_trigger (like names);
+alter table names_with_safe_trigger parallel dml safe;
+create or replace function insert_before_trigger_safe() returns trigger as $$
+    begin
+        raise notice 'hello from insert_before_trigger_safe';
+		return new;
+    end;
+$$ language plpgsql parallel safe;
+create trigger insert_before_trigger_safe before insert on names_with_safe_trigger
+    for each statement execute procedure insert_before_trigger_safe();
+select pg_class_relname(classid), proparallel from pg_get_parallel_safety('names_with_safe_trigger');
+ pg_class_relname | proparallel 
+------------------+-------------
+(0 rows)
+
+select pg_get_max_parallel_hazard('names_with_safe_trigger');
+ pg_get_max_parallel_hazard 
+----------------------------
+ s
+(1 row)
+
+insert into names_with_safe_trigger select * from names;
+NOTICE:  hello from insert_before_trigger_safe
+--
+-- Test table with parallel-unsafe before stmt-level triggers
+-- (should not create a parallel plan; triggers should fire)
+--
+create table names_with_unsafe_trigger (like names);
+alter table names_with_unsafe_trigger parallel dml safe;
+create or replace function insert_before_trigger_unsafe() returns trigger as $$
+    begin
+        raise notice 'hello from insert_before_trigger_unsafe';
+		return new;
+    end;
+$$ language plpgsql parallel unsafe;
+create trigger insert_before_trigger_unsafe before insert on names_with_unsafe_trigger
+    for each statement execute procedure insert_before_trigger_unsafe();
+select pg_class_relname(classid), proparallel from pg_get_parallel_safety('names_with_unsafe_trigger');
+ pg_class_relname | proparallel 
+------------------+-------------
+ pg_proc          | u
+ pg_trigger       | u
+(2 rows)
+
+select pg_get_max_parallel_hazard('names_with_unsafe_trigger');
+ pg_get_max_parallel_hazard 
+----------------------------
+ u
+(1 row)
+
+-- insert into names_with_unsafe_trigger select * from names;
+--
+-- Test partition with parallel-unsafe trigger
+-- (should not create a parallel plan)
+--
+create table part_unsafe_trigger (a int4, b name) partition by range (a);
+alter table names_with_unsafe_trigger parallel dml safe;
+create table part_unsafe_trigger_1 partition of part_unsafe_trigger for values from (0) to (5000);
+create table part_unsafe_trigger_2 partition of part_unsafe_trigger for values from (5000) to (10000);
+create trigger part_insert_before_trigger_unsafe before insert on part_unsafe_trigger_1
+    for each statement execute procedure insert_before_trigger_unsafe();
+select pg_class_relname(classid), proparallel from pg_get_parallel_safety('part_unsafe_trigger');
+ pg_class_relname | proparallel 
+------------------+-------------
+ pg_proc          | u
+ pg_trigger       | u
+(2 rows)
+
+select pg_get_max_parallel_hazard('part_unsafe_trigger');
+ pg_get_max_parallel_hazard 
+----------------------------
+ u
+(1 row)
+
+-- insert into names_with_unsafe_trigger select * from names;
+--
+-- Test DOMAIN column with a CHECK constraint
+--
+create function sql_is_distinct_from_u(anyelement, anyelement)
+returns boolean language sql parallel unsafe
+as 'select $1 is distinct from $2 limit 1';
+create domain inotnull_u int
+  check (sql_is_distinct_from_u(value, null));
+create table dom_table_u (x inotnull_u, y int);
+-- Test DOMAIN column with parallel-unsafe CHECK constraint
+select pg_class_relname(classid), proparallel from pg_get_parallel_safety('dom_table_u');
+ pg_class_relname | proparallel 
+------------------+-------------
+ pg_proc          | u
+ pg_constraint    | u
+(2 rows)
+
+select pg_get_max_parallel_hazard('dom_table_u');
+ pg_get_max_parallel_hazard 
+----------------------------
+ u
+(1 row)
+
+--
+-- Clean up anything not created in the transaction
+--
+drop table names;
+drop index names2_fullname_idx;
+drop table names2;
+drop index names4_fullname_idx;
+drop table names4;
+drop table testdef;
+drop table test_data;
+drop function bdefault_unsafe;
+drop function cdefault_restricted;
+drop function ddefault_safe;
+drop function fullname_parallel_unsafe;
+drop function fullname_parallel_restricted;
diff --git a/src/test/regress/parallel_schedule b/src/test/regress/parallel_schedule
index 22b0d35..46fa6b7 100644
--- a/src/test/regress/parallel_schedule
+++ b/src/test/regress/parallel_schedule
@@ -96,6 +96,7 @@ test: rules psql psql_crosstab amutils stats_ext collate.linux.utf8
 # run by itself so it can run parallel workers
 test: select_parallel
 test: write_parallel
+test: insert_parallel
 
 # no relation related tests can be put in this group
 test: publication subscription
diff --git a/src/test/regress/sql/insert_parallel.sql b/src/test/regress/sql/insert_parallel.sql
new file mode 100644
index 0000000..0686cb6
--- /dev/null
+++ b/src/test/regress/sql/insert_parallel.sql
@@ -0,0 +1,346 @@
+--
+-- PARALLEL
+--
+
+--
+-- START: setup some tables and data needed by the tests.
+--
+
+-- Setup - index expressions test
+
+create function pg_class_relname(Oid)
+returns name language sql parallel unsafe
+as 'select relname from pg_class where $1 = oid';
+
+-- For testing purposes, we'll mark this function as parallel-unsafe
+create or replace function fullname_parallel_unsafe(f text, l text) returns text as $$
+    begin
+        return f || l;
+    end;
+$$ language plpgsql immutable parallel unsafe;
+
+create or replace function fullname_parallel_restricted(f text, l text) returns text as $$
+    begin
+        return f || l;
+    end;
+$$ language plpgsql immutable parallel restricted;
+
+create table names(index int, first_name text, last_name text);
+create table names2(index int, first_name text, last_name text);
+create index names2_fullname_idx on names2 (fullname_parallel_unsafe(first_name, last_name));
+create table names4(index int, first_name text, last_name text);
+create index names4_fullname_idx on names4 (fullname_parallel_restricted(first_name, last_name));
+
+alter table names2 parallel dml safe;
+alter table names4 parallel dml safe;
+
+
+insert into names values
+    (1, 'albert', 'einstein'),
+    (2, 'niels', 'bohr'),
+    (3, 'erwin', 'schrodinger'),
+    (4, 'leonhard', 'euler'),
+    (5, 'stephen', 'hawking'),
+    (6, 'isaac', 'newton'),
+    (7, 'alan', 'turing'),
+    (8, 'richard', 'feynman');
+
+-- Setup - column default tests
+
+create or replace function bdefault_unsafe ()
+returns int language plpgsql parallel unsafe as $$
+begin
+	RETURN 5;
+end $$;
+
+create or replace function cdefault_restricted ()
+returns int language plpgsql parallel restricted as $$
+begin
+	RETURN 10;
+end $$;
+
+create or replace function ddefault_safe ()
+returns int language plpgsql parallel safe as $$
+begin
+	RETURN 20;
+end $$;
+
+create table testdef(a int, b int default bdefault_unsafe(), c int default cdefault_restricted(), d int default ddefault_safe());
+create table test_data(a int);
+insert into test_data select * from generate_series(1,10);
+alter table testdef parallel dml safe;
+
+
+--
+-- END: setup some tables and data needed by the tests.
+--
+
+-- encourage use of parallel plans
+set parallel_setup_cost=0;
+set parallel_tuple_cost=0;
+set min_parallel_table_scan_size=0;
+set max_parallel_workers_per_gather=4;
+
+create table para_insert_p1 (
+    unique1    int4 PRIMARY KEY,
+    stringu1    name
+);
+
+create table para_insert_f1 (
+    unique1    int4 REFERENCES para_insert_p1(unique1),
+    stringu1    name
+);
+
+alter table para_insert_p1 parallel dml safe;
+alter table para_insert_f1 parallel dml safe;
+
+-- Check FK trigger
+select pg_class_relname(classid), proparallel from pg_get_parallel_safety('para_insert_f1');
+select pg_get_max_parallel_hazard('para_insert_f1');
+
+--
+-- Test INSERT with underlying query.
+-- (should create plan with parallel SELECT, Gather parent node)
+--
+explain (costs off) insert into para_insert_p1 select unique1, stringu1 from tenk1;
+insert into para_insert_p1 select unique1, stringu1 from tenk1;
+-- select some values to verify that the parallel insert worked
+select count(*), sum(unique1) from para_insert_p1;
+-- verify that the same transaction has been used by all parallel workers
+select count(*) from (select distinct cmin,xmin from para_insert_p1) as dt;
+
+--
+-- Test INSERT with ordered underlying query.
+-- (should create plan with parallel SELECT, GatherMerge parent node)
+--
+truncate para_insert_p1 cascade;
+explain (costs off) insert into para_insert_p1 select unique1, stringu1 from tenk1 order by unique1;
+insert into para_insert_p1 select unique1, stringu1 from tenk1 order by unique1;
+-- select some values to verify that the parallel insert worked
+select count(*), sum(unique1) from para_insert_p1;
+-- verify that the same transaction has been used by all parallel workers
+select count(*) from (select distinct cmin,xmin from para_insert_p1) as dt;
+
+--
+-- Test INSERT with RETURNING clause.
+-- (should create plan with parallel SELECT, Gather parent node)
+--
+create table test_data1(like test_data);
+alter table test_data1 parallel dml safe;
+explain (costs off) insert into test_data1 select * from test_data where a = 10 returning a as data;
+insert into test_data1 select * from test_data where a = 10 returning a as data;
+
+--
+-- Test INSERT into a table with a foreign key.
+-- (Insert into a table with a foreign key is parallel-restricted,
+--  as doing this in a parallel worker would create a new commandId
+--  and within a worker this is not currently supported)
+--
+explain (costs off) insert into para_insert_f1 select unique1, stringu1 from tenk1;
+insert into para_insert_f1 select unique1, stringu1 from tenk1;
+-- select some values to verify that the insert worked
+select count(*), sum(unique1) from para_insert_f1;
+
+--
+-- Test INSERT with ON CONFLICT ... DO UPDATE ...
+-- (should not create a parallel plan)
+--
+create table test_conflict_table(id serial primary key, somedata int);
+alter table test_conflict_table parallel dml safe;
+explain (costs off) insert into test_conflict_table(id, somedata) select a, a from test_data;
+insert into test_conflict_table(id, somedata) select a, a from test_data;
+explain (costs off) insert into test_conflict_table(id, somedata) select a, a from test_data ON CONFLICT(id) DO UPDATE SET somedata = EXCLUDED.somedata + 1;
+
+
+--
+-- Test INSERT with parallel-unsafe index expression
+--
+select pg_class_relname(classid), proparallel from pg_get_parallel_safety('names2');
+select pg_get_max_parallel_hazard('names2');
+alter table names2 parallel dml safe;
+-- insert into names2 select * from names returning *;
+
+--
+-- Test INSERT with parallel-restricted index expression
+--
+select pg_class_relname(classid), proparallel from pg_get_parallel_safety('names4');
+select pg_get_max_parallel_hazard('names4');
+
+--
+-- Test INSERT with underlying query - and RETURNING (no projection)
+-- (should create a parallel plan; parallel SELECT)
+--
+create table names5 (like names);
+explain (costs off) insert into names5 select * from names returning *;
+
+--
+-- Test INSERT with underlying ordered query - and RETURNING (no projection)
+-- (should create a parallel plan; parallel SELECT)
+--
+create table names6 (like names);
+alter table names6 parallel dml safe;
+explain (costs off) insert into names6 select * from names order by last_name returning *;
+insert into names6 select * from names order by last_name returning *;
+
+--
+-- Test INSERT with underlying ordered query - and RETURNING (with projection)
+-- (should create a parallel plan; parallel SELECT)
+--
+create table names7 (like names);
+alter table names7 parallel dml safe;
+explain (costs off) insert into names7 select * from names order by last_name returning last_name || ', ' || first_name as last_name_then_first_name;
+insert into names7 select * from names order by last_name returning last_name || ', ' || first_name as last_name_then_first_name;
+
+
+--
+-- Test INSERT into temporary table with underlying query.
+-- (Insert into a temp table is parallel-restricted;
+-- should create a parallel plan; parallel SELECT)
+--
+create temporary table temp_names (like names);
+alter table temp_names parallel dml restricted;
+select pg_class_relname(classid), proparallel from pg_get_parallel_safety('temp_names');
+select pg_get_max_parallel_hazard('temp_names');
+explain (costs off) insert into temp_names select * from names;
+insert into temp_names select * from names;
+
+--
+-- Test INSERT with column defaults
+--
+--
+
+--
+-- Parallel INSERT with unsafe column default, should not use a parallel plan
+--
+explain (costs off) insert into testdef(a,c,d) select a,a*4,a*8 from test_data;
+
+--
+-- Parallel INSERT with restricted column default, should use parallel SELECT
+--
+explain (costs off) insert into testdef(a,b,d) select a,a*2,a*8 from test_data;
+insert into testdef(a,b,d) select a,a*2,a*8 from test_data;
+select * from testdef order by a;
+truncate testdef;
+
+--
+-- Parallel INSERT with restricted and unsafe column defaults, should not use a parallel plan
+--
+explain (costs off) insert into testdef(a,d) select a,a*8 from test_data;
+
+--
+-- Test INSERT into partition with underlying query.
+--
+create table parttable1 (a int, b name) partition by range (a);
+create table parttable1_1 partition of parttable1 for values from (0) to (5000);
+create table parttable1_2 partition of parttable1 for values from (5000) to (10000);
+
+alter table parttable1 parallel dml safe;
+
+explain (costs off) insert into parttable1 select unique1,stringu1 from tenk1;
+insert into parttable1 select unique1,stringu1 from tenk1;
+select count(*) from parttable1_1;
+select count(*) from parttable1_2;
+
+--
+-- Test table with parallel-unsafe check constraint
+-- (should not create a parallel plan)
+--
+create or replace function check_b_unsafe(b name) returns boolean as $$
+    begin
+        return (b <> 'XXXXXX');
+    end;
+$$ language plpgsql parallel unsafe;
+
+create table table_check_b(a int4, b name check (check_b_unsafe(b)), c name);
+alter table table_check_b parallel dml safe;
+select pg_class_relname(classid), proparallel from pg_get_parallel_safety('table_check_b');
+select pg_get_max_parallel_hazard('table_check_b');
+-- insert into table_check_b select * from names;
+
+--
+-- Test table with parallel-safe before stmt-level triggers
+-- (should create a parallel SELECT plan; triggers should fire)
+--
+create table names_with_safe_trigger (like names);
+alter table names_with_safe_trigger parallel dml safe;
+
+create or replace function insert_before_trigger_safe() returns trigger as $$
+    begin
+        raise notice 'hello from insert_before_trigger_safe';
+		return new;
+    end;
+$$ language plpgsql parallel safe;
+create trigger insert_before_trigger_safe before insert on names_with_safe_trigger
+    for each statement execute procedure insert_before_trigger_safe();
+select pg_class_relname(classid), proparallel from pg_get_parallel_safety('names_with_safe_trigger');
+select pg_get_max_parallel_hazard('names_with_safe_trigger');
+insert into names_with_safe_trigger select * from names;
+
+--
+-- Test table with parallel-unsafe before stmt-level triggers
+-- (should not create a parallel plan; triggers should fire)
+--
+create table names_with_unsafe_trigger (like names);
+alter table names_with_unsafe_trigger parallel dml safe;
+create or replace function insert_before_trigger_unsafe() returns trigger as $$
+    begin
+        raise notice 'hello from insert_before_trigger_unsafe';
+		return new;
+    end;
+$$ language plpgsql parallel unsafe;
+create trigger insert_before_trigger_unsafe before insert on names_with_unsafe_trigger
+    for each statement execute procedure insert_before_trigger_unsafe();
+select pg_class_relname(classid), proparallel from pg_get_parallel_safety('names_with_unsafe_trigger');
+select pg_get_max_parallel_hazard('names_with_unsafe_trigger');
+-- insert into names_with_unsafe_trigger select * from names;
+
+--
+-- Test partition with parallel-unsafe trigger
+-- (should not create a parallel plan)
+--
+
+create table part_unsafe_trigger (a int4, b name) partition by range (a);
+alter table names_with_unsafe_trigger parallel dml safe;
+create table part_unsafe_trigger_1 partition of part_unsafe_trigger for values from (0) to (5000);
+create table part_unsafe_trigger_2 partition of part_unsafe_trigger for values from (5000) to (10000);
+create trigger part_insert_before_trigger_unsafe before insert on part_unsafe_trigger_1
+    for each statement execute procedure insert_before_trigger_unsafe();
+
+select pg_class_relname(classid), proparallel from pg_get_parallel_safety('part_unsafe_trigger');
+select pg_get_max_parallel_hazard('part_unsafe_trigger');
+-- insert into names_with_unsafe_trigger select * from names;
+
+--
+-- Test DOMAIN column with a CHECK constraint
+--
+create function sql_is_distinct_from_u(anyelement, anyelement)
+returns boolean language sql parallel unsafe
+as 'select $1 is distinct from $2 limit 1';
+
+create domain inotnull_u int
+  check (sql_is_distinct_from_u(value, null));
+
+create table dom_table_u (x inotnull_u, y int);
+
+
+-- Test DOMAIN column with parallel-unsafe CHECK constraint
+select pg_class_relname(classid), proparallel from pg_get_parallel_safety('dom_table_u');
+select pg_get_max_parallel_hazard('dom_table_u');
+
+--
+-- Clean up anything not created in the transaction
+--
+
+drop table names;
+drop index names2_fullname_idx;
+drop table names2;
+drop index names4_fullname_idx;
+drop table names4;
+drop table testdef;
+drop table test_data;
+
+drop function bdefault_unsafe;
+drop function cdefault_restricted;
+drop function ddefault_safe;
+drop function fullname_parallel_unsafe;
+drop function fullname_parallel_restricted;
-- 
2.7.2.windows.1


["v6-0001-CREATE-ALTER-TABLE-PARALLEL-DML.patch" (application/octet-stream)]

From 055879d3a797e2a2f18a70b9e33fa8d613edcf6b Mon Sep 17 00:00:00 2001
From: houzj <houzj.fnst@fujitsu.com>
Date: Mon, 31 May 2021 09:28:09 +0800
Subject: [PATCH 1/3] CREATE-ALTER-TABLE-PARALLEL-DML

Enable users to declare a table's parallel data-modification safety
(SAFE/RESTRICTED/UNSAFE).

Add a table property that represents parallel safety of a table for
DML statement execution.

It may be specified as follows:
CREATE TABLE table_name PARALLEL DML { UNSAFE | RESTRICTED | SAFE };
ALTER TABLE table_name PARALLEL DML { UNSAFE | RESTRICTED | SAFE };

This property is recorded in pg_class's relparalleldml column as 'u',
'r', or 's', just like pg_proc's proparallel.
The default is UNSAFE.

The planner assumes that the table, its descendant partitions, and all
of their ancillary objects have, at worst, the specified parallel
safety. The user is responsible for ensuring the declaration is correct.

---
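
Note (not part of the commit message): the option handling added to
DefineRelation() and ATExecParallelDMLSafety() below can be read as a small
mapping from the option text to a one-character catalog code. The following is
a minimal standalone sketch of that mapping, not the patch's code. The names
sketch_parallel_dml_code, sketch_check_examples and the SKETCH_* macros are
hypothetical, and returning 0 stands in for the ereport(ERROR, ...) calls. It
assumes the option text arrives in lower case, as it would for an unquoted
identifier.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-ins for PROPARALLEL_SAFE / _RESTRICTED / _UNSAFE. */
#define SKETCH_SAFE			's'
#define SKETCH_RESTRICTED	'r'
#define SKETCH_UNSAFE		'u'

/*
 * Map the PARALLEL DML option text to its one-character code.
 *
 * Returns 0 where the patch would raise an error: an unrecognized keyword,
 * or SAFE requested for a foreign or temporary table.  A NULL option models
 * CREATE TABLE without the clause, for which the stated default is UNSAFE.
 */
static char
sketch_parallel_dml_code(const char *opt, bool foreign_or_temp)
{
	if (opt == NULL)
		return SKETCH_UNSAFE;
	if (strcmp(opt, "safe") == 0)
		return foreign_or_temp ? 0 : SKETCH_SAFE;
	if (strcmp(opt, "restricted") == 0)
		return SKETCH_RESTRICTED;
	if (strcmp(opt, "unsafe") == 0)
		return SKETCH_UNSAFE;
	return 0;					/* not SAFE, RESTRICTED, or UNSAFE */
}

/* Usage example: the cases exercised by the regression test setup. */
static void
sketch_check_examples(void)
{
	/* ordinary table declared SAFE */
	assert(sketch_parallel_dml_code("safe", false) == SKETCH_SAFE);
	/* temporary table: SAFE is rejected, RESTRICTED is accepted */
	assert(sketch_parallel_dml_code("safe", true) == 0);
	assert(sketch_parallel_dml_code("restricted", true) == SKETCH_RESTRICTED);
	/* clause omitted */
	assert(sketch_parallel_dml_code(NULL, false) == SKETCH_UNSAFE);
}
```

In this sketch only the SAFE branch checks the relation kind, so RESTRICTED
and UNSAFE are accepted for foreign and temporary tables, matching the revised
error message wording discussed above.
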
 src/backend/bootstrap/bootparse.y                  |  3 +
 src/backend/catalog/heap.c                         |  7 +-
 src/backend/catalog/index.c                        |  2 +
 src/backend/catalog/toasting.c                     |  1 +
 src/backend/commands/cluster.c                     |  1 +
 src/backend/commands/createas.c                    |  1 +
 src/backend/commands/sequence.c                    |  1 +
 src/backend/commands/tablecmds.c                   | 87 ++++++++++++++++++++++
 src/backend/commands/typecmds.c                    |  1 +
 src/backend/commands/view.c                        |  1 +
 src/backend/nodes/copyfuncs.c                      |  1 +
 src/backend/nodes/equalfuncs.c                     |  2 +
 src/backend/nodes/outfuncs.c                       |  2 +
 src/backend/nodes/readfuncs.c                      |  1 +
 src/backend/parser/gram.y                          | 65 +++++++++++-----
 src/backend/utils/cache/relcache.c                 |  6 +-
 src/bin/pg_dump/pg_dump.c                          | 47 +++++++++---
 src/bin/pg_dump/pg_dump.h                          |  1 +
 src/include/catalog/heap.h                         |  2 +
 src/include/catalog/pg_class.h                     |  3 +
 src/include/nodes/parsenodes.h                     |  4 +-
 src/include/nodes/primnodes.h                      |  1 +
 src/include/parser/kwlist.h                        |  1 +
 src/include/utils/relcache.h                       |  3 +-
 .../modules/test_ddl_deparse/test_ddl_deparse.c    |  3 +
 25 files changed, 215 insertions(+), 32 deletions(-)

diff --git a/src/backend/bootstrap/bootparse.y b/src/backend/bootstrap/bootparse.y
index 5fcd004..88fcd57 100644
--- a/src/backend/bootstrap/bootparse.y
+++ b/src/backend/bootstrap/bootparse.y
@@ -25,6 +25,7 @@
 #include "catalog/pg_authid.h"
 #include "catalog/pg_class.h"
 #include "catalog/pg_namespace.h"
+#include "catalog/pg_proc.h"
 #include "catalog/pg_tablespace.h"
 #include "catalog/toasting.h"
 #include "commands/defrem.h"
@@ -208,6 +209,7 @@ Boot_CreateStmt:
 												   tupdesc,
 												   RELKIND_RELATION,
 												   RELPERSISTENCE_PERMANENT,
+												   PROPARALLEL_UNSAFE,
 												   shared_relation,
 												   mapped_relation,
 												   true,
@@ -231,6 +233,7 @@ Boot_CreateStmt:
 													  NIL,
 													  RELKIND_RELATION,
 													  RELPERSISTENCE_PERMANENT,
+													  PROPARALLEL_UNSAFE,
 													  shared_relation,
 													  mapped_relation,
 													  ONCOMMIT_NOOP,
diff --git a/src/backend/catalog/heap.c b/src/backend/catalog/heap.c
index afa830d..7b1152d 100644
--- a/src/backend/catalog/heap.c
+++ b/src/backend/catalog/heap.c
@@ -302,6 +302,7 @@ heap_create(const char *relname,
 			TupleDesc tupDesc,
 			char relkind,
 			char relpersistence,
+			char relparalleldml,
 			bool shared_relation,
 			bool mapped_relation,
 			bool allow_system_table_mods,
@@ -404,7 +405,8 @@ heap_create(const char *relname,
 									 shared_relation,
 									 mapped_relation,
 									 relpersistence,
-									 relkind);
+									 relkind,
+									 relparalleldml);
 
 	/*
 	 * Have the storage manager create the relation's disk file, if needed.
@@ -961,6 +963,7 @@ InsertPgClassTuple(Relation pg_class_desc,
 	values[Anum_pg_class_relhassubclass - 1] = BoolGetDatum(rd_rel->relhassubclass);
 	values[Anum_pg_class_relispopulated - 1] = BoolGetDatum(rd_rel->relispopulated);
 	values[Anum_pg_class_relreplident - 1] = CharGetDatum(rd_rel->relreplident);
+	values[Anum_pg_class_relparalleldml - 1] = CharGetDatum(rd_rel->relparalleldml);
 	values[Anum_pg_class_relispartition - 1] = BoolGetDatum(rd_rel->relispartition);
 	values[Anum_pg_class_relrewrite - 1] = ObjectIdGetDatum(rd_rel->relrewrite);
 	values[Anum_pg_class_relfrozenxid - 1] = TransactionIdGetDatum(rd_rel->relfrozenxid);
@@ -1154,6 +1157,7 @@ heap_create_with_catalog(const char *relname,
 						 List *cooked_constraints,
 						 char relkind,
 						 char relpersistence,
+						 char relparalleldml,
 						 bool shared_relation,
 						 bool mapped_relation,
 						 OnCommitAction oncommit,
@@ -1301,6 +1305,7 @@ heap_create_with_catalog(const char *relname,
 							   tupdesc,
 							   relkind,
 							   relpersistence,
+							   relparalleldml,
 							   shared_relation,
 							   mapped_relation,
 							   allow_system_table_mods,
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index 50b7a16..ce2ae5a 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -50,6 +50,7 @@
 #include "catalog/pg_inherits.h"
 #include "catalog/pg_opclass.h"
 #include "catalog/pg_operator.h"
+#include "catalog/pg_proc.h"
 #include "catalog/pg_tablespace.h"
 #include "catalog/pg_trigger.h"
 #include "catalog/pg_type.h"
@@ -935,6 +936,7 @@ index_create(Relation heapRelation,
 								indexTupDesc,
 								relkind,
 								relpersistence,
+								PROPARALLEL_UNSAFE,
 								shared_relation,
 								mapped_relation,
 								allow_system_table_mods,
diff --git a/src/backend/catalog/toasting.c b/src/backend/catalog/toasting.c
index bf81f6c..d0a9fea 100644
--- a/src/backend/catalog/toasting.c
+++ b/src/backend/catalog/toasting.c
@@ -253,6 +253,7 @@ create_toast_table(Relation rel, Oid toastOid, Oid toastIndexOid,
 										   NIL,
 										   RELKIND_TOASTVALUE,
 										   rel->rd_rel->relpersistence,
+										   rel->rd_rel->relparalleldml,
 										   shared_relation,
 										   mapped_relation,
 										   ONCOMMIT_NOOP,
diff --git a/src/backend/commands/cluster.c b/src/backend/commands/cluster.c
index 6487a9e..2151121 100644
--- a/src/backend/commands/cluster.c
+++ b/src/backend/commands/cluster.c
@@ -691,6 +691,7 @@ make_new_heap(Oid OIDOldHeap, Oid NewTableSpace, char relpersistence,
 										  NIL,
 										  RELKIND_RELATION,
 										  relpersistence,
+										  OldHeap->rd_rel->relparalleldml,
 										  false,
 										  RelationIsMapped(OldHeap),
 										  ONCOMMIT_NOOP,
diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c
index dce8820..45aacc8 100644
--- a/src/backend/commands/createas.c
+++ b/src/backend/commands/createas.c
@@ -107,6 +107,7 @@ create_ctas_internal(List *attrList, IntoClause *into)
 	create->options = into->options;
 	create->oncommit = into->onCommit;
 	create->tablespacename = into->tableSpaceName;
+	create->paralleldmlsafety = into->paralleldmlsafety;
 	create->if_not_exists = false;
 	create->accessMethod = into->accessMethod;
 
diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c
index 0415df9..6f25c23 100644
--- a/src/backend/commands/sequence.c
+++ b/src/backend/commands/sequence.c
@@ -211,6 +211,7 @@ DefineSequence(ParseState *pstate, CreateSeqStmt *seq)
 	stmt->options = NIL;
 	stmt->oncommit = ONCOMMIT_NOOP;
 	stmt->tablespacename = NULL;
+	stmt->paralleldmlsafety = NULL;
 	stmt->if_not_exists = seq->if_not_exists;
 
 	address = DefineRelation(stmt, RELKIND_SEQUENCE, seq->ownerId, NULL, NULL);
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 028e8ac..cb5a420 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -40,6 +40,7 @@
 #include "catalog/pg_inherits.h"
 #include "catalog/pg_namespace.h"
 #include "catalog/pg_opclass.h"
+#include "catalog/pg_proc.h"
 #include "catalog/pg_tablespace.h"
 #include "catalog/pg_statistic_ext.h"
 #include "catalog/pg_trigger.h"
@@ -602,6 +603,7 @@ static void refuseDupeIndexAttach(Relation parentIdx, Relation partIdx,
 static List *GetParentedForeignKeyRefs(Relation partition);
 static void ATDetachCheckNoForeignKeyRefs(Relation partition);
 static char GetAttributeCompression(Oid atttypid, char *compression);
+static void ATExecParallelDMLSafety(Relation rel, Node *def);
 
 
 /* ----------------------------------------------------------------
@@ -647,6 +649,7 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId,
 	LOCKMODE	parentLockmode;
 	const char *accessMethod = NULL;
 	Oid			accessMethodId = InvalidOid;
+	char		relparalleldml = PROPARALLEL_UNSAFE;
 
 	/*
 	 * Truncate relname to appropriate length (probably a waste of time, as
@@ -925,6 +928,28 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId,
 	if (accessMethod != NULL)
 		accessMethodId = get_table_am_oid(accessMethod, false);
 
+	if (stmt->paralleldmlsafety != NULL)
+	{
+		if (strcmp(stmt->paralleldmlsafety, "safe") == 0)
+		{
+			if (relkind == RELKIND_FOREIGN_TABLE ||
+				stmt->relation->relpersistence == RELPERSISTENCE_TEMP)
+				ereport(ERROR,
+						(errcode(ERRCODE_WRONG_OBJECT_TYPE),
+						errmsg("cannot support foreign or temporary table data modification by parallel workers")));
+
+			relparalleldml = PROPARALLEL_SAFE;
+		}
+		else if (strcmp(stmt->paralleldmlsafety, "restricted") == 0)
+			relparalleldml = PROPARALLEL_RESTRICTED;
+		else if (strcmp(stmt->paralleldmlsafety, "unsafe") == 0)
+			relparalleldml = PROPARALLEL_UNSAFE;
+		else
+			ereport(ERROR,
+					(errcode(ERRCODE_SYNTAX_ERROR),
+					errmsg("parameter \"parallel dml\" must be SAFE, RESTRICTED, or UNSAFE")));
+	}
+
 	/*
 	 * Create the relation.  Inherited defaults and constraints are passed in
 	 * for immediate handling --- since they don't need parsing, they can be
@@ -943,6 +968,7 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId,
 													  old_constraints),
 										  relkind,
 										  stmt->relation->relpersistence,
+										  relparalleldml,
 										  false,
 										  false,
 										  stmt->oncommit,
@@ -4184,6 +4210,7 @@ AlterTableGetLockLevel(List *cmds)
 			case AT_SetIdentity:
 			case AT_DropExpression:
 			case AT_SetCompression:
+			case AT_ParallelDMLSafety:
 				cmd_lockmode = AccessExclusiveLock;
 				break;
 
@@ -4717,6 +4744,11 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
 			/* No command-specific prep needed */
 			pass = AT_PASS_MISC;
 			break;
+		case AT_ParallelDMLSafety:
+			ATSimplePermissions(rel, ATT_TABLE | ATT_FOREIGN_TABLE);
+			/* No command-specific prep needed */
+			pass = AT_PASS_MISC;
+			break;
 		default:				/* oops */
 			elog(ERROR, "unrecognized alter table type: %d",
 				 (int) cmd->subtype);
@@ -5119,6 +5151,9 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab,
 		case AT_DetachPartitionFinalize:
 			ATExecDetachPartitionFinalize(rel, ((PartitionCmd *) cmd->def)->name);
 			break;
+		case AT_ParallelDMLSafety:
+			ATExecParallelDMLSafety(rel, cmd->def);
+			break;
 		default:				/* oops */
 			elog(ERROR, "unrecognized alter table type: %d",
 				 (int) cmd->subtype);
@@ -18614,3 +18649,55 @@ GetAttributeCompression(Oid atttypid, char *compression)
 
 	return cmethod;
 }
+
+static void
+ATExecParallelDMLSafety(Relation rel, Node *def)
+{
+	Relation	pg_class;
+	Oid			relid;
+	HeapTuple	tuple;
+	char		relparallel = PROPARALLEL_UNSAFE;
+	char	   *parallel = strVal(def);
+
+	if (parallel)
+	{
+		if (strcmp(parallel, "safe") == 0)
+		{
+			/*
+			 * We can't support table modification in a parallel worker if it's a
+			 * foreign table/partition (no FDW API for supporting parallel access) or
+			 * a temporary table.
+			 */
+			if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE ||
+				RelationUsesLocalBuffers(rel))
+					ereport(ERROR,
+							(errcode(ERRCODE_WRONG_OBJECT_TYPE),
+							 errmsg("cannot support foreign or temporary table data modification by parallel workers")));
+
+			relparallel = PROPARALLEL_SAFE;
+		}
+		else if (strcmp(parallel, "restricted") == 0)
+			relparallel = PROPARALLEL_RESTRICTED;
+		else if (strcmp(parallel, "unsafe") == 0)
+			relparallel = PROPARALLEL_UNSAFE;
+		else
+			ereport(ERROR,
+					(errcode(ERRCODE_SYNTAX_ERROR),
+					 errmsg("parameter \"parallel dml\" must be SAFE, RESTRICTED, or UNSAFE")));
+	}
+
+	relid = RelationGetRelid(rel);
+
+	pg_class = table_open(RelationRelationId, RowExclusiveLock);
+
+	tuple = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(relid));
+
+	if (!HeapTupleIsValid(tuple))
+		elog(ERROR, "cache lookup failed for relation %u", relid);
+
+	((Form_pg_class) GETSTRUCT(tuple))->relparalleldml = relparallel;
+	CatalogTupleUpdate(pg_class, &tuple->t_self, tuple);
+
+	table_close(pg_class, RowExclusiveLock);
+	heap_freetuple(tuple);
+}
diff --git a/src/backend/commands/typecmds.c b/src/backend/commands/typecmds.c
index 58ec65c..8baebe0 100644
--- a/src/backend/commands/typecmds.c
+++ b/src/backend/commands/typecmds.c
@@ -2540,6 +2540,7 @@ DefineCompositeType(RangeVar *typevar, List *coldeflist)
 	createStmt->options = NIL;
 	createStmt->oncommit = ONCOMMIT_NOOP;
 	createStmt->tablespacename = NULL;
+	createStmt->paralleldmlsafety = NULL;
 	createStmt->if_not_exists = false;
 
 	/*
diff --git a/src/backend/commands/view.c b/src/backend/commands/view.c
index f2642db..2d77a88 100644
--- a/src/backend/commands/view.c
+++ b/src/backend/commands/view.c
@@ -227,6 +227,7 @@ DefineVirtualRelation(RangeVar *relation, List *tlist, bool replace,
 		createStmt->options = options;
 		createStmt->oncommit = ONCOMMIT_NOOP;
 		createStmt->tablespacename = NULL;
+		createStmt->paralleldmlsafety = NULL;
 		createStmt->if_not_exists = false;
 
 		/*
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index 90770a8..6bf8787 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -3531,6 +3531,7 @@ CopyCreateStmtFields(const CreateStmt *from, CreateStmt *newnode)
 	COPY_SCALAR_FIELD(oncommit);
 	COPY_STRING_FIELD(tablespacename);
 	COPY_STRING_FIELD(accessMethod);
+	COPY_STRING_FIELD(paralleldmlsafety);
 	COPY_SCALAR_FIELD(if_not_exists);
 }
 
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index ce76d09..aaed9d1 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -146,6 +146,7 @@ _equalIntoClause(const IntoClause *a, const IntoClause *b)
 	COMPARE_NODE_FIELD(options);
 	COMPARE_SCALAR_FIELD(onCommit);
 	COMPARE_STRING_FIELD(tableSpaceName);
+	COMPARE_STRING_FIELD(paralleldmlsafety);
 	COMPARE_NODE_FIELD(viewQuery);
 	COMPARE_SCALAR_FIELD(skipData);
 
@@ -1284,6 +1285,7 @@ _equalCreateStmt(const CreateStmt *a, const CreateStmt *b)
 	COMPARE_SCALAR_FIELD(oncommit);
 	COMPARE_STRING_FIELD(tablespacename);
 	COMPARE_STRING_FIELD(accessMethod);
+	COMPARE_STRING_FIELD(paralleldmlsafety);
 	COMPARE_SCALAR_FIELD(if_not_exists);
 
 	return true;
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index 8da8b14..16c66a8 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -1107,6 +1107,7 @@ _outIntoClause(StringInfo str, const IntoClause *node)
 	WRITE_NODE_FIELD(options);
 	WRITE_ENUM_FIELD(onCommit, OnCommitAction);
 	WRITE_STRING_FIELD(tableSpaceName);
+	WRITE_STRING_FIELD(paralleldmlsafety);
 	WRITE_NODE_FIELD(viewQuery);
 	WRITE_BOOL_FIELD(skipData);
 }
@@ -2703,6 +2704,7 @@ _outCreateStmtInfo(StringInfo str, const CreateStmt *node)
 	WRITE_ENUM_FIELD(oncommit, OnCommitAction);
 	WRITE_STRING_FIELD(tablespacename);
 	WRITE_STRING_FIELD(accessMethod);
+	WRITE_STRING_FIELD(paralleldmlsafety);
 	WRITE_BOOL_FIELD(if_not_exists);
 }
 
diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c
index 3772ea0..09f5f0d 100644
--- a/src/backend/nodes/readfuncs.c
+++ b/src/backend/nodes/readfuncs.c
@@ -563,6 +563,7 @@ _readIntoClause(void)
 	READ_NODE_FIELD(options);
 	READ_ENUM_FIELD(onCommit, OnCommitAction);
 	READ_STRING_FIELD(tableSpaceName);
+	READ_STRING_FIELD(paralleldmlsafety);
 	READ_NODE_FIELD(viewQuery);
 	READ_BOOL_FIELD(skipData);
 
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 9ee90e3..f099795 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -609,7 +609,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 %type <partboundspec> PartitionBoundSpec
 %type <list>		hash_partbound
 %type <defelt>		hash_partbound_elem
-
+%type <str>			ParallelDMLSafety
 
 /*
  * Non-keyword token types.  These are hard-wired into the "flex" lexer.
@@ -654,7 +654,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 
 	DATA_P DATABASE DAY_P DEALLOCATE DEC DECIMAL_P DECLARE DEFAULT DEFAULTS
 	DEFERRABLE DEFERRED DEFINER DELETE_P DELIMITER DELIMITERS DEPENDS DEPTH DESC
-	DETACH DICTIONARY DISABLE_P DISCARD DISTINCT DO DOCUMENT_P DOMAIN_P
+	DETACH DICTIONARY DISABLE_P DISCARD DISTINCT DML DO DOCUMENT_P DOMAIN_P
 	DOUBLE_P DROP
 
 	EACH ELSE ENABLE_P ENCODING ENCRYPTED END_P ENUM_P ESCAPE EVENT EXCEPT
@@ -2683,6 +2683,14 @@ alter_table_cmd:
 					n->subtype = AT_NoForceRowSecurity;
 					$$ = (Node *)n;
 				}
+			/* ALTER TABLE <name> PARALLEL DML SAFE/RESTRICTED/UNSAFE */
+			| PARALLEL DML ColId
+				{
+					AlterTableCmd *n = makeNode(AlterTableCmd);
+					n->subtype = AT_ParallelDMLSafety;
+					n->def = (Node *)makeString($3);
+					$$ = (Node *)n;
+				}
 			| alter_generic_options
 				{
 					AlterTableCmd *n = makeNode(AlterTableCmd);
@@ -3268,7 +3276,7 @@ copy_generic_opt_arg_list_item:
 
 CreateStmt:	CREATE OptTemp TABLE qualified_name '(' OptTableElementList ')'
 			OptInherit OptPartitionSpec table_access_method_clause OptWith
-			OnCommitOption OptTableSpace
+			OnCommitOption OptTableSpace ParallelDMLSafety
 				{
 					CreateStmt *n = makeNode(CreateStmt);
 					$4->relpersistence = $2;
@@ -3282,12 +3290,13 @@ CreateStmt:	CREATE OptTemp TABLE qualified_name '(' OptTableElementList ')'
 					n->options = $11;
 					n->oncommit = $12;
 					n->tablespacename = $13;
+					n->paralleldmlsafety = $14;
 					n->if_not_exists = false;
 					$$ = (Node *)n;
 				}
 		| CREATE OptTemp TABLE IF_P NOT EXISTS qualified_name '('
 			OptTableElementList ')' OptInherit OptPartitionSpec table_access_method_clause
-			OptWith OnCommitOption OptTableSpace
+			OptWith OnCommitOption OptTableSpace ParallelDMLSafety
 				{
 					CreateStmt *n = makeNode(CreateStmt);
 					$7->relpersistence = $2;
@@ -3301,12 +3310,13 @@ CreateStmt:	CREATE OptTemp TABLE qualified_name '(' OptTableElementList ')'
 					n->options = $14;
 					n->oncommit = $15;
 					n->tablespacename = $16;
+					n->paralleldmlsafety = $17;
 					n->if_not_exists = true;
 					$$ = (Node *)n;
 				}
 		| CREATE OptTemp TABLE qualified_name OF any_name
 			OptTypedTableElementList OptPartitionSpec table_access_method_clause
-			OptWith OnCommitOption OptTableSpace
+			OptWith OnCommitOption OptTableSpace ParallelDMLSafety
 				{
 					CreateStmt *n = makeNode(CreateStmt);
 					$4->relpersistence = $2;
@@ -3321,12 +3331,13 @@ CreateStmt:	CREATE OptTemp TABLE qualified_name '(' OptTableElementList ')'
 					n->options = $10;
 					n->oncommit = $11;
 					n->tablespacename = $12;
+					n->paralleldmlsafety = $13;
 					n->if_not_exists = false;
 					$$ = (Node *)n;
 				}
 		| CREATE OptTemp TABLE IF_P NOT EXISTS qualified_name OF any_name
 			OptTypedTableElementList OptPartitionSpec table_access_method_clause
-			OptWith OnCommitOption OptTableSpace
+			OptWith OnCommitOption OptTableSpace ParallelDMLSafety
 				{
 					CreateStmt *n = makeNode(CreateStmt);
 					$7->relpersistence = $2;
@@ -3341,12 +3352,14 @@ CreateStmt:	CREATE OptTemp TABLE qualified_name '(' OptTableElementList ')'
 					n->options = $13;
 					n->oncommit = $14;
 					n->tablespacename = $15;
+					n->paralleldmlsafety = $16;
 					n->if_not_exists = true;
 					$$ = (Node *)n;
 				}
 		| CREATE OptTemp TABLE qualified_name PARTITION OF qualified_name
 			OptTypedTableElementList PartitionBoundSpec OptPartitionSpec
 			table_access_method_clause OptWith OnCommitOption OptTableSpace
+			ParallelDMLSafety
 				{
 					CreateStmt *n = makeNode(CreateStmt);
 					$4->relpersistence = $2;
@@ -3361,12 +3374,14 @@ CreateStmt:	CREATE OptTemp TABLE qualified_name '(' OptTableElementList ')'
 					n->options = $12;
 					n->oncommit = $13;
 					n->tablespacename = $14;
+					n->paralleldmlsafety = $15;
 					n->if_not_exists = false;
 					$$ = (Node *)n;
 				}
 		| CREATE OptTemp TABLE IF_P NOT EXISTS qualified_name PARTITION OF
 			qualified_name OptTypedTableElementList PartitionBoundSpec OptPartitionSpec
 			table_access_method_clause OptWith OnCommitOption OptTableSpace
+			ParallelDMLSafety
 				{
 					CreateStmt *n = makeNode(CreateStmt);
 					$7->relpersistence = $2;
@@ -3381,6 +3396,7 @@ CreateStmt:	CREATE OptTemp TABLE qualified_name '(' OptTableElementList ')'
 					n->options = $15;
 					n->oncommit = $16;
 					n->tablespacename = $17;
+					n->paralleldmlsafety = $18;
 					n->if_not_exists = true;
 					$$ = (Node *)n;
 				}
@@ -4081,6 +4097,10 @@ OptTableSpace:   TABLESPACE name					{ $$ = $2; }
 			| /*EMPTY*/								{ $$ = NULL; }
 		;
 
+ParallelDMLSafety:   PARALLEL DML name				{ $$ = $3; }
+			| /*EMPTY*/								{ $$ = NULL; }
+		;
+
 OptConsTableSpace:   USING INDEX TABLESPACE name	{ $$ = $4; }
 			| /*EMPTY*/								{ $$ = NULL; }
 		;
@@ -4228,7 +4248,7 @@ CreateAsStmt:
 
 create_as_target:
 			qualified_name opt_column_list table_access_method_clause
-			OptWith OnCommitOption OptTableSpace
+			OptWith OnCommitOption OptTableSpace ParallelDMLSafety
 				{
 					$$ = makeNode(IntoClause);
 					$$->rel = $1;
@@ -4237,6 +4257,7 @@ create_as_target:
 					$$->options = $4;
 					$$->onCommit = $5;
 					$$->tableSpaceName = $6;
+					$$->paralleldmlsafety = $7;
 					$$->viewQuery = NULL;
 					$$->skipData = false;		/* might get changed later */
 				}
@@ -5016,7 +5037,7 @@ AlterForeignServerStmt: ALTER SERVER name foreign_server_version alter_generic_o
 CreateForeignTableStmt:
 		CREATE FOREIGN TABLE qualified_name
 			'(' OptTableElementList ')'
-			OptInherit SERVER name create_generic_options
+			OptInherit ParallelDMLSafety SERVER name create_generic_options
 				{
 					CreateForeignTableStmt *n = makeNode(CreateForeignTableStmt);
 					$4->relpersistence = RELPERSISTENCE_PERMANENT;
@@ -5028,15 +5049,16 @@ CreateForeignTableStmt:
 					n->base.options = NIL;
 					n->base.oncommit = ONCOMMIT_NOOP;
 					n->base.tablespacename = NULL;
+					n->base.paralleldmlsafety = $9;
 					n->base.if_not_exists = false;
 					/* FDW-specific data */
-					n->servername = $10;
-					n->options = $11;
+					n->servername = $11;
+					n->options = $12;
 					$$ = (Node *) n;
 				}
 		| CREATE FOREIGN TABLE IF_P NOT EXISTS qualified_name
 			'(' OptTableElementList ')'
-			OptInherit SERVER name create_generic_options
+			OptInherit ParallelDMLSafety SERVER name create_generic_options
 				{
 					CreateForeignTableStmt *n = makeNode(CreateForeignTableStmt);
 					$7->relpersistence = RELPERSISTENCE_PERMANENT;
@@ -5048,15 +5070,16 @@ CreateForeignTableStmt:
 					n->base.options = NIL;
 					n->base.oncommit = ONCOMMIT_NOOP;
 					n->base.tablespacename = NULL;
+					n->base.paralleldmlsafety = $12;
 					n->base.if_not_exists = true;
 					/* FDW-specific data */
-					n->servername = $13;
-					n->options = $14;
+					n->servername = $14;
+					n->options = $15;
 					$$ = (Node *) n;
 				}
 		| CREATE FOREIGN TABLE qualified_name
 			PARTITION OF qualified_name OptTypedTableElementList PartitionBoundSpec
-			SERVER name create_generic_options
+			ParallelDMLSafety SERVER name create_generic_options
 				{
 					CreateForeignTableStmt *n = makeNode(CreateForeignTableStmt);
 					$4->relpersistence = RELPERSISTENCE_PERMANENT;
@@ -5069,15 +5092,16 @@ CreateForeignTableStmt:
 					n->base.options = NIL;
 					n->base.oncommit = ONCOMMIT_NOOP;
 					n->base.tablespacename = NULL;
+					n->base.paralleldmlsafety = $10;
 					n->base.if_not_exists = false;
 					/* FDW-specific data */
-					n->servername = $11;
-					n->options = $12;
+					n->servername = $12;
+					n->options = $13;
 					$$ = (Node *) n;
 				}
 		| CREATE FOREIGN TABLE IF_P NOT EXISTS qualified_name
 			PARTITION OF qualified_name OptTypedTableElementList PartitionBoundSpec
-			SERVER name create_generic_options
+			ParallelDMLSafety SERVER name create_generic_options
 				{
 					CreateForeignTableStmt *n = makeNode(CreateForeignTableStmt);
 					$7->relpersistence = RELPERSISTENCE_PERMANENT;
@@ -5090,10 +5114,11 @@ CreateForeignTableStmt:
 					n->base.options = NIL;
 					n->base.oncommit = ONCOMMIT_NOOP;
 					n->base.tablespacename = NULL;
+					n->base.paralleldmlsafety = $13;
 					n->base.if_not_exists = true;
 					/* FDW-specific data */
-					n->servername = $14;
-					n->options = $15;
+					n->servername = $15;
+					n->options = $16;
 					$$ = (Node *) n;
 				}
 		;
@@ -15563,6 +15588,7 @@ unreserved_keyword:
 			| DICTIONARY
 			| DISABLE_P
 			| DISCARD
+			| DML
 			| DOCUMENT_P
 			| DOMAIN_P
 			| DOUBLE_P
@@ -16103,6 +16129,7 @@ bare_label_keyword:
 			| DISABLE_P
 			| DISCARD
 			| DISTINCT
+			| DML
 			| DO
 			| DOCUMENT_P
 			| DOMAIN_P
diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c
index fd05615..ef6200d 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -1870,6 +1870,7 @@ formrdesc(const char *relationName, Oid relationReltype,
 	relation->rd_rel->relkind = RELKIND_RELATION;
 	relation->rd_rel->relnatts = (int16) natts;
 	relation->rd_rel->relam = HEAP_TABLE_AM_OID;
+	relation->rd_rel->relparalleldml = PROPARALLEL_UNSAFE;
 
 	/*
 	 * initialize attribute tuple form
@@ -3356,7 +3357,8 @@ RelationBuildLocalRelation(const char *relname,
 						   bool shared_relation,
 						   bool mapped_relation,
 						   char relpersistence,
-						   char relkind)
+						   char relkind,
+						   char relparalleldml)
 {
 	Relation	rel;
 	MemoryContext oldcxt;
@@ -3506,6 +3508,8 @@ RelationBuildLocalRelation(const char *relname,
 	else
 		rel->rd_rel->relreplident = REPLICA_IDENTITY_NOTHING;
 
+	rel->rd_rel->relparalleldml = relparalleldml;
+
 	/*
 	 * Insert relation physical and logical identifiers (OIDs) into the right
 	 * places.  For a mapped relation, we set relfilenode to zero and rely on
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 8f53cc7..abca220 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -6227,6 +6227,7 @@ getTables(Archive *fout, int *numTables)
 	int			i_relpersistence;
 	int			i_relispopulated;
 	int			i_relreplident;
+	int			i_relproparalleldml;
 	int			i_owning_tab;
 	int			i_owning_col;
 	int			i_reltablespace;
@@ -6331,7 +6332,7 @@ getTables(Archive *fout, int *numTables)
 						  "tc.relfrozenxid AS tfrozenxid, "
 						  "tc.relminmxid AS tminmxid, "
 						  "c.relpersistence, c.relispopulated, "
-						  "c.relreplident, c.relpages, am.amname, "
+						  "c.relreplident, c.relparalleldml, c.relpages, am.amname, "
 						  "CASE WHEN c.relkind = 'f' THEN "
 						  "(SELECT ftserver FROM pg_catalog.pg_foreign_table WHERE ftrelid = c.oid) "
 						  "ELSE 0 END AS foreignserver, "
@@ -6423,7 +6424,7 @@ getTables(Archive *fout, int *numTables)
 						  "tc.relfrozenxid AS tfrozenxid, "
 						  "tc.relminmxid AS tminmxid, "
 						  "c.relpersistence, c.relispopulated, "
-						  "c.relreplident, c.relpages, "
+						  "c.relreplident, c.relparalleldml, c.relpages, "
 						  "NULL AS amname, "
 						  "CASE WHEN c.relkind = 'f' THEN "
 						  "(SELECT ftserver FROM pg_catalog.pg_foreign_table WHERE ftrelid = c.oid) "
@@ -6476,7 +6477,7 @@ getTables(Archive *fout, int *numTables)
 						  "tc.relfrozenxid AS tfrozenxid, "
 						  "tc.relminmxid AS tminmxid, "
 						  "c.relpersistence, c.relispopulated, "
-						  "c.relreplident, c.relpages, "
+						  "c.relreplident, c.relparalleldml, c.relpages, "
 						  "NULL AS amname, "
 						  "CASE WHEN c.relkind = 'f' THEN "
 						  "(SELECT ftserver FROM pg_catalog.pg_foreign_table WHERE ftrelid = c.oid) "
@@ -6529,7 +6530,7 @@ getTables(Archive *fout, int *numTables)
 						  "tc.relfrozenxid AS tfrozenxid, "
 						  "tc.relminmxid AS tminmxid, "
 						  "c.relpersistence, c.relispopulated, "
-						  "'d' AS relreplident, c.relpages, "
+						  "'d' AS relreplident, 'u' AS relparalleldml, c.relpages, "
 						  "NULL AS amname, "
 						  "CASE WHEN c.relkind = 'f' THEN "
 						  "(SELECT ftserver FROM pg_catalog.pg_foreign_table WHERE ftrelid = c.oid) "
@@ -6582,7 +6583,7 @@ getTables(Archive *fout, int *numTables)
 						  "tc.relfrozenxid AS tfrozenxid, "
 						  "0 AS tminmxid, "
 						  "c.relpersistence, 't' as relispopulated, "
-						  "'d' AS relreplident, c.relpages, "
+						  "'d' AS relreplident, 'u' AS relparalleldml, c.relpages, "
 						  "NULL AS amname, "
 						  "CASE WHEN c.relkind = 'f' THEN "
 						  "(SELECT ftserver FROM pg_catalog.pg_foreign_table WHERE ftrelid = c.oid) "
@@ -6633,7 +6634,7 @@ getTables(Archive *fout, int *numTables)
 						  "tc.relfrozenxid AS tfrozenxid, "
 						  "0 AS tminmxid, "
 						  "'p' AS relpersistence, 't' as relispopulated, "
-						  "'d' AS relreplident, c.relpages, "
+						  "'d' AS relreplident, 'u' AS relparalleldml, c.relpages, "
 						  "NULL AS amname, "
 						  "NULL AS foreignserver, "
 						  "CASE WHEN c.reloftype <> 0 THEN c.reloftype::pg_catalog.regtype ELSE NULL END AS reloftype, "
@@ -6681,7 +6682,7 @@ getTables(Archive *fout, int *numTables)
 						  "tc.relfrozenxid AS tfrozenxid, "
 						  "0 AS tminmxid, "
 						  "'p' AS relpersistence, 't' as relispopulated, "
-						  "'d' AS relreplident, c.relpages, "
+						  "'d' AS relreplident, 'u' AS relparalleldml, c.relpages, "
 						  "NULL AS amname, "
 						  "NULL AS foreignserver, "
 						  "NULL AS reloftype, "
@@ -6729,7 +6730,7 @@ getTables(Archive *fout, int *numTables)
 						  "tc.relfrozenxid AS tfrozenxid, "
 						  "0 AS tminmxid, "
 						  "'p' AS relpersistence, 't' as relispopulated, "
-						  "'d' AS relreplident, c.relpages, "
+						  "'d' AS relreplident, 'u' AS relparalleldml, c.relpages, "
 						  "NULL AS amname, "
 						  "NULL AS foreignserver, "
 						  "NULL AS reloftype, "
@@ -6776,7 +6777,7 @@ getTables(Archive *fout, int *numTables)
 						  "0 AS toid, "
 						  "0 AS tfrozenxid, 0 AS tminmxid,"
 						  "'p' AS relpersistence, 't' as relispopulated, "
-						  "'d' AS relreplident, relpages, "
+						  "'d' AS relreplident, 'u' AS relparalleldml, relpages, "
 						  "NULL AS amname, "
 						  "NULL AS foreignserver, "
 						  "NULL AS reloftype, "
@@ -6845,6 +6846,7 @@ getTables(Archive *fout, int *numTables)
 	i_relpersistence = PQfnumber(res, "relpersistence");
 	i_relispopulated = PQfnumber(res, "relispopulated");
 	i_relreplident = PQfnumber(res, "relreplident");
+	i_relproparalleldml = PQfnumber(res, "relparalleldml");
 	i_relpages = PQfnumber(res, "relpages");
 	i_foreignserver = PQfnumber(res, "foreignserver");
 	i_owning_tab = PQfnumber(res, "owning_tab");
@@ -6900,6 +6902,7 @@ getTables(Archive *fout, int *numTables)
 		tblinfo[i].hasoids = (strcmp(PQgetvalue(res, i, i_relhasoids), "t") == 0);
 		tblinfo[i].relispopulated = (strcmp(PQgetvalue(res, i, i_relispopulated), "t") == 0);
 		tblinfo[i].relreplident = *(PQgetvalue(res, i, i_relreplident));
+		tblinfo[i].relparalleldml = *(PQgetvalue(res, i, i_relproparalleldml));
 		tblinfo[i].relpages = atoi(PQgetvalue(res, i, i_relpages));
 		tblinfo[i].frozenxid = atooid(PQgetvalue(res, i, i_relfrozenxid));
 		tblinfo[i].minmxid = atooid(PQgetvalue(res, i, i_relminmxid));
@@ -16423,6 +16426,32 @@ dumpTableSchema(Archive *fout, const TableInfo *tbinfo)
 		}
 	}
 
+	if (tbinfo->relkind == RELKIND_RELATION ||
+		tbinfo->relkind == RELKIND_PARTITIONED_TABLE ||
+		tbinfo->relkind == RELKIND_FOREIGN_TABLE)
+	{
+		appendPQExpBuffer(q, "\nALTER %sTABLE %s PARALLEL DML ",
+						tbinfo->relkind == RELKIND_FOREIGN_TABLE ? "FOREIGN " : "",
+						qualrelname);
+
+		switch (tbinfo->relparalleldml)
+		{
+			case 's':
+				appendPQExpBuffer(q, "SAFE;\n");
+				break;
+			case 'r':
+				appendPQExpBuffer(q, "RESTRICTED;\n");
+				break;
+			case 'u':
+				appendPQExpBuffer(q, "UNSAFE;\n");
+				break;
+			default:
+				/* should not reach here */
+				appendPQExpBuffer(q, "UNSAFE;\n");
+				break;
+		}
+	}
+
 	if (tbinfo->forcerowsec)
 		appendPQExpBuffer(q, "\nALTER TABLE ONLY %s FORCE ROW LEVEL SECURITY;\n",
 						  qualrelname);
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index 49e1b0a..e083593 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -268,6 +268,7 @@ typedef struct _tableInfo
 	char		relpersistence; /* relation persistence */
 	bool		relispopulated; /* relation is populated */
 	char		relreplident;	/* replica identifier */
+	char		relparalleldml; /* parallel safety of dml on the relation */
 	char	   *reltablespace;	/* relation tablespace */
 	char	   *reloptions;		/* options specified by WITH (...) */
 	char	   *checkoption;	/* WITH CHECK OPTION, if any */
diff --git a/src/include/catalog/heap.h b/src/include/catalog/heap.h
index 6ce480b..b599759 100644
--- a/src/include/catalog/heap.h
+++ b/src/include/catalog/heap.h
@@ -55,6 +55,7 @@ extern Relation heap_create(const char *relname,
 							TupleDesc tupDesc,
 							char relkind,
 							char relpersistence,
+							char relparalleldml,
 							bool shared_relation,
 							bool mapped_relation,
 							bool allow_system_table_mods,
@@ -73,6 +74,7 @@ extern Oid	heap_create_with_catalog(const char *relname,
 									 List *cooked_constraints,
 									 char relkind,
 									 char relpersistence,
+									 char relparalleldml,
 									 bool shared_relation,
 									 bool mapped_relation,
 									 OnCommitAction oncommit,
diff --git a/src/include/catalog/pg_class.h b/src/include/catalog/pg_class.h
index 3e37729..af280b5 100644
--- a/src/include/catalog/pg_class.h
+++ b/src/include/catalog/pg_class.h
@@ -116,6 +116,9 @@ CATALOG(pg_class,1259,RelationRelationId) BKI_BOOTSTRAP BKI_ROWTYPE_OID(83,Relat
 	/* see REPLICA_IDENTITY_xxx constants */
 	char		relreplident BKI_DEFAULT(n);
 
+	/* parallel safety of the dml on the relation */
+	char		relparalleldml BKI_DEFAULT(u);
+
 	/* is relation a partition? */
 	bool		relispartition BKI_DEFAULT(f);
 
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index ef73342..dcdf6db 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -1933,7 +1933,8 @@ typedef enum AlterTableType
 	AT_AddIdentity,				/* ADD IDENTITY */
 	AT_SetIdentity,				/* SET identity column options */
 	AT_DropIdentity,			/* DROP IDENTITY */
-	AT_ReAddStatistics			/* internal to commands/tablecmds.c */
+	AT_ReAddStatistics,			/* internal to commands/tablecmds.c */
+	AT_ParallelDMLSafety		/* PARALLEL DML SAFE/RESTRICTED/UNSAFE */
 } AlterTableType;
 
 typedef struct ReplicaIdentityStmt
@@ -2168,6 +2169,7 @@ typedef struct CreateStmt
 	OnCommitAction oncommit;	/* what do we do at COMMIT? */
 	char	   *tablespacename; /* table space to use, or NULL */
 	char	   *accessMethod;	/* table access method */
+	char	   *paralleldmlsafety; /* parallel dml safety */
 	bool		if_not_exists;	/* just do nothing if it already exists? */
 } CreateStmt;
 
diff --git a/src/include/nodes/primnodes.h b/src/include/nodes/primnodes.h
index 9ae851d..6b532b0 100644
--- a/src/include/nodes/primnodes.h
+++ b/src/include/nodes/primnodes.h
@@ -115,6 +115,7 @@ typedef struct IntoClause
 	List	   *options;		/* options from WITH clause */
 	OnCommitAction onCommit;	/* what do we do at COMMIT? */
 	char	   *tableSpaceName; /* table space to use, or NULL */
+	char	   *paralleldmlsafety; /* parallel dml safety */
 	Node	   *viewQuery;		/* materialized view's SELECT query */
 	bool		skipData;		/* true for WITH NO DATA */
 } IntoClause;
diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h
index f836acf..05222fa 100644
--- a/src/include/parser/kwlist.h
+++ b/src/include/parser/kwlist.h
@@ -139,6 +139,7 @@ PG_KEYWORD("dictionary", DICTIONARY, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("disable", DISABLE_P, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("discard", DISCARD, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("distinct", DISTINCT, RESERVED_KEYWORD, BARE_LABEL)
+PG_KEYWORD("dml", DML, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("do", DO, RESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("document", DOCUMENT_P, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("domain", DOMAIN_P, UNRESERVED_KEYWORD, BARE_LABEL)
diff --git a/src/include/utils/relcache.h b/src/include/utils/relcache.h
index f772855..5ea225a 100644
--- a/src/include/utils/relcache.h
+++ b/src/include/utils/relcache.h
@@ -108,7 +108,8 @@ extern Relation RelationBuildLocalRelation(const char *relname,
 										   bool shared_relation,
 										   bool mapped_relation,
 										   char relpersistence,
-										   char relkind);
+										   char relkind,
+										   char relparalleldml);
 
 /*
  * Routines to manage assignment of new relfilenode to a relation
diff --git a/src/test/modules/test_ddl_deparse/test_ddl_deparse.c b/src/test/modules/test_ddl_deparse/test_ddl_deparse.c
index 1bae1e5..e1f5678 100644
--- a/src/test/modules/test_ddl_deparse/test_ddl_deparse.c
+++ b/src/test/modules/test_ddl_deparse/test_ddl_deparse.c
@@ -276,6 +276,9 @@ get_altertable_subcmdtypes(PG_FUNCTION_ARGS)
 			case AT_NoForceRowSecurity:
 				strtype = "NO FORCE ROW SECURITY";
 				break;
+			case AT_ParallelDMLSafety:
+				strtype = "PARALLEL DML SAFETY";
+				break;
 			case AT_GenericOptions:
 				strtype = "SET OPTIONS";
 				break;
-- 
2.7.2.windows.1
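
For readers following the pg_dump part of the patch above: the new pg_class.relparalleldml column holds a one-letter code, and dumpTableSchema() turns it back into a keyword for an ALTER [FOREIGN] TABLE ... PARALLEL DML statement. The following is only a standalone sketch of that mapping, not code from the patch. The function names parallel_dml_keyword and format_parallel_dml_stmt are hypothetical, and plain snprintf stands in for appendPQExpBuffer.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/*
 * Map the one-letter relparalleldml code to its keyword.
 * Mirrors the switch in the patch: unknown codes fall back to UNSAFE.
 */
static const char *
parallel_dml_keyword(char relparalleldml)
{
	switch (relparalleldml)
	{
		case 's':
			return "SAFE";
		case 'r':
			return "RESTRICTED";
		case 'u':
		default:
			return "UNSAFE";
	}
}

/*
 * Hypothetical helper: format the statement into a caller-supplied buffer.
 * Returns what snprintf returns (the length that was or would be written).
 */
static int
format_parallel_dml_stmt(char *buf, size_t buflen, const char *qualrelname,
						 int is_foreign, char relparalleldml)
{
	assert(buf != NULL && qualrelname != NULL);

	return snprintf(buf, buflen, "ALTER %sTABLE %s PARALLEL DML %s;",
					is_foreign ? "FOREIGN " : "", qualrelname,
					parallel_dml_keyword(relparalleldml));
}
```

Note that the queries for older server versions select 'u' AS relparalleldml, so a dump taken from a pre-patch server would always emit UNSAFE under this mapping.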


["v6-0002-parallel-SELECT-for-INSERT.patch" (application/octet-stream)]

From 49f57ad7508e2ee33fade8a1a7358b7c4ec09eaa Mon Sep 17 00:00:00 2001
From: houzj <houzj.fnst@fujitsu.com>
Date: Mon, 31 May 2021 09:32:54 +0800
Subject: [PATCH 2/3] parallel-SELECT-for-INSERT

Enable parallel select for insert.
Prepare for entering parallel mode by assigning a TransactionId.

---
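The ordering requirement described in the commit message can be illustrated with a toy model. This is a sketch under simplifying assumptions, not backend code: ToyXactState and the toy_* functions are hypothetical stand-ins for the real transaction state, PrepareParallelModePlanExec(), EnterParallelMode() and heap_insert(). The only rule modelled is that a TransactionId cannot be newly assigned once parallel mode has been entered.

```c
#include <assert.h>
#include <stdbool.h>

typedef enum
{
	TOY_CMD_SELECT,
	TOY_CMD_INSERT,
	TOY_CMD_UPDATE,
	TOY_CMD_DELETE
} ToyCmdType;

typedef struct
{
	bool		in_parallel_mode;
	bool		xid_assigned;
} ToyXactState;

/* Same test as IsModifySupportedInParallelMode(): only INSERT for now. */
static bool
toy_is_modify_supported(ToyCmdType cmd)
{
	return cmd == TOY_CMD_INSERT;
}

/* Take the XID while still in serial mode, as the patch does. */
static void
toy_prepare_parallel_mode_plan_exec(ToyXactState *xact, ToyCmdType cmd)
{
	if (toy_is_modify_supported(cmd))
	{
		assert(!xact->in_parallel_mode);
		xact->xid_assigned = true;
	}
}

static void
toy_enter_parallel_mode(ToyXactState *xact)
{
	xact->in_parallel_mode = true;
}

/*
 * An insert needs an XID.  Returns false for the case the patch avoids:
 * having to assign one after parallel mode has already been entered.
 */
static bool
toy_heap_insert(ToyXactState *xact)
{
	if (!xact->xid_assigned)
	{
		if (xact->in_parallel_mode)
			return false;
		xact->xid_assigned = true;
	}
	return true;
}
```

With the prepare step in place the insert succeeds in parallel mode; without it the same insert hits the failure path. A SELECT leaves the state untouched, so read-only parallel queries do not consume an XID.
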
 src/backend/access/transam/xact.c    | 26 +++++++++++
 src/backend/executor/execMain.c      |  3 ++
 src/backend/optimizer/plan/planner.c | 21 ++++-----
 src/backend/optimizer/util/clauses.c | 87 +++++++++++++++++++++++++++++++++++-
 src/include/access/xact.h            | 15 +++++++
 src/include/optimizer/clauses.h      |  2 +
 6 files changed, 143 insertions(+), 11 deletions(-)

diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 4414459..2d68e46 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -1015,6 +1015,32 @@ IsInParallelMode(void)
 }
 
 /*
+ *	PrepareParallelModePlanExec
+ *
+ * Prepare for entering parallel mode plan execution, based on command-type.
+ */
+void
+PrepareParallelModePlanExec(CmdType commandType)
+{
+	if (IsModifySupportedInParallelMode(commandType))
+	{
+		Assert(!IsInParallelMode());
+
+		/*
+		 * Prepare for entering parallel mode by assigning a TransactionId.
+		 * Failure to do this now would result in heap_insert() subsequently
+		 * attempting to assign a TransactionId whilst in parallel-mode, which
+		 * is not allowed.
+		 *
+		 * The disadvantage of this approach is that if the underlying SELECT
+		 * returns no rows, the TransactionId is assigned but never used;
+		 * however, that should be uncommon in practice.
+		 */
+		(void) GetCurrentTransactionId();
+	}
+}
+
+/*
  *	CommandCounterIncrement
  */
 void
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index b3ce4ba..ea685f0 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -1535,7 +1535,10 @@ ExecutePlan(EState *estate,
 
 	estate->es_use_parallel_mode = use_parallel_mode;
 	if (use_parallel_mode)
+	{
+		PrepareParallelModePlanExec(estate->es_plannedstmt->commandType);
 		EnterParallelMode();
+	}
 
 	/*
 	 * Loop until we've processed the proper number of tuples from the plan.
diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c
index 1868c4e..7736813 100644
--- a/src/backend/optimizer/plan/planner.c
+++ b/src/backend/optimizer/plan/planner.c
@@ -314,16 +314,16 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions,
 	/*
 	 * Assess whether it's feasible to use parallel mode for this query. We
 	 * can't do this in a standalone backend, or if the command will try to
-	 * modify any data, or if this is a cursor operation, or if GUCs are set
-	 * to values that don't permit parallelism, or if parallel-unsafe
-	 * functions are present in the query tree.
+	 * modify any data (except for Insert), or if this is a cursor operation,
+	 * or if GUCs are set to values that don't permit parallelism, or if
+	 * parallel-unsafe functions are present in the query tree.
 	 *
-	 * (Note that we do allow CREATE TABLE AS, SELECT INTO, and CREATE
-	 * MATERIALIZED VIEW to use parallel plans, but as of now, only the leader
-	 * backend writes into a completely new table.  In the future, we can
-	 * extend it to allow workers to write into the table.  However, to allow
-	 * parallel updates and deletes, we have to solve other problems,
-	 * especially around combo CIDs.)
+	 * (Note that we do allow CREATE TABLE AS, INSERT INTO...SELECT, SELECT
+	 * INTO, and CREATE MATERIALIZED VIEW to use parallel plans. However, as
+	 * of now, only the leader backend writes into a completely new table. In
+	 * the future, we can extend it to allow workers to write into the table.
+	 * However, to allow parallel updates and deletes, we have to solve other
+	 * problems, especially around combo CIDs.)
 	 *
 	 * For now, we don't try to use parallel mode if we're running inside a
 	 * parallel worker.  We might eventually be able to relax this
@@ -332,7 +332,8 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions,
 	 */
 	if ((cursorOptions & CURSOR_OPT_PARALLEL_OK) != 0 &&
 		IsUnderPostmaster &&
-		parse->commandType == CMD_SELECT &&
+		(parse->commandType == CMD_SELECT ||
+		is_parallel_allowed_for_modify(parse)) &&
 		!parse->hasModifyingCTE &&
 		max_parallel_workers_per_gather > 0 &&
 		!IsParallelWorker())
diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c
index 517712a..7c58c88 100644
--- a/src/backend/optimizer/util/clauses.c
+++ b/src/backend/optimizer/util/clauses.c
@@ -20,6 +20,8 @@
 #include "postgres.h"
 
 #include "access/htup_details.h"
+#include "access/table.h"
+#include "access/xact.h"
 #include "catalog/pg_aggregate.h"
 #include "catalog/pg_class.h"
 #include "catalog/pg_language.h"
@@ -43,6 +45,7 @@
 #include "parser/parse_agg.h"
 #include "parser/parse_coerce.h"
 #include "parser/parse_func.h"
+#include "parser/parsetree.h"
 #include "rewrite/rewriteManip.h"
 #include "tcop/tcopprot.h"
 #include "utils/acl.h"
@@ -51,6 +54,7 @@
 #include "utils/fmgroids.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
+#include "utils/rel.h"
 #include "utils/syscache.h"
 #include "utils/typcache.h"
 
@@ -148,6 +152,7 @@ static Query *substitute_actual_srf_parameters(Query *expr,
 											   int nargs, List *args);
 static Node *substitute_actual_srf_parameters_mutator(Node *node,
 													  substitute_actual_srf_parameters_context *context);
+static bool max_parallel_hazard_test(char proparallel, max_parallel_hazard_context *context);
 
 
 /*****************************************************************************
@@ -615,12 +620,34 @@ contain_volatile_functions_not_nextval_walker(Node *node, void *context)
 char
 max_parallel_hazard(Query *parse)
 {
+	bool max_hazard_found;
 	max_parallel_hazard_context context;
 
 	context.max_hazard = PROPARALLEL_SAFE;
 	context.max_interesting = PROPARALLEL_UNSAFE;
 	context.safe_param_ids = NIL;
-	(void) max_parallel_hazard_walker((Node *) parse, &context);
+
+	max_hazard_found = max_parallel_hazard_walker((Node *) parse, &context);
+
+	if (!max_hazard_found &&
+		IsModifySupportedInParallelMode(parse->commandType))
+	{
+		RangeTblEntry *rte;
+		Relation target_rel;
+
+		rte = rt_fetch(parse->resultRelation, parse->rtable);
+
+		/*
+		 * The target table is already locked by the caller (this is done in the
+		 * parse/analyze phase), and remains locked until end-of-transaction.
+		 */
+		target_rel = table_open(rte->relid, NoLock);
+
+		(void) max_parallel_hazard_test(target_rel->rd_rel->relparalleldml,
+										&context);
+		table_close(target_rel, NoLock);
+	}
+
 	return context.max_hazard;
 }
 
@@ -854,6 +881,64 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context)
 								  context);
 }
 
+/*
+ * is_parallel_allowed_for_modify
+ *
+ * Check at a high level whether parallel mode can be used for the specified
+ * table-modification statement. Currently, only INSERT is supported.
+ *
+ * It's not possible in the following cases:
+ *
+ *  1) INSERT...ON CONFLICT...DO UPDATE
+ *  2) INSERT without SELECT
+ *
+ * (Note: we don't do in-depth parallel-safety checks here, we do only the
+ * cheaper tests that can quickly exclude obvious cases for which
+ * parallelism isn't supported, to avoid having to do further parallel-safety
+ * checks for these)
+ */
+bool
+is_parallel_allowed_for_modify(Query *parse)
+{
+	bool		hasSubQuery;
+	RangeTblEntry *rte;
+	ListCell   *lc;
+
+	if (!IsModifySupportedInParallelMode(parse->commandType))
+		return false;
+
+	/*
+	 * UPDATE is not currently supported in parallel-mode, so prohibit
+	 * INSERT...ON CONFLICT...DO UPDATE...
+	 *
+	 * In order to support update, even if only in the leader, some further
+	 * work would need to be done. A mechanism would be needed for sharing
+	 * combo-cids between leader and workers during parallel-mode, since for
+	 * example, the leader might generate a combo-cid and it needs to be
+	 * propagated to the workers.
+	 */
+	if (parse->commandType == CMD_INSERT &&
+		parse->onConflict != NULL &&
+		parse->onConflict->action == ONCONFLICT_UPDATE)
+		return false;
+
+	/*
+	 * If there is no underlying SELECT, a parallel insert operation is not
+	 * desirable.
+	 */
+	hasSubQuery = false;
+	foreach(lc, parse->rtable)
+	{
+		rte = lfirst_node(RangeTblEntry, lc);
+		if (rte->rtekind == RTE_SUBQUERY)
+		{
+			hasSubQuery = true;
+			break;
+		}
+	}
+
+	return hasSubQuery;
+}
 
 /*****************************************************************************
  *		Check clauses for nonstrict functions
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index 134f686..fd3f86b 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -466,5 +466,20 @@ extern void ParsePrepareRecord(uint8 info, xl_xact_prepare *xlrec, xl_xact_parse
 extern void EnterParallelMode(void);
 extern void ExitParallelMode(void);
 extern bool IsInParallelMode(void);
+extern void PrepareParallelModePlanExec(CmdType commandType);
+
+/*
+ * IsModifySupportedInParallelMode
+ *
+ * Indicates whether execution of the specified table-modification command
+ * (INSERT/UPDATE/DELETE) in parallel-mode is supported, subject to certain
+ * parallel-safety conditions.
+ */
+static inline bool
+IsModifySupportedInParallelMode(CmdType commandType)
+{
+	/* Currently only INSERT is supported */
+	return (commandType == CMD_INSERT);
+}
 
 #endif							/* XACT_H */
diff --git a/src/include/optimizer/clauses.h b/src/include/optimizer/clauses.h
index 0673887..32b5656 100644
--- a/src/include/optimizer/clauses.h
+++ b/src/include/optimizer/clauses.h
@@ -53,4 +53,6 @@ extern void CommuteOpExpr(OpExpr *clause);
 extern Query *inline_set_returning_function(PlannerInfo *root,
 											RangeTblEntry *rte);
 
+extern bool is_parallel_allowed_for_modify(Query *parse);
+
 #endif							/* CLAUSES_H */
-- 
2.7.2.windows.1
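
The cheap pre-check in is_parallel_allowed_for_modify() from the patch above comes down to three tests: the command must be an INSERT, it must not be INSERT ... ON CONFLICT ... DO UPDATE, and the range table must contain a subquery entry. Here is a standalone sketch of that decision, assuming a much-reduced query representation. ToyQuery and its fields are hypothetical and stand in for Query, onConflict->action and rtable.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef enum
{
	TOY_CMD_SELECT,
	TOY_CMD_INSERT,
	TOY_CMD_UPDATE
} ToyCmdType;

typedef enum
{
	TOY_RTE_RELATION,
	TOY_RTE_SUBQUERY
} ToyRTEKind;

typedef struct
{
	ToyCmdType	commandType;
	bool		on_conflict_update; /* INSERT ... ON CONFLICT ... DO UPDATE */
	const ToyRTEKind *rtable;		/* kinds of the range table entries */
	size_t		nrtable;
} ToyQuery;

static bool
toy_is_parallel_allowed_for_modify(const ToyQuery *query)
{
	assert(query != NULL);

	/* Only INSERT is a candidate at present. */
	if (query->commandType != TOY_CMD_INSERT)
		return false;

	/* The DO UPDATE arm would need UPDATE support in parallel mode. */
	if (query->on_conflict_update)
		return false;

	/* Without an underlying SELECT there is nothing to parallelize. */
	for (size_t i = 0; i < query->nrtable; i++)
	{
		if (query->rtable[i] == TOY_RTE_SUBQUERY)
			return true;
	}
	return false;
}
```

As in the patch, a true result here only means that the more expensive parallel-safety checks are worth running; it does not by itself make the statement parallel-safe.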


["v6-0003-get-parallel-safety-functions.patch" (application/octet-stream)]

From f3a384416cb832a14c22cf49d493d59c7e003d56 Mon Sep 17 00:00:00 2001
From: houzj <houzj.fnst@fujitsu.com>
Date: Mon, 31 May 2021 09:38:28 +0800
Subject: [PATCH 3/3] get-parallel-safety-functions

Provide a utility function "pg_get_parallel_safety(regclass)" that
returns records of (objid, classid, parallel_safety) for all
parallel unsafe/restricted table-related objects from which the
table's parallel DML safety is determined. Users can use this
information during development to declare a table's parallel DML
safety accurately, or to identify any problematic objects if a
parallel DML operation fails or behaves unexpectedly.

When the use of an index-related parallel unsafe/restricted function
is detected, both the function oid and the index oid are returned.

Provide a utility function "pg_get_max_parallel_hazard(regclass)" that
returns the worst parallel DML safety hazard that can be found in the
given relation. Users can use this function to do a quick check without
caring about specific parallel-related objects.

---
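The relationship between the two functions described above can be sketched as a fold: the value reported by pg_get_max_parallel_hazard() should correspond to the worst parallel_safety among the table-related objects, ordered safe < restricted < unsafe. The following standalone sketch makes that ordering explicit. ToySafetyObject, hazard_rank and toy_max_parallel_hazard are hypothetical names, and the real functions walk the catalogs rather than an array.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical analogue of an (objid, classid, parallel_safety) record. */
typedef struct
{
	unsigned int objid;
	unsigned int classid;
	char		proparallel;	/* 's', 'r' or 'u' */
} ToySafetyObject;

/* Rank the codes so that a larger value means a worse hazard. */
static int
hazard_rank(char proparallel)
{
	switch (proparallel)
	{
		case 's':
			return 0;
		case 'r':
			return 1;
		default:
			return 2;			/* 'u', and anything unrecognized */
	}
}

/* Return the worst hazard in the array; an empty array is safe. */
static char
toy_max_parallel_hazard(const ToySafetyObject *objs, size_t nobjs)
{
	static const char by_rank[] = {'s', 'r', 'u'};
	int			worst = 0;

	assert(objs != NULL || nobjs == 0);

	for (size_t i = 0; i < nobjs; i++)
	{
		int			rank = hazard_rank(objs[i].proparallel);

		if (rank > worst)
			worst = rank;
		if (worst == 2)
			break;				/* nothing is worse than unsafe */
	}
	return by_rank[worst];
}
```

The early exit reflects the "quick check" use case: once an unsafe object is seen, the remaining objects cannot change the answer.
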
 src/backend/optimizer/util/clauses.c | 568 ++++++++++++++++++++++++++++++++++-
 src/backend/utils/adt/misc.c         |  94 ++++++
 src/backend/utils/cache/typcache.c   |  15 +
 src/include/catalog/pg_proc.dat      |  22 +-
 src/include/optimizer/clauses.h      |  10 +
 src/include/utils/typcache.h         |   2 +
 6 files changed, 706 insertions(+), 5 deletions(-)

diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c
index 7c58c88..ecd1597 100644
--- a/src/backend/optimizer/util/clauses.c
+++ b/src/backend/optimizer/util/clauses.c
@@ -19,15 +19,20 @@
 
 #include "postgres.h"
 
+#include "access/amapi.h"
+#include "access/genam.h"
 #include "access/htup_details.h"
 #include "access/table.h"
 #include "access/xact.h"
 #include "catalog/pg_aggregate.h"
 #include "catalog/pg_class.h"
+#include "catalog/pg_constraint.h"
 #include "catalog/pg_language.h"
 #include "catalog/pg_operator.h"
 #include "catalog/pg_proc.h"
+#include "catalog/pg_trigger.h"
 #include "catalog/pg_type.h"
+#include "commands/trigger.h"
 #include "executor/executor.h"
 #include "executor/functions.h"
 #include "funcapi.h"
@@ -46,6 +51,8 @@
 #include "parser/parse_coerce.h"
 #include "parser/parse_func.h"
 #include "parser/parsetree.h"
+#include "partitioning/partdesc.h"
+#include "rewrite/rewriteHandler.h"
 #include "rewrite/rewriteManip.h"
 #include "tcop/tcopprot.h"
 #include "utils/acl.h"
@@ -54,6 +61,7 @@
 #include "utils/fmgroids.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
+#include "utils/partcache.h"
 #include "utils/rel.h"
 #include "utils/syscache.h"
 #include "utils/typcache.h"
@@ -92,6 +100,9 @@ typedef struct
 	char		max_hazard;		/* worst proparallel hazard found so far */
 	char		max_interesting;	/* worst proparallel hazard of interest */
 	List	   *safe_param_ids; /* PARAM_EXEC Param IDs to treat as safe */
+	bool		check_all;
+	List	   *func_oids;
+	PartitionDirectory partition_directory;
 } max_parallel_hazard_context;
 
 static bool contain_agg_clause_walker(Node *node, void *context);
@@ -102,6 +113,24 @@ static bool contain_volatile_functions_walker(Node *node, void *context);
 static bool contain_volatile_functions_not_nextval_walker(Node *node, void *context);
 static bool max_parallel_hazard_walker(Node *node,
 									   max_parallel_hazard_context *context);
+static bool target_rel_all_parallel_hazard_recurse(Relation relation,
+												   max_parallel_hazard_context *context);
+static bool target_rel_trigger_max_parallel_hazard(Relation rel,
+												   max_parallel_hazard_context *context);
+static bool index_expr_max_parallel_hazard(Relation index_rel,
+										   List *ii_Expressions,
+										   List *ii_Predicate,
+										   bool check_all,
+										   char max_interesting,
+										   max_parallel_hazard_context *context);
+static bool target_rel_index_max_parallel_hazard(Relation rel,
+												 max_parallel_hazard_context *context);
+static bool target_rel_domain_max_parallel_hazard(Oid typid,
+												  max_parallel_hazard_context *context);
+static bool target_rel_partitions_max_parallel_hazard(Relation rel,
+													  max_parallel_hazard_context *context);
+static List *target_rel_chk_constr_max_parallel_hazard(Relation rel,
+													   max_parallel_hazard_context *context);
 static bool contain_nonstrict_functions_walker(Node *node, void *context);
 static bool contain_exec_param_walker(Node *node, List *param_ids);
 static bool contain_context_dependent_node(Node *clause);
@@ -153,6 +182,7 @@ static Query *substitute_actual_srf_parameters(Query *expr,
 static Node *substitute_actual_srf_parameters_mutator(Node *node,
 													  substitute_actual_srf_parameters_context *context);
 static bool max_parallel_hazard_test(char proparallel, max_parallel_hazard_context *context);
+static safety_object *make_safety_object(Oid objid, Oid classid, char proparallel);
 
 
 /*****************************************************************************
@@ -626,6 +656,9 @@ max_parallel_hazard(Query *parse)
 	context.max_hazard = PROPARALLEL_SAFE;
 	context.max_interesting = PROPARALLEL_UNSAFE;
 	context.safe_param_ids = NIL;
+	context.check_all = false;
+	context.func_oids = NIL;
+	context.partition_directory = NULL;
 
 	max_hazard_found = max_parallel_hazard_walker((Node *) parse, &context);
 
@@ -678,6 +711,9 @@ is_parallel_safe(PlannerInfo *root, Node *node)
 	context.max_hazard = PROPARALLEL_SAFE;
 	context.max_interesting = PROPARALLEL_RESTRICTED;
 	context.safe_param_ids = NIL;
+	context.check_all = false;
+	context.func_oids = NIL;
+	context.partition_directory = NULL;
 
 	/*
 	 * The params that refer to the same or parent query level are considered
@@ -709,7 +745,7 @@ max_parallel_hazard_test(char proparallel, max_parallel_hazard_context *context)
 			break;
 		case PROPARALLEL_RESTRICTED:
 			/* increase max_hazard to RESTRICTED */
-			Assert(context->max_hazard != PROPARALLEL_UNSAFE);
+			Assert(context->check_all || context->max_hazard != PROPARALLEL_UNSAFE);
 			context->max_hazard = proparallel;
 			/* done if we are not expecting any unsafe functions */
 			if (context->max_interesting == proparallel)
@@ -726,6 +762,64 @@ max_parallel_hazard_test(char proparallel, max_parallel_hazard_context *context)
 	return false;
 }
 
+
+static safety_object *
+make_safety_object(Oid objid, Oid classid, char proparallel)
+{
+	safety_object *object = (safety_object *) palloc(sizeof(safety_object));
+
+	object->objid = objid;
+	object->classid = classid;
+	object->proparallel = proparallel;
+
+	return object;
+}
+
+static bool
+parallel_safety_checker(Oid func_id, void *context)
+{
+	char proparallel;
+	max_parallel_hazard_context *cont = (max_parallel_hazard_context *) context;
+
+	proparallel = func_parallel(func_id);
+	if (max_parallel_hazard_test(proparallel, cont) && !cont->check_all)
+		return true;
+	else if (proparallel != PROPARALLEL_SAFE)
+	{
+		cont->func_oids = lappend(cont->func_oids,
+			make_safety_object(func_id, ProcedureRelationId, proparallel));
+	}
+
+	return false;
+}
+
+/* Check parallel unsafe/restricted function in expression */
+static bool
+parallel_safety_walker(Node *node, max_parallel_hazard_context *context)
+{
+	if (node == NULL)
+		return false;
+
+	/* Check for hazardous functions in node itself */
+	if (check_functions_in_node(node, parallel_safety_checker,
+								context))
+		return true;
+
+	if (IsA(node, CoerceToDomain))
+	{
+		CoerceToDomain *domain = (CoerceToDomain *) node;
+
+		if (target_rel_domain_max_parallel_hazard(domain->resulttype, context) &&
+		   !context->check_all)
+			return true;
+	}
+
+	/* Recurse to check arguments */
+	return expression_tree_walker(node,
+								  parallel_safety_walker,
+								  context);
+}
+
 /* check_functions_in_node callback */
 static bool
 max_parallel_hazard_checker(Oid func_id, void *context)
@@ -881,6 +975,478 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context)
 								  context);
 }
 
+List*
+target_rel_max_parallel_hazard(Oid relOid, bool findall,
+							   char max_interesting, char *max_hazard)
+{
+	max_parallel_hazard_context context;
+	Relation	targetRel;
+
+	context.check_all = findall;
+	context.func_oids = NIL;
+	context.max_hazard = PROPARALLEL_SAFE;
+	context.max_interesting = max_interesting;
+	context.safe_param_ids = NIL;
+	context.partition_directory = NULL;
+
+	targetRel = table_open(relOid, AccessShareLock);
+
+	(void) target_rel_all_parallel_hazard_recurse(targetRel, &context);
+	if (context.partition_directory)
+		DestroyPartitionDirectory(context.partition_directory);
+
+	table_close(targetRel, AccessShareLock);
+
+	*max_hazard = context.max_hazard;
+
+	return context.func_oids;
+}
+
+
+static bool
+target_rel_all_parallel_hazard_recurse(Relation rel, max_parallel_hazard_context *context)
+{
+	TupleDesc	tupdesc;
+	int			attnum;
+
+	/*
+	 * We can't support table modification in a parallel worker if it's a
+	 * foreign table/partition (no FDW API for supporting parallel access) or
+	 * a temporary table.
+	 */
+	if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE ||
+		RelationUsesLocalBuffers(rel))
+	{
+		if (max_parallel_hazard_test(PROPARALLEL_RESTRICTED, context) &&
+		   !context->check_all)
+			return true;
+		else
+			context->func_oids = lappend(context->func_oids,
+				make_safety_object(rel->rd_rel->oid, RelationRelationId,
+								   PROPARALLEL_RESTRICTED));
+	}
+
+	/*
+	 * If a partitioned table, check that each partition is safe for
+	 * modification in parallel-mode.
+	 */
+	if (target_rel_partitions_max_parallel_hazard(rel, context))
+		return true;
+
+	/*
+	 * If there are any index expressions or index predicate, check that they
+	 * are parallel-mode safe.
+	 */
+	if (target_rel_index_max_parallel_hazard(rel, context))
+		return true;
+
+	/*
+	 * If any triggers exist, check that they are parallel-safe.
+	 */
+	if (target_rel_trigger_max_parallel_hazard(rel, context))
+		return true;
+
+	/*
+	 * Column default expressions are only applicable to INSERT and UPDATE.
+	 * Note that even though column defaults may be specified separately for
+	 * each partition in a partitioned table, a partition's default value is
+	 * not applied when inserting a tuple through a partitioned table.
+	 */
+
+	tupdesc = RelationGetDescr(rel);
+	for (attnum = 0; attnum < tupdesc->natts; attnum++)
+	{
+		Form_pg_attribute att = TupleDescAttr(tupdesc, attnum);
+
+		/* We don't need info for dropped or generated attributes */
+		if (att->attisdropped || att->attgenerated)
+			continue;
+
+		if (att->atthasdef)
+		{
+			Node	   *defaultexpr;
+			defaultexpr = build_column_default(rel, attnum);
+			if (parallel_safety_walker((Node *) defaultexpr, context))
+				return true;
+		}
+
+		/*
+		 * If the column is of a DOMAIN type, determine whether that
+		 * domain has any CHECK expressions that are not parallel-mode
+		 * safe.
+		 */
+		if (get_typtype(att->atttypid) == TYPTYPE_DOMAIN)
+		{
+			if (target_rel_domain_max_parallel_hazard(att->atttypid, context))
+				return true;
+		}
+	}
+
+	/*
+	 * CHECK constraints are only applicable to INSERT and UPDATE. If any
+	 * CHECK constraints exist, determine if they are parallel-safe.
+	 */
+	if (target_rel_chk_constr_max_parallel_hazard(rel, context))
+		return true;
+
+	return false;
+}
+
+/*
+ * target_rel_trigger_max_parallel_hazard
+ *
+ * Finds all the PARALLEL UNSAFE/RESTRICTED objects for the specified relation's
+ * trigger data.
+ */
+static bool
+target_rel_trigger_max_parallel_hazard(Relation rel,
+									   max_parallel_hazard_context *context)
+{
+	int		i;
+	char	proparallel;
+
+	if (rel->trigdesc == NULL)
+		return false;
+
+	/*
+	 * Care is needed here to avoid using the same relcache TriggerDesc field
+	 * across other cache accesses, because relcache doesn't guarantee that it
+	 * won't move.
+	 */
+	for (i = 0; i < rel->trigdesc->numtriggers; i++)
+	{
+		Oid			tgfoid = rel->trigdesc->triggers[i].tgfoid;
+		Oid			tgoid = rel->trigdesc->triggers[i].tgoid;
+
+		proparallel = func_parallel(tgfoid);
+
+		if (max_parallel_hazard_test(proparallel, context) &&
+		   !context->check_all)
+			return true;
+		else if (proparallel != PROPARALLEL_SAFE)
+		{
+			context->func_oids = lappend(context->func_oids,
+				make_safety_object(tgfoid, ProcedureRelationId, proparallel));
+			context->func_oids = lappend(context->func_oids,
+				make_safety_object(tgoid, TriggerRelationId, proparallel));
+		}
+	}
+
+	return false;
+}
+
+static bool
+index_expr_max_parallel_hazard(Relation index_rel,
+						List *ii_Expressions, List *ii_Predicate,
+						bool check_all, char max_interesting,
+						max_parallel_hazard_context *context)
+{
+	int indnatts;
+	int nsupport;
+	Form_pg_index indexStruct;
+	int			i;
+	ListCell   *index_expr_item;
+
+	indexStruct = index_rel->rd_index;
+	index_expr_item = list_head(ii_Expressions);
+
+	if (ii_Expressions != NIL)
+	{
+		for (i = 0; i < indexStruct->indnatts; i++)
+		{
+			int			keycol = indexStruct->indkey.values[i];
+
+			if (keycol == 0)
+			{
+				/* Found an index expression */
+				Node	   *index_expr;
+
+				Assert(index_expr_item != NULL);
+				if (index_expr_item == NULL)	/* shouldn't happen */
+					elog(ERROR, "too few entries in indexprs list");
+
+				index_expr = (Node *) lfirst(index_expr_item);
+
+				if (parallel_safety_walker(index_expr, context))
+					return true;
+
+				index_expr_item = lnext(ii_Expressions, index_expr_item);
+			}
+		}
+	}
+
+	if (ii_Predicate != NIL)
+	{
+		if (parallel_safety_walker((Node *) ii_Predicate, context))
+			return true;
+	}
+
+	/*
+	 * Check parallel-safety of any index AM support functions.
+	 */
+	indnatts = IndexRelationGetNumberOfAttributes(index_rel);
+	nsupport = indnatts * index_rel->rd_indam->amsupport;
+	if (nsupport > 0)
+	{
+		for (i = 0; i < nsupport; i++)
+		{
+			char proparallel;
+
+			Oid funcOid = index_rel->rd_support[i];
+			if (!OidIsValid(funcOid))
+				continue;
+
+			proparallel = func_parallel(funcOid);
+			if (max_parallel_hazard_test(proparallel, context) &&
+			   !context->check_all)
+				return true;
+			else if (proparallel != PROPARALLEL_SAFE)
+			{
+				context->func_oids = lappend(context->func_oids,
+					make_safety_object(funcOid, ProcedureRelationId, proparallel));
+			}
+		}
+	}
+
+	return false;
+}
+
+/*
+ * target_rel_index_max_parallel_hazard
+ *
+ * Finds all the PARALLEL UNSAFE/RESTRICTED objects for any existing index
+ * expressions or index predicate of a specified relation.
+ */
+static bool
+target_rel_index_max_parallel_hazard(Relation rel, max_parallel_hazard_context *context)
+{
+	List	   *index_oid_list;
+	ListCell   *lc;
+	LOCKMODE	lockmode = AccessShareLock;
+	bool		max_hazard_found;
+
+
+	index_oid_list = RelationGetIndexList(rel);
+	foreach(lc, index_oid_list)
+	{
+		Relation	index_rel;
+		List	   *ii_Expressions;
+		List	   *ii_Predicate;
+		List	   *temp_objects;
+		Oid			index_oid = lfirst_oid(lc);
+
+		temp_objects = context->func_oids;
+		context->func_oids = NIL;
+		context->max_hazard = PROPARALLEL_SAFE;
+
+		index_rel = index_open(index_oid, lockmode);
+
+		/* Check index expression */
+		ii_Expressions = RelationGetIndexExpressions(index_rel);
+		ii_Predicate = RelationGetIndexPredicate(index_rel);
+
+		max_hazard_found = index_expr_max_parallel_hazard(index_rel, ii_Expressions,
+										   ii_Predicate, context->check_all,
+										   context->max_interesting,
+										   context);
+
+		index_close(index_rel, lockmode);
+
+		if (max_hazard_found)
+			return true;
+
+		/* Add the index itself to the objects list */
+		else if (context->func_oids != NIL)
+		{
+			context->func_oids = lappend(context->func_oids,
+			make_safety_object(index_oid, IndexRelationId, context->max_hazard));
+		}
+
+		context->func_oids = list_concat(context->func_oids, temp_objects);
+		list_free(temp_objects);
+	}
+
+	list_free(index_oid_list);
+
+	return false;
+}
+
+/*
+ * target_rel_domain_max_parallel_hazard
+ *
+ * Finds all the PARALLEL UNSAFE/RESTRICTED objects for the specified DOMAIN type.
+ * Only CHECK expressions are examined for parallel-safety.
+ */
+static bool
+target_rel_domain_max_parallel_hazard(Oid typid, max_parallel_hazard_context *context)
+{
+	ListCell *lc;
+	List *domain_list;
+	List       *temp_objects;
+
+	domain_list = GetDomainConstraints(typid);
+
+	foreach(lc, domain_list)
+	{
+		DomainConstraintState *r = (DomainConstraintState *) lfirst(lc);
+
+		temp_objects = context->func_oids;
+		context->func_oids = NIL;
+		context->max_hazard = PROPARALLEL_SAFE;
+
+		if (parallel_safety_walker((Node *) r->check_expr, context) &&
+		   !context->check_all)
+			return true;
+
+		/* Add the Constraint itself to the objects list */
+		else if (context->func_oids != NIL)
+		{
+			context->func_oids = lappend(context->func_oids,
+				make_safety_object(get_domain_constraint_oid(typid, r->name, false),
+								   ConstraintRelationId,
+								   context->max_hazard));
+		}
+
+		context->func_oids = list_concat(context->func_oids, temp_objects);
+		list_free(temp_objects);
+	}
+
+	return false;
+
+}
+
+/*
+ * target_rel_partitions_max_parallel_hazard
+ *
+ * Finds all the PARALLEL UNSAFE/RESTRICTED objects for any partitions of a
+ * specified relation.
+ */
+static bool
+target_rel_partitions_max_parallel_hazard(Relation rel,
+										  max_parallel_hazard_context *context)
+{
+	int			i;
+	PartitionDesc pdesc;
+	PartitionKey pkey;
+	ListCell   *partexprs_item;
+	int			partnatts;
+	List	   *partexprs, *qual;
+
+	/* Check partition check expression */
+	qual = RelationGetPartitionQual(rel);
+	if (parallel_safety_walker((Node *) qual, context))
+		return true;
+
+	if (rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
+		return false;
+
+	pkey = RelationGetPartitionKey(rel);
+
+	partnatts = get_partition_natts(pkey);
+	partexprs = get_partition_exprs(pkey);
+
+	partexprs_item = list_head(partexprs);
+	for (i = 0; i < partnatts; i++)
+	{
+		Oid funcOid = pkey->partsupfunc[i].fn_oid;
+		if (OidIsValid(funcOid))
+		{
+			char proparallel = func_parallel(funcOid);
+
+			if (max_parallel_hazard_test(proparallel, context) &&
+			   !context->check_all)
+				return true;
+
+			else if (proparallel != PROPARALLEL_SAFE)
+				context->func_oids = lappend(context->func_oids,
+					make_safety_object(funcOid, ProcedureRelationId, proparallel));
+		}
+		/* Check parallel-safety of any expressions in the partition key */
+		if (get_partition_col_attnum(pkey, i) == 0)
+		{
+			Node	   *check_expr = (Node *) lfirst(partexprs_item);
+
+			if (parallel_safety_walker(check_expr, context))
+				return true;
+
+			partexprs_item = lnext(partexprs, partexprs_item);
+		}
+	}
+
+	/* Recursively check each partition ... */
+
+	/* Create the PartitionDirectory infrastructure if we didn't already */
+	if (context->partition_directory == NULL)
+		context->partition_directory =
+			CreatePartitionDirectory(CurrentMemoryContext, false);
+
+	pdesc = PartitionDirectoryLookup(context->partition_directory, rel);
+
+	for (i = 0; i < pdesc->nparts; i++)
+	{
+		Relation	part_rel;
+		bool		max_hazard_found;
+
+		part_rel = table_open(pdesc->oids[i], AccessShareLock);
+		max_hazard_found =  target_rel_all_parallel_hazard_recurse(part_rel, context);
+		table_close(part_rel, AccessShareLock);
+
+		if (max_hazard_found)
+			break;
+	}
+
+	return false;
+}
+
+/*
+ * target_rel_chk_constr_max_parallel_hazard
+ *
+ * Finds all the PARALLEL UNSAFE/RESTRICTED objects for any CHECK expressions or
+ * CHECK constraints related to the specified relation.
+ */
+static List*
+target_rel_chk_constr_max_parallel_hazard(Relation rel,
+										  max_parallel_hazard_context *context)
+{
+	TupleDesc	tupdesc;
+	List	   *temp_objects;
+
+	tupdesc = RelationGetDescr(rel);
+
+	/*
+	 * Determine if there are any CHECK constraints which are not
+	 * parallel-safe.
+	 */
+	if (tupdesc->constr != NULL && tupdesc->constr->num_check > 0)
+	{
+		int			i;
+
+		ConstrCheck *check = tupdesc->constr->check;
+
+		for (i = 0; i < tupdesc->constr->num_check; i++)
+		{
+			Expr	   *check_expr = stringToNode(check[i].ccbin);
+
+			temp_objects = context->func_oids;
+			context->func_oids = NIL;
+			context->max_hazard = PROPARALLEL_SAFE;
+
+			parallel_safety_walker((Node *) check_expr, context);
+
+			if (context->func_oids != NIL)
+			{
+				context->func_oids = lappend(context->func_oids,
+				make_safety_object(get_relation_constraint_oid(rel->rd_rel->oid, check[i].ccname, true), ConstraintRelationId, context->max_hazard));
+			}
+
+			context->func_oids = list_concat(context->func_oids, temp_objects);
+			list_free(temp_objects);
+		}
+	}
+
+	return false;
+}
+
 /*
  * is_parallel_allowed_for_modify
  *
diff --git a/src/backend/utils/adt/misc.c b/src/backend/utils/adt/misc.c
index 88faf4d..18eabcd 100644
--- a/src/backend/utils/adt/misc.c
+++ b/src/backend/utils/adt/misc.c
@@ -23,6 +23,8 @@
 #include "access/sysattr.h"
 #include "access/table.h"
 #include "catalog/catalog.h"
+#include "catalog/namespace.h"
+#include "catalog/pg_proc.h"
 #include "catalog/pg_tablespace.h"
 #include "catalog/pg_type.h"
 #include "catalog/system_fk_info.h"
@@ -31,6 +33,7 @@
 #include "common/keywords.h"
 #include "funcapi.h"
 #include "miscadmin.h"
+#include "optimizer/clauses.h"
 #include "parser/scansup.h"
 #include "pgstat.h"
 #include "postmaster/syslogger.h"
@@ -43,6 +46,7 @@
 #include "utils/lsyscache.h"
 #include "utils/ruleutils.h"
 #include "utils/timestamp.h"
+#include "utils/varlena.h"
 
 /*
  * Common subroutine for num_nulls() and num_nonnulls().
@@ -605,6 +609,96 @@ pg_collation_for(PG_FUNCTION_ARGS)
 	PG_RETURN_TEXT_P(cstring_to_text(generate_collation_name(collid)));
 }
 
+/*
+ * Find the worst parallel-hazard level in the given relation
+ *
+ * Returns the worst parallel hazard level (the earliest in this list:
+ * PROPARALLEL_UNSAFE, PROPARALLEL_RESTRICTED, PROPARALLEL_SAFE) that can
+ * be found in the given relation.
+ */
+Datum
+pg_get_max_parallel_hazard(PG_FUNCTION_ARGS)
+{
+	char		max_parallel_hazard;
+	Oid			relOid = PG_GETARG_OID(0);
+
+	(void) target_rel_max_parallel_hazard(relOid, false,
+										  PROPARALLEL_UNSAFE,
+										  &max_parallel_hazard);
+
+	PG_RETURN_CHAR(max_parallel_hazard);
+}
+
+/*
+ * Determine whether parallel modification can safely be executed on the target relation.
+ *
+ * Return all the PARALLEL RESTRICTED/UNSAFE objects.
+ */
+Datum
+pg_get_parallel_safety(PG_FUNCTION_ARGS)
+{
+#define PG_GET_PARALLEL_SAFETY_COLS	3
+	List	   *objects;
+	ListCell   *object;
+	TupleDesc	tupdesc;
+	Tuplestorestate *tupstore;
+	MemoryContext per_query_ctx;
+	MemoryContext oldcontext;
+	ReturnSetInfo *rsinfo;
+	char		max_parallel_hazard;
+	Oid			relOid = PG_GETARG_OID(0);
+
+	rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+
+	/* check to see if caller supports us returning a tuplestore */
+	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("set-valued function called in context that cannot accept a set")));
+
+	if (!(rsinfo->allowedModes & SFRM_Materialize))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("materialize mode required, but it is not allowed in this context")));
+
+	/* Build a tuple descriptor for our result type */
+	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+		elog(ERROR, "return type must be a row type");
+
+	per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
+	oldcontext = MemoryContextSwitchTo(per_query_ctx);
+
+	tupstore = tuplestore_begin_heap(true, false, work_mem);
+	rsinfo->returnMode = SFRM_Materialize;
+	rsinfo->setResult = tupstore;
+	rsinfo->setDesc = tupdesc;
+
+	MemoryContextSwitchTo(oldcontext);
+
+	objects = target_rel_max_parallel_hazard(relOid, true,
+											 PROPARALLEL_UNSAFE,
+											 &max_parallel_hazard);
+	foreach(object, objects)
+	{
+		Datum		values[PG_GET_PARALLEL_SAFETY_COLS];
+		bool		nulls[PG_GET_PARALLEL_SAFETY_COLS];
+		safety_object *sobject = (safety_object *) lfirst(object);
+
+		memset(nulls, 0, sizeof(nulls));
+
+		values[0] = sobject->objid;
+		values[1] = sobject->classid;
+		values[2] = sobject->proparallel;
+
+		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
+	}
+
+	/* clean up and return the tuplestore */
+	tuplestore_donestoring(tupstore);
+
+	return (Datum) 0;
+}
+
 
 /*
  * pg_relation_is_updatable - determine which update events the specified
diff --git a/src/backend/utils/cache/typcache.c b/src/backend/utils/cache/typcache.c
index de96e96..8c0a705 100644
--- a/src/backend/utils/cache/typcache.c
+++ b/src/backend/utils/cache/typcache.c
@@ -2518,6 +2518,21 @@ compare_values_of_enum(TypeCacheEntry *tcache, Oid arg1, Oid arg2)
 	return 0;
 }
 
+
+List *
+GetDomainConstraints(Oid type_id)
+{
+	TypeCacheEntry *typentry;
+	List		   *constraints = NIL;
+
+	typentry = lookup_type_cache(type_id, TYPECACHE_DOMAIN_CONSTR_INFO);
+
+	if (typentry->domainData != NULL)
+		constraints = typentry->domainData->constraints;
+
+	return constraints;
+}
+
 /*
  * Load (or re-load) the enumData member of the typcache entry.
  */
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index acbcae4..2f8c528 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -3766,6 +3766,20 @@
   provolatile => 's', prorettype => 'regclass', proargtypes => 'regclass',
   prosrc => 'pg_get_replica_identity_index' },
 
+{ oid => '6122',
+  descr => 'parallel unsafe/restricted objects in the target relation',
+  proname => 'pg_get_parallel_safety', prorows => '100',
+  proretset => 't', provolatile => 'v', proparallel => 'u',
+  prorettype => 'record', proargtypes => 'regclass',
+  proallargtypes => '{regclass,oid,oid,char}',
+  proargmodes => '{i,o,o,o}',
+  proargnames => '{table_name, objid, classid, proparallel}',
+  prosrc => 'pg_get_parallel_safety' },
+
+{ oid => '6123', descr => 'worst parallel-hazard level in the given relation for DML',
+  proname => 'pg_get_max_parallel_hazard', prorettype => 'char', proargtypes => 'regclass',
+  prosrc => 'pg_get_max_parallel_hazard', provolatile => 'v', proparallel => 'u' },
+
 # Deferrable unique constraint trigger
 { oid => '1250', descr => 'deferred UNIQUE constraint check',
   proname => 'unique_key_recheck', provolatile => 'v', prorettype => 'trigger',
@@ -3773,11 +3787,11 @@
 
 # Generic referential integrity constraint triggers
 { oid => '1644', descr => 'referential integrity FOREIGN KEY ... REFERENCES',
-  proname => 'RI_FKey_check_ins', provolatile => 'v', prorettype => 'trigger',
-  proargtypes => '', prosrc => 'RI_FKey_check_ins' },
+  proname => 'RI_FKey_check_ins', provolatile => 'v', proparallel => 'r',
+  prorettype => 'trigger', proargtypes => '', prosrc => 'RI_FKey_check_ins' },
 { oid => '1645', descr => 'referential integrity FOREIGN KEY ... REFERENCES',
-  proname => 'RI_FKey_check_upd', provolatile => 'v', prorettype => 'trigger',
-  proargtypes => '', prosrc => 'RI_FKey_check_upd' },
+  proname => 'RI_FKey_check_upd', provolatile => 'v', proparallel => 'r',
+  prorettype => 'trigger', proargtypes => '', prosrc => 'RI_FKey_check_upd' },
 { oid => '1646', descr => 'referential integrity ON DELETE CASCADE',
   proname => 'RI_FKey_cascade_del', provolatile => 'v', prorettype => 'trigger',
   proargtypes => '', prosrc => 'RI_FKey_cascade_del' },
diff --git a/src/include/optimizer/clauses.h b/src/include/optimizer/clauses.h
index 32b5656..7ceb750 100644
--- a/src/include/optimizer/clauses.h
+++ b/src/include/optimizer/clauses.h
@@ -23,6 +23,13 @@ typedef struct
 	List	  **windowFuncs;	/* lists of WindowFuncs for each winref */
 } WindowFuncLists;
 
+typedef struct safety_object
+{
+	Oid objid;
+	Oid classid;
+	char proparallel;
+} safety_object;
+
 extern bool contain_agg_clause(Node *clause);
 
 extern bool contain_window_function(Node *clause);
@@ -54,5 +61,8 @@ extern Query *inline_set_returning_function(PlannerInfo *root,
 											RangeTblEntry *rte);
 
 extern bool is_parallel_allowed_for_modify(Query *parse);
+extern List *target_rel_max_parallel_hazard(Oid relOid, bool findall,
+											char max_interesting,
+											char *max_hazard);
 
 #endif							/* CLAUSES_H */
diff --git a/src/include/utils/typcache.h b/src/include/utils/typcache.h
index 1d68a9a..28ca7d8 100644
--- a/src/include/utils/typcache.h
+++ b/src/include/utils/typcache.h
@@ -199,6 +199,8 @@ extern uint64 assign_record_type_identifier(Oid type_id, int32 typmod);
 
 extern int	compare_values_of_enum(TypeCacheEntry *tcache, Oid arg1, Oid arg2);
 
+extern List *GetDomainConstraints(Oid type_id);
+
 extern size_t SharedRecordTypmodRegistryEstimate(void);
 
 extern void SharedRecordTypmodRegistryInit(SharedRecordTypmodRegistry *,
-- 
2.7.2.windows.1
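
For readers following the check_all logic in 0003, the difference between the
quick check (as used by pg_get_max_parallel_hazard) and the full scan (as used
by pg_get_parallel_safety) can be sketched outside the backend. The following
is a standalone C model, not code from the patch: hazard_model, hazard_test,
check_function, scan_functions and MAX_REPORTED are hypothetical names, function
OIDs are plain integers, and, as a choice of this sketch only, max_hazard is
never lowered once it has reached unsafe.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Same letter codes as pg_proc.proparallel. */
#define PROPARALLEL_SAFE       's'
#define PROPARALLEL_RESTRICTED 'r'
#define PROPARALLEL_UNSAFE     'u'

#define MAX_REPORTED 16         /* hypothetical fixed capacity for this sketch */

/* Hypothetical stand-in for max_parallel_hazard_context. */
typedef struct
{
    char         max_hazard;        /* worst level seen so far */
    char         max_interesting;   /* level at which a quick check may stop */
    bool         check_all;         /* true: keep scanning, record everything */
    unsigned int reported[MAX_REPORTED];    /* ids of non-safe functions */
    size_t       nreported;
} hazard_model;

static void
hazard_model_init(hazard_model *m, bool check_all, char max_interesting)
{
    m->max_hazard = PROPARALLEL_SAFE;
    m->max_interesting = max_interesting;
    m->check_all = check_all;
    m->nreported = 0;
}

/* Raise max_hazard if needed; return true once the stop level is reached. */
static bool
hazard_test(char proparallel, hazard_model *m)
{
    switch (proparallel)
    {
        case PROPARALLEL_SAFE:
            break;
        case PROPARALLEL_RESTRICTED:
            /* sketch-only choice: never downgrade from unsafe */
            if (m->max_hazard != PROPARALLEL_UNSAFE)
                m->max_hazard = proparallel;
            if (m->max_interesting == proparallel)
                return true;
            break;
        case PROPARALLEL_UNSAFE:
            m->max_hazard = proparallel;
            return true;
    }
    return false;
}

/* Quick mode stops at the first hit; check_all mode records and continues. */
static bool
check_function(unsigned int func_id, char proparallel, hazard_model *m)
{
    if (hazard_test(proparallel, m) && !m->check_all)
        return true;
    if (proparallel != PROPARALLEL_SAFE)
    {
        assert(m->nreported < MAX_REPORTED);
        m->reported[m->nreported++] = func_id;
    }
    return false;
}

/* Walk a list of (id, level) pairs; true means the scan stopped early. */
static bool
scan_functions(const unsigned int *ids, const char *levels, size_t n,
               hazard_model *m)
{
    for (size_t i = 0; i < n; i++)
    {
        if (check_function(ids[i], levels[i], m))
            return true;
    }
    return false;
}
```

With levels {s, r, u, r} and max_interesting set to PROPARALLEL_UNSAFE, the
quick mode stops at the third entry having recorded one object, while the
check_all mode visits all four entries and records the three non-safe ones.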


