[prev in list] [next in list] [prev in thread] [next in thread] 

List:       postgresql-hackers
Subject:    Re: Handle infinite recursion in logical replication setup
From:       vignesh C <vignesh21 () gmail ! com>
Date:       2022-06-30 16:22:14
Message-ID: CALDaNm1qrk3mPbEjTYgg7GL_0J9k-OHb+Q2ubeP=_Mt-2zmCZg () mail ! gmail ! com
[Download RAW message or body]

On Thu, Jun 30, 2022 at 9:17 AM shiy.fnst@fujitsu.com
<shiy.fnst@fujitsu.com> wrote:
>
> On Tue, Jun 28, 2022 2:18 PM vignesh C <vignesh21@gmail.com> wrote:
> >
> > Thanks for the comments, the attached  v25 patch has the changes for the
> > same.
> >
>
> Thanks for updating the patch. Here are some comments.
>
> 0002 patch:
> ==============
> 1.
> +# Test the CREATE SUBSCRIPTION 'origin' parameter and its interaction with
> +# 'copy_data' parameter.
>
> It seems we should move "and its interaction with 'copy_data' parameter" to
> 0003 patch.

Modified

> 0003 patch
> ==============
> 1.
> When using ALTER SUBSCRIPTION ... REFRESH, subscription will throw an error if
> any table is subscribed in publisher, even if the table has been subscribed
> before refresh (which won't do the initial copy when refreshing). It looks the
> previously subscribed tables don't need this check. Would it be better that we
> only check the tables which need to do the initial copy?

Modified

> 2.
> +                               errmsg("table:%s.%s might have replicated data in the publisher",
> +                                          nspname, relname),
>
> I think the table name needs to be enclosed in double quotes, which is
> consistent with other messages.

Modified

Thanks for the comments, the attached v26 patch has the changes for the same.

Regards,
Vignesh

["v26-0001-Add-a-missing-test-to-verify-only-local-paramete.patch" (text/x-patch)]

From 1f1e3c579c7070349a999baf8d8b56b97b04dc31 Mon Sep 17 00:00:00 2001
From: Vigneshwaran C <vignesh21@gmail.com>
Date: Thu, 26 May 2022 19:29:33 +0530
Subject: [PATCH v26 1/3] Add a missing test to verify only-local parameter in
 test_decoding plugin.

Add a missing test to verify only-local parameter in test_decoding plugin.
---
 contrib/test_decoding/expected/replorigin.out | 55 +++++++++++++++++++
 contrib/test_decoding/sql/replorigin.sql      | 15 +++++
 2 files changed, 70 insertions(+)

diff --git a/contrib/test_decoding/expected/replorigin.out \
b/contrib/test_decoding/expected/replorigin.out index 2e9ef7c823..56da7de79a 100644
--- a/contrib/test_decoding/expected/replorigin.out
+++ b/contrib/test_decoding/expected/replorigin.out
@@ -257,3 +257,58 @@ SELECT pg_replication_origin_drop('regress_test_decoding: \
regression_slot_no_lsn  
 (1 row)
 
+-- Verify the behaviour of the only-local parameter
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_only_local', \
'test_decoding'); + ?column? 
+----------
+ init
+(1 row)
+
+SELECT pg_replication_origin_create('regress_test_decoding: \
regression_slot_only_local'); + pg_replication_origin_create 
+------------------------------
+                            1
+(1 row)
+
+SELECT pg_replication_origin_session_setup('regress_test_decoding: \
regression_slot_only_local'); + pg_replication_origin_session_setup 
+-------------------------------------
+ 
+(1 row)
+
+INSERT INTO origin_tbl(data) VALUES ('only_local, commit1');
+-- Remote origin data returned when only-local parameter is not set
+SELECT data FROM pg_logical_slot_get_changes('regression_slot_only_local', NULL, \
NULL, 'skip-empty-xacts', '1', 'include-xids', '0', 'only-local', '0'); +             \
data                                        \
+--------------------------------------------------------------------------------- + \
BEGIN + table public.origin_tbl: INSERT: id[integer]:8 data[text]:'only_local, \
commit1' + COMMIT
+(3 rows)
+
+INSERT INTO origin_tbl(data) VALUES ('only_local, commit2');
+-- Remote origin data not returned when only-local parameter is set
+SELECT data FROM pg_logical_slot_get_changes('regression_slot_only_local', NULL, \
NULL, 'skip-empty-xacts', '1', 'include-xids', '0', 'only-local', '1'); + data 
+------
+(0 rows)
+
+-- Clean up
+SELECT pg_replication_origin_session_reset();
+ pg_replication_origin_session_reset 
+-------------------------------------
+ 
+(1 row)
+
+SELECT pg_drop_replication_slot('regression_slot_only_local');
+ pg_drop_replication_slot 
+--------------------------
+ 
+(1 row)
+
+SELECT pg_replication_origin_drop('regress_test_decoding: \
regression_slot_only_local'); + pg_replication_origin_drop 
+----------------------------
+ 
+(1 row)
+
diff --git a/contrib/test_decoding/sql/replorigin.sql \
b/contrib/test_decoding/sql/replorigin.sql index 2e28a48777..4b79c919bb 100644
--- a/contrib/test_decoding/sql/replorigin.sql
+++ b/contrib/test_decoding/sql/replorigin.sql
@@ -119,3 +119,18 @@ SELECT data FROM \
pg_logical_slot_get_changes('regression_slot_no_lsn', NULL, NUL  SELECT \
pg_replication_origin_session_reset();  SELECT \
pg_drop_replication_slot('regression_slot_no_lsn');  SELECT \
pg_replication_origin_drop('regress_test_decoding: regression_slot_no_lsn'); +
+-- Verify the behaviour of the only-local parameter
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_only_local', \
'test_decoding'); +SELECT pg_replication_origin_create('regress_test_decoding: \
regression_slot_only_local'); +SELECT \
pg_replication_origin_session_setup('regress_test_decoding: \
regression_slot_only_local'); +INSERT INTO origin_tbl(data) VALUES ('only_local, \
commit1'); +-- Remote origin data returned when only-local parameter is not set
+SELECT data FROM pg_logical_slot_get_changes('regression_slot_only_local', NULL, \
NULL, 'skip-empty-xacts', '1', 'include-xids', '0', 'only-local', '0'); +INSERT INTO \
origin_tbl(data) VALUES ('only_local, commit2'); +-- Remote origin data not returned \
when only-local parameter is set +SELECT data FROM \
pg_logical_slot_get_changes('regression_slot_only_local', NULL, NULL, \
'skip-empty-xacts', '1', 'include-xids', '0', 'only-local', '1'); +-- Clean up
+SELECT pg_replication_origin_session_reset();
+SELECT pg_drop_replication_slot('regression_slot_only_local');
+SELECT pg_replication_origin_drop('regress_test_decoding: \
                regression_slot_only_local');
-- 
2.32.0


["v26-0004-Document-bidirectional-logical-replication-steps.patch" (text/x-patch)]

From ce74c604bc17e82a33de55f2694210800a7f6f55 Mon Sep 17 00:00:00 2001
From: Vigneshwaran C <vignesh21@gmail.com>
Date: Mon, 27 Jun 2022 18:44:18 +0530
Subject: [PATCH v26 4/4] Document bidirectional logical replication steps in
 various scenarios.

Document the steps for the following:
a) Setting bidirectional replication between two nodes.
b) Adding a new node when there is no table data on any of the nodes.
c) Adding a new node when table data is present on the existing nodes.
d) Generic steps for adding a new node to an existing set of nodes.
---
 doc/src/sgml/logical-replication.sgml     | 301 ++++++++++++++++++++++
 doc/src/sgml/ref/create_subscription.sgml |   5 +-
 2 files changed, 305 insertions(+), 1 deletion(-)

diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index bdf1e7b727..9d9d92c4c9 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -1479,4 +1479,305 @@ CREATE SUBSCRIPTION mysub CONNECTION 'dbname=foo host=bar user=repuser' PUBLICAT
    incremental changes to those tables.
   </para>
  </sect1>
+
+ <sect1 id="logical-replication-bidirectional">
+  <title>Bidirectional logical replication</title>
+
+   <para>
+    Bidirectional replication is useful for creating a multi-master database
+    environment for replicating read/write operations performed by any of the
+    member nodes. The steps to create a bidirectional replication in various
+    scenarios are given below.
+   </para>
+
+   <warning>
+    <para>
+     Setting up bidirectional logical replication requires multiple steps to be
+     performed on various nodes. Because not all operations are transactional,
+     the user is advised to take backups.
+    </para>
+   </warning>
+
+  <sect2 id="setting-bidirectional-replication-two-nodes">
+   <title>Setting bidirectional replication between two nodes</title>
+   <para>
+    The following steps demonstrate how to create a two-node bidirectional
+    replication when there is no table data present on both nodes
+    <literal>node1</literal> and <literal>node2</literal>:
+   </para>
+
+   <para>
+    Create a publication on <literal>node1</literal>:
+<programlisting>
+node1=# CREATE PUBLICATION pub_node1 FOR TABLE t1;
+CREATE PUBLICATION
+</programlisting></para>
+
+   <para>
+    Create a publication on <literal>node2</literal>:
+<programlisting>
+node2=# CREATE PUBLICATION pub_node2 FOR TABLE t1;
+CREATE PUBLICATION
+</programlisting></para>
+
+   <para>
+    Lock the table <literal>t1</literal> on <literal>node1</literal> and
+    <literal>node2</literal> in <literal>EXCLUSIVE</literal> mode until the
+    setup is completed.
+   </para>
+
+   <para>
+    Create a subscription on <literal>node2</literal> to subscribe to
+    <literal>node1</literal>:
+<programlisting>
+node2=# CREATE SUBSCRIPTION sub_node2_node1
+node2-# CONNECTION 'dbname=foo host=node1 user=repuser'
+node2-# PUBLICATION pub_node1
+node2-# WITH (copy_data = off, origin = local);
+CREATE SUBSCRIPTION
+</programlisting></para>
+
+   <para>
+    Create a subscription on <literal>node1</literal> to subscribe to
+    <literal>node2</literal>:
+<programlisting>
+node1=# CREATE SUBSCRIPTION sub_node1_node2
+node1-# CONNECTION 'dbname=foo host=node2 user=repuser'
+node1-# PUBLICATION pub_node2
+node1-# WITH (copy_data = off, origin = local);
+CREATE SUBSCRIPTION
+</programlisting></para>
+
+   <para>
+    Now the bidirectional logical replication setup is complete between
+    <literal>node1</literal> and <literal>node2</literal>. Any incremental
+    changes from <literal>node1</literal> will be replicated to
+    <literal>node2</literal>, and any incremental changes from
+    <literal>node2</literal> will be replicated to <literal>node1</literal>.
+   </para>
+  </sect2>
+
+  <sect2 id="add-new-node">
+   <title>Adding a new node when there is no table data on any of the nodes</title>
+   <para>
+    The following steps demonstrate adding a new node <literal>node3</literal>
+    to the existing <literal>node1</literal> and <literal>node2</literal> when
+    there is no <literal>t1</literal> data on any of the nodes. This requires
+    creating subscriptions on <literal>node1</literal> and
+    <literal>node2</literal> to replicate the data from
+    <literal>node3</literal> and creating subscriptions on
+    <literal>node3</literal> to replicate data from <literal>node1</literal>
+    and <literal>node2</literal>. Note: These steps assume that the
+    bidirectional logical replication between <literal>node1</literal> and
+    <literal>node2</literal> is already completed.
+   </para>
+
+   <para>
+    Create a publication on <literal>node3</literal>:
+<programlisting>
+node3=# CREATE PUBLICATION pub_node3 FOR TABLE t1;
+CREATE PUBLICATION
+</programlisting></para>
+
+   <para>
+    Lock table <literal>t1</literal> on all the nodes <literal>node1</literal>,
+    <literal>node2</literal> and <literal>node3</literal> in
+    <literal>EXCLUSIVE</literal> mode until the setup is completed.
+   </para>
+
+   <para>
+    Create a subscription on <literal>node1</literal> to subscribe to
+    <literal>node3</literal>:
+<programlisting>
+node1=# CREATE SUBSCRIPTION sub_node1_node3
+node1-# CONNECTION 'dbname=foo host=node3 user=repuser'
+node1-# PUBLICATION pub_node3
+node1-# WITH (copy_data = off, origin = local);
+CREATE SUBSCRIPTION
+</programlisting></para>
+
+   <para>
+    Create a subscription on <literal>node2</literal> to subscribe to
+    <literal>node3</literal>:
+<programlisting>
+node2=# CREATE SUBSCRIPTION sub_node2_node3
+node2-# CONNECTION 'dbname=foo host=node3 user=repuser'
+node2-# PUBLICATION pub_node3
+node2-# WITH (copy_data = off, origin = local);
+CREATE SUBSCRIPTION
+</programlisting></para>
+
+   <para>
+    Create a subscription on <literal>node3</literal> to subscribe to
+    <literal>node1</literal>:
+<programlisting>
+node3=# CREATE SUBSCRIPTION sub_node3_node1
+node3-# CONNECTION 'dbname=foo host=node1 user=repuser'
+node3-# PUBLICATION pub_node1
+node3-# WITH (copy_data = off, origin = local);
+CREATE SUBSCRIPTION
+</programlisting></para>
+
+   <para>
+    Create a subscription on <literal>node3</literal> to subscribe to
+    <literal>node2</literal>:
+<programlisting>
+node3=# CREATE SUBSCRIPTION sub_node3_node2
+node3-# CONNECTION 'dbname=foo host=node2 user=repuser'
+node3-# PUBLICATION pub_node2
+node3-# WITH (copy_data = off, origin = local);
+CREATE SUBSCRIPTION
+</programlisting></para>
+
+   <para>
+    Now the bidirectional logical replication setup is complete between
+    <literal>node1</literal>, <literal>node2</literal> and
+    <literal>node3</literal>. Incremental changes made on any node will be
+    replicated to the other two nodes.
+   </para>
+  </sect2>
+
+  <sect2 id="add-new-node-data-on-existing-node">
+   <title>Adding a new node when table data is present on the existing nodes</title>
+    <para>
+     The following steps demonstrate adding a new node <literal>node3</literal>
+     which has no <literal>t1</literal> data to the existing
+     <literal>node1</literal> and <literal>node2</literal> where
+     <literal>t1</literal> data is present. This needs similar steps; the only
+     change required here is that <literal>node3</literal> should create a
+     subscription with <literal>copy_data = force</literal> to one of the
+     existing nodes so it can receive the existing <literal>t1</literal> data
+     during initial data synchronization. Note: These steps assume that the
+     bidirectional logical replication between <literal>node1</literal> and
+     <literal>node2</literal> is already completed, and the pre-existing data
+     in table <literal>t1</literal> is already synchronized on both those
+     nodes.
+   </para>
+
+   <para>
+    Create a publication on <literal>node3</literal>:
+<programlisting>
+node3=# CREATE PUBLICATION pub_node3 FOR TABLE t1;
+CREATE PUBLICATION
+</programlisting></para>
+
+   <para>
+    Lock table <literal>t1</literal> on <literal>node2</literal> and
+    <literal>node3</literal> in <literal>EXCLUSIVE</literal> mode until the
+    setup is completed. There is no need to lock table <literal>t1</literal> on
+    <literal>node1</literal> because any data changes made will be synchronized
+    while creating the subscription with <literal>copy_data = force</literal>.
+   </para>
+
+   <para>
+    Create a subscription on <literal>node1</literal> to subscribe to
+    <literal>node3</literal>:
+<programlisting>
+node1=# CREATE SUBSCRIPTION sub_node1_node3
+node1-# CONNECTION 'dbname=foo host=node3 user=repuser'
+node1-# PUBLICATION pub_node3
+node1-# WITH (copy_data = off, origin = local);
+CREATE SUBSCRIPTION
+</programlisting></para>
+
+   <para>
+    Create a subscription on <literal>node2</literal> to subscribe to
+    <literal>node3</literal>:
+<programlisting>
+node2=# CREATE SUBSCRIPTION sub_node2_node3
+node2-# CONNECTION 'dbname=foo host=node3 user=repuser'
+node2-# PUBLICATION pub_node3
+node2-# WITH (copy_data = off, origin = local);
+CREATE SUBSCRIPTION
+</programlisting></para>
+
+   <para>
+    Create a subscription on <literal>node3</literal> to subscribe to
+    <literal>node1</literal>. Use <literal>copy_data = force </literal> so that
+    the existing table data is copied during initial sync:
+<programlisting>
+node3=# CREATE SUBSCRIPTION sub_node3_node1
+node3-# CONNECTION 'dbname=foo host=node1 user=repuser'
+node3-# PUBLICATION pub_node1
+node3-# WITH (copy_data = force, origin = local);
+CREATE SUBSCRIPTION
+</programlisting></para>
+
+   <para>
+    Create a subscription on <literal>node3</literal> to subscribe to
+    <literal>node2</literal>. Use <literal>copy_data = off</literal>
+    because the initial table data would have been
+    already copied in the previous step:
+<programlisting>
+node3=# CREATE SUBSCRIPTION sub_node3_node2
+node3-# CONNECTION 'dbname=foo host=node2 user=repuser'
+node3-# PUBLICATION pub_node2
+node3-# WITH (copy_data = off, origin = local);
+CREATE SUBSCRIPTION
+</programlisting></para>
+
+   <para>
+    Now the bidirectional logical replication setup is complete between
+    <literal>node1</literal>, <literal>node2</literal> and
+    <literal>node3</literal>. Incremental changes made on any node will be
+    replicated to the other two nodes.
+   </para>
+  </sect2>
+
+  <sect2 id="add-node-data-present-on-new-node">
+   <title>Adding a new node when table data is present on the new node</title>
+   <note>
+    <para>
+     Adding a new node when table data is present on the new node is not
+     supported.
+    </para>
+   </note>
+  </sect2>
+
+  <sect2 id="generic-steps-add-new-node">
+   <title>Generic steps for adding a new node to an existing set of nodes</title>
+   <para>
+    Step-1: Create a publication on the new node.
+   </para>
+   <para>
+    Step-2: Lock the required tables of the new node in
+    <literal>EXCLUSIVE</literal> mode until the setup is complete. (This lock
+    is necessary to prevent any modifications from happening on the new node.
+    If data modifications occurred after Step-3, there is a chance they could
+    be published to the first node and then synchronized back to the new node
+    while creating the subscription in Step-5. This would result in
+    inconsistent data).
+   </para>
+   <para>
+    Step-3. Create subscriptions on existing nodes to the publication on the
+    new node with <literal>origin = local</literal> and
+    <literal>copy_data = off</literal>. (The <literal>copy_data = off</literal>
+    is OK here because it is asserted that the published tables of the new node
+    will have no pre-existing data).
+   </para>
+   <para>
+    Step-4. Lock the required tables of the existing nodes except the first node
+    in <literal>EXCLUSIVE</literal> mode until the setup is complete. (This
+    lock is necessary to prevent any modifications from happening. If data
+    modifications occur, there is a chance that modifications done between
+    Step-5 and Step-6 will not be synchronized to the new node. This would
+    result in inconsistent data. There is no need to lock the required tables
+    on the first node because any data changes made will be synchronized while
+    creating the subscription with <literal>copy_data = force</literal>).
+   </para>
+   <para>
+    Step-5. Create a subscription on the new node to the publication on the
+    first node with <literal>origin = local</literal> and
+    <literal>copy_data = force</literal>. (This will copy the same table data
+    from the existing nodes to the new node).
+   </para>
+   <para>
+    Step-6. Create subscriptions on the new node to publications on the
+    remaining nodes with <literal>origin = local</literal> and
+    <literal>copy_data = off</literal>. (The copy_data = off is OK here because
+    the existing node data was already copied to the new node in Step-5).
+   </para>
+  </sect2>
+ </sect1>
+
 </chapter>
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index f0bc2ba63d..431abda656 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -408,7 +408,10 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
    subscribed to the same table from other publishers and, if so, throw an
    error to prevent possible non-local data from being copied. The user can
    override this check and continue with the copy operation by specifying
-   <literal>copy_data = force</literal>.
+   <literal>copy_data = force</literal>. Refer to
+   <xref linkend="logical-replication-bidirectional"/> for how
+   <literal>copy_data</literal> and <literal>origin</literal> can be used to
+   set up bidirectional replication.
   </para>
 
  </refsect1>
-- 
2.32.0


["v26-0002-Skip-replication-of-non-local-data.patch" (text/x-patch)]

From f13628f7ed65b3a7fcce9ab27c39a235010019ce Mon Sep 17 00:00:00 2001
From: Vigneshwaran C <vignesh21@gmail.com>
Date: Mon, 20 Jun 2022 14:36:51 +0530
Subject: [PATCH v26 2/3] Skip replication of non local data.

This patch adds a new SUBSCRIPTION parameter "origin". It specifies whether
the subscription will request the publisher to only send changes that
originated locally, or to send changes regardless of origin. Setting it to
"local" means that the subscription will request the publisher to only send
changes that originated locally. Setting it to "any" means that that the
publisher sends changes regardless of their origin. The default is "any".
Usage:
CREATE SUBSCRIPTION sub1 CONNECTION 'dbname=postgres port=9999'
PUBLICATION pub1 WITH (origin = local);

Even though "origin" parameter allows only "local" and "any" values, it is
implemented as a string type so that the parameter can be extended in future
versions to support filtering using origin names specified by the user.
---
 doc/src/sgml/catalogs.sgml                    |  13 ++
 doc/src/sgml/ref/alter_subscription.sgml      |   5 +-
 doc/src/sgml/ref/create_subscription.sgml     |  16 ++
 src/backend/catalog/pg_subscription.c         |   8 +
 src/backend/catalog/system_views.sql          |   4 +-
 src/backend/commands/subscriptioncmds.c       |  43 +++++-
 .../libpqwalreceiver/libpqwalreceiver.c       |   6 +
 src/backend/replication/logical/worker.c      |   2 +
 src/backend/replication/pgoutput/pgoutput.c   |  27 +++-
 src/bin/pg_dump/pg_dump.c                     |  16 +-
 src/bin/pg_dump/pg_dump.h                     |   1 +
 src/bin/pg_dump/t/002_pg_dump.pl              |  22 +++
 src/bin/psql/describe.c                       |   8 +-
 src/bin/psql/tab-complete.c                   |   4 +-
 src/include/catalog/pg_subscription.h         |  17 ++
 src/include/replication/pgoutput.h            |   1 +
 src/include/replication/walreceiver.h         |   2 +
 src/test/regress/expected/subscription.out    | 142 ++++++++++-------
 src/test/regress/sql/subscription.sql         |  10 ++
 src/test/subscription/t/032_origin.pl         | 146 ++++++++++++++++++
 20 files changed, 420 insertions(+), 73 deletions(-)
 create mode 100644 src/test/subscription/t/032_origin.pl

diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index 25b02c4e37..dc4b2d3e4c 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -7903,6 +7903,19 @@ SCRAM-SHA-256$<replaceable>&lt;iteration \
count&gt;</replaceable>:<replaceable>&l  </para></entry>
      </row>
 
+    <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>suborigin</structfield> <type>text</type>
+      </para>
+      <para>
+       The origin value must be either <literal>local</literal> or
+       <literal>any</literal>. The default is <literal>any</literal>.
+       If <literal>local</literal>, the subscription will request the publisher
+       to only send changes that originated locally. If <literal>any</literal>,
+       the publisher sends changes regardless of their origin.
+      </para></entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>subconninfo</structfield> <type>text</type>
diff --git a/doc/src/sgml/ref/alter_subscription.sgml \
b/doc/src/sgml/ref/alter_subscription.sgml index 353ea5def2..64efc21f53 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -207,8 +207,9 @@ ALTER SUBSCRIPTION <replaceable \
class="parameter">name</replaceable> RENAME TO <  information.  The parameters that \
can be altered  are <literal>slot_name</literal>,
       <literal>synchronous_commit</literal>,
-      <literal>binary</literal>, <literal>streaming</literal>, and
-      <literal>disable_on_error</literal>.
+      <literal>binary</literal>, <literal>streaming</literal>,
+      <literal>disable_on_error</literal>, and
+      <literal>origin</literal>.
      </para>
     </listitem>
    </varlistentry>
diff --git a/doc/src/sgml/ref/create_subscription.sgml \
b/doc/src/sgml/ref/create_subscription.sgml index 34b3264b26..b0a6ebcb7d 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -302,6 +302,22 @@ CREATE SUBSCRIPTION <replaceable \
class="parameter">subscription_name</replaceabl  </para>
         </listitem>
        </varlistentry>
+
+       <varlistentry>
+        <term><literal>origin</literal> (<type>string</type>)</term>
+        <listitem>
+         <para>
+          Specifies whether the subscription will request the publisher to only
+          send changes that originated locally, or to send changes regardless
+          of origin. Setting <literal>origin</literal> to
+          <literal>local</literal> means that the subscription will request the
+          publisher to only send changes that originated locally. Setting
+          <literal>origin</literal> to <literal>any</literal> means that the
+          publisher sends changes regardless of their origin. The default is
+          <literal>any</literal>.
+         </para>
+        </listitem>
+       </varlistentry>
       </variablelist></para>
 
     </listitem>
diff --git a/src/backend/catalog/pg_subscription.c \
b/src/backend/catalog/pg_subscription.c index add51caadf..8ea58b30ba 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -72,6 +72,14 @@ GetSubscription(Oid subid, bool missing_ok)
 	sub->twophasestate = subform->subtwophasestate;
 	sub->disableonerr = subform->subdisableonerr;
 
+	/* Get origin */
+	datum = SysCacheGetAttr(SUBSCRIPTIONOID,
+							tup,
+							Anum_pg_subscription_suborigin,
+							&isnull);
+	Assert(!isnull);
+	sub->origin = TextDatumGetCString(datum);
+
 	/* Get conninfo */
 	datum = SysCacheGetAttr(SUBSCRIPTIONOID,
 							tup,
diff --git a/src/backend/catalog/system_views.sql \
b/src/backend/catalog/system_views.sql index fedaed533b..46305e2931 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1298,8 +1298,8 @@ REVOKE ALL ON pg_replication_origin_status FROM public;
 -- All columns of pg_subscription except subconninfo are publicly readable.
 REVOKE ALL ON pg_subscription FROM public;
 GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled,
-              subbinary, substream, subtwophasestate, subdisableonerr, subslotname,
-              subsynccommit, subpublications)
+              subbinary, substream, subtwophasestate, subdisableonerr,
+              suborigin, subslotname, subsynccommit, subpublications)
     ON pg_subscription TO public;
 
 CREATE VIEW pg_stat_subscription_stats AS
diff --git a/src/backend/commands/subscriptioncmds.c \
b/src/backend/commands/subscriptioncmds.c index 83e6eae855..3070ebc7e7 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -64,6 +64,7 @@
 #define SUBOPT_TWOPHASE_COMMIT		0x00000200
 #define SUBOPT_DISABLE_ON_ERR		0x00000400
 #define SUBOPT_LSN					0x00000800
+#define SUBOPT_ORIGIN				0x00001000
 
 /* check if the 'val' has 'bits' set */
 #define IsSet(val, bits)  (((val) & (bits)) == (bits))
@@ -86,6 +87,7 @@ typedef struct SubOpts
 	bool		streaming;
 	bool		twophase;
 	bool		disableonerr;
+	char	   *origin;
 	XLogRecPtr	lsn;
 } SubOpts;
 
@@ -118,7 +120,7 @@ parse_subscription_options(ParseState *pstate, List \
*stmt_options,  IsSet(supported_opts, SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
 				 SUBOPT_COPY_DATA));
 
-	/* Set default values for the boolean supported options. */
+	/* Set default values for the supported options. */
 	if (IsSet(supported_opts, SUBOPT_CONNECT))
 		opts->connect = true;
 	if (IsSet(supported_opts, SUBOPT_ENABLED))
@@ -137,6 +139,8 @@ parse_subscription_options(ParseState *pstate, List \
*stmt_options,  opts->twophase = false;
 	if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR))
 		opts->disableonerr = false;
+	if (IsSet(supported_opts, SUBOPT_ORIGIN))
+		opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY);
 
 	/* Parse options */
 	foreach(lc, stmt_options)
@@ -265,6 +269,29 @@ parse_subscription_options(ParseState *pstate, List \
*stmt_options,  opts->specified_opts |= SUBOPT_DISABLE_ON_ERR;
 			opts->disableonerr = defGetBoolean(defel);
 		}
+		else if (IsSet(supported_opts, SUBOPT_ORIGIN) &&
+				 strcmp(defel->defname, "origin") == 0)
+		{
+			if (IsSet(opts->specified_opts, SUBOPT_ORIGIN))
+				errorConflictingDefElem(defel, pstate);
+
+			opts->specified_opts |= SUBOPT_ORIGIN;
+			pfree(opts->origin);
+
+			/*
+			 * Even though "origin" parameter allows only "local" and "any"
+			 * values, it is implemented as a string type so that the parameter
+			 * can be extended in future versions to support filtering using
+			 * origin names specified by the user.
+			 */
+			opts->origin = defGetString(defel);
+
+			if ((strcmp(opts->origin, LOGICALREP_ORIGIN_LOCAL) != 0) &&
+				(strcmp(opts->origin, LOGICALREP_ORIGIN_ANY) != 0))
+				ereport(ERROR,
+						errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						errmsg("unrecognized origin value: \"%s\"", opts->origin));
+		}
 		else if (IsSet(supported_opts, SUBOPT_LSN) &&
 				 strcmp(defel->defname, "lsn") == 0)
 		{
@@ -531,7 +558,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt \
*stmt,  SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA |
 					  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
 					  SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
-					  SUBOPT_DISABLE_ON_ERR);
+					  SUBOPT_DISABLE_ON_ERR | SUBOPT_ORIGIN);
 	parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
 
 	/*
@@ -607,6 +634,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt \
*stmt,  LOGICALREP_TWOPHASE_STATE_PENDING :
 					 LOGICALREP_TWOPHASE_STATE_DISABLED);
 	values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr);
+	values[Anum_pg_subscription_suborigin - 1] =
+		CStringGetTextDatum(opts.origin);
 	values[Anum_pg_subscription_subconninfo - 1] =
 		CStringGetTextDatum(conninfo);
 	if (opts.slot_name)
@@ -1015,7 +1044,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt \
*stmt,  {
 				supported_opts = (SUBOPT_SLOT_NAME |
 								  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
-								  SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR);
+								  SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR |
+								  SUBOPT_ORIGIN);
 
 				parse_subscription_options(pstate, stmt->options,
 										   supported_opts, &opts);
@@ -1072,6 +1102,13 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt \
*stmt,  = true;
 				}
 
+				if (IsSet(opts.specified_opts, SUBOPT_ORIGIN))
+				{
+					values[Anum_pg_subscription_suborigin - 1] =
+						CStringGetTextDatum(opts.origin);
+					replaces[Anum_pg_subscription_suborigin - 1] = true;
+				}
+
 				update_tuple = true;
 				break;
 			}
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c \
b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index \
                0d89db4e6a..467a11e3a9 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -453,6 +453,12 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
 			PQserverVersion(conn->streamConn) >= 150000)
 			appendStringInfoString(&cmd, ", two_phase 'on'");
 
+		/* FIXME: 150000 should be changed to 160000 later for PG16. */
+		if (options->proto.logical.origin &&
+			PQserverVersion(conn->streamConn) >= 150000)
+			appendStringInfo(&cmd, ", origin '%s'",
+							 options->proto.logical.origin);
+
 		pubnames = options->proto.logical.publication_names;
 		pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames);
 		if (!pubnames_str)
diff --git a/src/backend/replication/logical/worker.c \
b/src/backend/replication/logical/worker.c index 38e3b1c1b3..5f8c541763 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3077,6 +3077,7 @@ maybe_reread_subscription(void)
 		strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
 		newsub->binary != MySubscription->binary ||
 		newsub->stream != MySubscription->stream ||
+		strcmp(newsub->origin, MySubscription->origin) != 0 ||
 		newsub->owner != MySubscription->owner ||
 		!equal(newsub->publications, MySubscription->publications))
 	{
@@ -3758,6 +3759,7 @@ ApplyWorkerMain(Datum main_arg)
 	options.proto.logical.binary = MySubscription->binary;
 	options.proto.logical.streaming = MySubscription->stream;
 	options.proto.logical.twophase = false;
+	options.proto.logical.origin = pstrdup(MySubscription->origin);
 
 	if (!am_tablesync_worker())
 	{
diff --git a/src/backend/replication/pgoutput/pgoutput.c \
b/src/backend/replication/pgoutput/pgoutput.c index 8deae57143..3ceaa4bbd4 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -16,6 +16,7 @@
 #include "catalog/partition.h"
 #include "catalog/pg_publication.h"
 #include "catalog/pg_publication_rel.h"
+#include "catalog/pg_subscription.h"
 #include "commands/defrem.h"
 #include "executor/executor.h"
 #include "fmgr.h"
@@ -81,6 +82,7 @@ static void pgoutput_stream_prepare_txn(LogicalDecodingContext \
*ctx,  
 static bool publications_valid;
 static bool in_streaming;
+static bool publish_local_origin;
 
 static List *LoadPublications(List *pubnames);
 static void publication_invalidation_cb(Datum arg, int cacheid,
@@ -287,6 +289,7 @@ parse_output_parameters(List *options, PGOutputData *data)
 	bool		messages_option_given = false;
 	bool		streaming_given = false;
 	bool		two_phase_option_given = false;
+	bool		origin_option_given = false;
 
 	data->binary = false;
 	data->streaming = false;
@@ -380,6 +383,24 @@ parse_output_parameters(List *options, PGOutputData *data)
 
 			data->two_phase = defGetBoolean(defel);
 		}
+		else if (strcmp(defel->defname, "origin") == 0)
+		{
+			if (origin_option_given)
+				ereport(ERROR,
+						errcode(ERRCODE_SYNTAX_ERROR),
+						errmsg("conflicting or redundant options"));
+			origin_option_given = true;
+
+			data->origin = defGetString(defel);
+			if (strcmp(data->origin, LOGICALREP_ORIGIN_LOCAL) == 0)
+				publish_local_origin = true;
+			else if (strcmp(data->origin, LOGICALREP_ORIGIN_ANY) == 0)
+				publish_local_origin = false;
+			else
+				ereport(ERROR,
+						errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						errmsg("unrecognized origin value: \"%s\"", data->origin));
+		}
 		else
 			elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
 	}
@@ -1698,12 +1719,16 @@ pgoutput_message(LogicalDecodingContext *ctx, \
ReorderBufferTXN *txn,  }
 
 /*
- * Currently we always forward.
+ * Return true if the data source (origin) is remote and the user has requested
+ * only local data, false otherwise.
  */
 static bool
 pgoutput_origin_filter(LogicalDecodingContext *ctx,
 					   RepOriginId origin_id)
 {
+	if (publish_local_origin && origin_id != InvalidRepOriginId)
+		return true;
+
 	return false;
 }
 
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 7cc9c72e49..27ce1455db 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -4406,6 +4406,7 @@ getSubscriptions(Archive *fout)
 	int			i_substream;
 	int			i_subtwophasestate;
 	int			i_subdisableonerr;
+	int			i_suborigin;
 	int			i_subconninfo;
 	int			i_subslotname;
 	int			i_subsynccommit;
@@ -4455,13 +4456,19 @@ getSubscriptions(Archive *fout)
 	if (fout->remoteVersion >= 150000)
 		appendPQExpBufferStr(query,
 							 " s.subtwophasestate,\n"
-							 " s.subdisableonerr\n");
+							 " s.subdisableonerr,\n");
 	else
 		appendPQExpBuffer(query,
 						  " '%c' AS subtwophasestate,\n"
-						  " false AS subdisableonerr\n",
+						  " false AS subdisableonerr,\n",
 						  LOGICALREP_TWOPHASE_STATE_DISABLED);
 
+	/* FIXME: 150000 should be changed to 160000 later for PG16. */
+	if (fout->remoteVersion >= 150000)
+		appendPQExpBufferStr(query, " s.suborigin\n");
+	else
+		appendPQExpBuffer(query, " '%s' AS suborigin\n", LOGICALREP_ORIGIN_ANY);
+
 	appendPQExpBufferStr(query,
 						 "FROM pg_subscription s\n"
 						 "WHERE s.subdbid = (SELECT oid FROM pg_database\n"
@@ -4487,6 +4494,7 @@ getSubscriptions(Archive *fout)
 	i_substream = PQfnumber(res, "substream");
 	i_subtwophasestate = PQfnumber(res, "subtwophasestate");
 	i_subdisableonerr = PQfnumber(res, "subdisableonerr");
+	i_suborigin = PQfnumber(res, "suborigin");
 
 	subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo));
 
@@ -4516,6 +4524,7 @@ getSubscriptions(Archive *fout)
 			pg_strdup(PQgetvalue(res, i, i_subtwophasestate));
 		subinfo[i].subdisableonerr =
 			pg_strdup(PQgetvalue(res, i, i_subdisableonerr));
+		subinfo[i].suborigin = pg_strdup(PQgetvalue(res, i, i_suborigin));
 
 		/* Decide whether we want to dump it */
 		selectDumpableObject(&(subinfo[i].dobj), fout);
@@ -4589,6 +4598,9 @@ dumpSubscription(Archive *fout, const SubscriptionInfo \
*subinfo)  if (strcmp(subinfo->subdisableonerr, "t") == 0)
 		appendPQExpBufferStr(query, ", disable_on_error = true");
 
+	if (strcmp(subinfo->suborigin, LOGICALREP_ORIGIN_ANY) != 0)
+		appendPQExpBuffer(query, ", origin = %s", subinfo->suborigin);
+
 	if (strcmp(subinfo->subsynccommit, "off") != 0)
 		appendPQExpBuffer(query, ", synchronous_commit = %s", \
fmtId(subinfo->subsynccommit));  
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index 1d21c2906f..69ee939d44 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -659,6 +659,7 @@ typedef struct _SubscriptionInfo
 	char	   *substream;
 	char	   *subtwophasestate;
 	char	   *subdisableonerr;
+	char	   *suborigin;
 	char	   *subsynccommit;
 	char	   *subpublications;
 } SubscriptionInfo;
diff --git a/src/bin/pg_dump/t/002_pg_dump.pl b/src/bin/pg_dump/t/002_pg_dump.pl
index 1f08716f69..995cf467f0 100644
--- a/src/bin/pg_dump/t/002_pg_dump.pl
+++ b/src/bin/pg_dump/t/002_pg_dump.pl
@@ -2465,6 +2465,28 @@ my %tests = (
 		like => { %full_runs, section_post_data => 1, },
 	},
 
+	'CREATE SUBSCRIPTION sub2' => {
+		create_order => 50,
+		create_sql   => 'CREATE SUBSCRIPTION sub2
+						 CONNECTION \'dbname=doesnotexist\' PUBLICATION pub1
+						 WITH (connect = false, origin = local);',
+		regexp => qr/^
+			\QCREATE SUBSCRIPTION sub2 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH \
(connect = false, slot_name = 'sub2', origin = local);\E +			/xm,
+		like => { %full_runs, section_post_data => 1, },
+	},
+
+	'CREATE SUBSCRIPTION sub3' => {
+		create_order => 50,
+		create_sql   => 'CREATE SUBSCRIPTION sub3
+						 CONNECTION \'dbname=doesnotexist\' PUBLICATION pub1
+						 WITH (connect = false, origin = any);',
+		regexp => qr/^
+			\QCREATE SUBSCRIPTION sub3 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH \
(connect = false, slot_name = 'sub3');\E +			/xm,
+		like => { %full_runs, section_post_data => 1, },
+	},
+
 	'ALTER PUBLICATION pub1 ADD TABLE test_table' => {
 		create_order => 51,
 		create_sql =>
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index d1ae699171..cfae75f30f 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -6354,7 +6354,7 @@ describeSubscriptions(const char *pattern, bool verbose)
 	PGresult   *res;
 	printQueryOpt myopt = pset.popt;
 	static const bool translate_columns[] = {false, false, false, false,
-	false, false, false, false, false, false, false};
+	false, false, false, false, false, false, false, false};
 
 	if (pset.sversion < 100000)
 	{
@@ -6396,6 +6396,12 @@ describeSubscriptions(const char *pattern, bool verbose)
 							  gettext_noop("Two-phase commit"),
 							  gettext_noop("Disable on error"));
 
+		/* FIXME: 150000 should be changed to 160000 later for PG16 */
+		if (pset.sversion >= 150000)
+			appendPQExpBuffer(&buf,
+							  ", suborigin AS \"%s\"\n",
+							  gettext_noop("Origin"));
+
 		appendPQExpBuffer(&buf,
 						  ",  subsynccommit AS \"%s\"\n"
 						  ",  subconninfo AS \"%s\"\n",
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index bd44a1d55d..f7a7cf6e58 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -1873,7 +1873,7 @@ psql_completion(const char *text, int start, int end)
 		COMPLETE_WITH("(", "PUBLICATION");
 	/* ALTER SUBSCRIPTION <name> SET ( */
 	else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "("))
-		COMPLETE_WITH("binary", "slot_name", "streaming", "synchronous_commit", \
"disable_on_error"); +		COMPLETE_WITH("binary", "origin", "slot_name", "streaming", \
"synchronous_commit", "disable_on_error");  /* ALTER SUBSCRIPTION <name> SKIP ( */
 	else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SKIP", \
"("))  COMPLETE_WITH("lsn");
@@ -3152,7 +3152,7 @@ psql_completion(const char *text, int start, int end)
 	/* Complete "CREATE SUBSCRIPTION <name> ...  WITH ( <opt>" */
 	else if (HeadMatches("CREATE", "SUBSCRIPTION") && TailMatches("WITH", "("))
 		COMPLETE_WITH("binary", "connect", "copy_data", "create_slot",
-					  "enabled", "slot_name", "streaming",
+					  "enabled", "origin", "slot_name", "streaming",
 					  "synchronous_commit", "two_phase", "disable_on_error");
 
 /* CREATE TRIGGER --- is allowed inside CREATE SCHEMA, so use TailMatches */
diff --git a/src/include/catalog/pg_subscription.h \
b/src/include/catalog/pg_subscription.h index d1260f590c..3d49c3b9e5 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -31,6 +31,18 @@
 #define LOGICALREP_TWOPHASE_STATE_PENDING 'p'
 #define LOGICALREP_TWOPHASE_STATE_ENABLED 'e'
 
+/*
+ * The subscription will request the publisher to only send changes that
+ * originated locally.
+ */
+#define LOGICALREP_ORIGIN_LOCAL "local"
+
+/*
+ * The subscription will request the publisher to send changes regardless
+ * of their origin.
+ */
+#define LOGICALREP_ORIGIN_ANY "any"
+
 /* ----------------
  *		pg_subscription definition. cpp turns this into
  *		typedef struct FormData_pg_subscription
@@ -87,6 +99,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) \
BKI_SHARED_RELATION BKI_ROW  
 	/* List of publications subscribed to */
 	text		subpublications[1] BKI_FORCE_NOT_NULL;
+
+	/* Only publish data originating from the specified origin */
+	text		suborigin BKI_DEFAULT(LOGICALREP_ORIGIN_ANY);
 #endif
 } FormData_pg_subscription;
 
@@ -118,6 +133,8 @@ typedef struct Subscription
 	char	   *slotname;		/* Name of the replication slot */
 	char	   *synccommit;		/* Synchronous commit setting for worker */
 	List	   *publications;	/* List of publication names to subscribe to */
+	char	   *origin;			/* Only publish data originating from the
+								 * specified origin */
 } Subscription;
 
 extern Subscription *GetSubscription(Oid subid, bool missing_ok);
diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h
index eafedd610a..02027550e2 100644
--- a/src/include/replication/pgoutput.h
+++ b/src/include/replication/pgoutput.h
@@ -29,6 +29,7 @@ typedef struct PGOutputData
 	bool		streaming;
 	bool		messages;
 	bool		two_phase;
+	char	   *origin;
 } PGOutputData;
 
 #endif							/* PGOUTPUT_H */
diff --git a/src/include/replication/walreceiver.h \
b/src/include/replication/walreceiver.h index 81184aa92f..88d7cc6abc 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -183,6 +183,8 @@ typedef struct
 			bool		streaming;	/* Streaming of large transactions */
 			bool		twophase;	/* Streaming of two-phase transactions at
 									 * prepare time */
+			char	   *origin; /* Only publish data originating from the
+								 * specified origin */
 		}			logical;
 	}			proto;
 } WalRcvStreamOptions;
diff --git a/src/test/regress/expected/subscription.out \
b/src/test/regress/expected/subscription.out index 5db7146e06..d46c3f8d6a 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -70,16 +70,38 @@ ALTER SUBSCRIPTION regress_testsub3 ENABLE;
 ERROR:  cannot enable subscription that does not have a slot name
 ALTER SUBSCRIPTION regress_testsub3 REFRESH PUBLICATION;
 ERROR:  ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions
+-- fail - origin must be either local or any
+CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' \
PUBLICATION testpub WITH (slot_name = NONE, connect = false, origin = foo); +ERROR:  \
unrecognized origin value: "foo" +-- now it works
+CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' \
PUBLICATION testpub WITH (slot_name = NONE, connect = false, origin = local); \
+WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... \
REFRESH PUBLICATION to subscribe the tables +\dRs+ regress_testsub4
+                                                                                     \
List of subscriptions +       Name       |           Owner           | Enabled | \
Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | \
Synchronous commit |          Conninfo           | Skip LSN  \
+------------------+---------------------------+---------+-------------+--------+----- \
------+------------------+------------------+--------+--------------------+-----------------------------+----------
 + regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | f  \
| d                | f                | local  | off                | \
dbname=regress_doesnotexist | 0/0 +(1 row)
+
+ALTER SUBSCRIPTION regress_testsub4 SET (origin = any);
+\dRs+ regress_testsub4
+                                                                                     \
List of subscriptions +       Name       |           Owner           | Enabled | \
Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | \
Synchronous commit |          Conninfo           | Skip LSN  \
+------------------+---------------------------+---------+-------------+--------+----- \
------+------------------+------------------+--------+--------------------+-----------------------------+----------
 + regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | f  \
| d                | f                | any    | off                | \
dbname=regress_doesnotexist | 0/0 +(1 row)
+
 DROP SUBSCRIPTION regress_testsub3;
+DROP SUBSCRIPTION regress_testsub4;
 -- fail - invalid connection string
 ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
 ERROR:  invalid connection string syntax: missing "=" after "foobar" in connection \
info string  
 \dRs+
-                                                                                    \
                List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | \
Streaming | Two-phase commit | Disable on error | Synchronous commit |          \
                Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+------ \
-----+------------------+------------------+--------------------+-----------------------------+----------
                
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f    \
| d                | f                | off                | \
dbname=regress_doesnotexist | 0/0 +                                                   \
List of subscriptions +      Name       |           Owner           | Enabled | \
Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | \
Synchronous commit |          Conninfo           | Skip LSN  \
+-----------------+---------------------------+---------+-------------+--------+------ \
-----+------------------+------------------+--------+--------------------+-----------------------------+----------
 + regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f   \
| d                | f                | any    | off                | \
dbname=regress_doesnotexist | 0/0  (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh \
= false); @@ -96,10 +118,10 @@ ERROR:  unrecognized subscription parameter: \
                "create_slot"
 -- ok
 ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345');
 \dRs+
-                                                                                     \
                List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     | \
Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit |       \
                Conninfo           | Skip LSN 
------------------+---------------------------+---------+---------------------+------- \
-+-----------+------------------+------------------+--------------------+------------------------------+----------
                
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f     \
| f         | d                | f                | off                | \
dbname=regress_doesnotexist2 | 0/12345 +                                              \
List of subscriptions +      Name       |           Owner           | Enabled |     \
Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | \
Synchronous commit |           Conninfo           | Skip LSN  \
+-----------------+---------------------------+---------+---------------------+------- \
-+-----------+------------------+------------------+--------+--------------------+------------------------------+----------
 + regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f    \
| f         | d                | f                | any    | off                | \
dbname=regress_doesnotexist2 | 0/12345  (1 row)
 
 -- ok - with lsn = NONE
@@ -108,10 +130,10 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE);
 ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0');
 ERROR:  invalid WAL location (LSN): 0/0
 \dRs+
-                                                                                     \
                List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     | \
Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit |       \
                Conninfo           | Skip LSN 
------------------+---------------------------+---------+---------------------+------- \
-+-----------+------------------+------------------+--------------------+------------------------------+----------
                
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f     \
| f         | d                | f                | off                | \
dbname=regress_doesnotexist2 | 0/0 +                                                  \
List of subscriptions +      Name       |           Owner           | Enabled |     \
Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | \
Synchronous commit |           Conninfo           | Skip LSN  \
+-----------------+---------------------------+---------+---------------------+------- \
-+-----------+------------------+------------------+--------+--------------------+------------------------------+----------
 + regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f    \
| f         | d                | f                | any    | off                | \
dbname=regress_doesnotexist2 | 0/0  (1 row)
 
 BEGIN;
@@ -143,10 +165,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit \
= foobar);  ERROR:  invalid value for parameter "synchronous_commit": "foobar"
 HINT:  Available values: local, remote_write, remote_apply, on, off.
 \dRs+
-                                                                                     \
                List of subscriptions
-        Name         |           Owner           | Enabled |     Publication     | \
Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit |       \
                Conninfo           | Skip LSN 
----------------------+---------------------------+---------+---------------------+--- \
-----+-----------+------------------+------------------+--------------------+------------------------------+----------
                
- regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f \
| f         | d                | f                | local              | \
dbname=regress_doesnotexist2 | 0/0 +                                                  \
List of subscriptions +        Name         |           Owner           | Enabled |   \
Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | \
Synchronous commit |           Conninfo           | Skip LSN  \
+---------------------+---------------------------+---------+---------------------+--- \
-----+-----------+------------------+------------------+--------+--------------------+------------------------------+----------
 + regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | \
f      | f         | d                | f                | any    | local             \
| dbname=regress_doesnotexist2 | 0/0  (1 row)
 
 -- rename back to keep the rest simple
@@ -179,19 +201,19 @@ ERROR:  binary requires a Boolean value
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' \
PUBLICATION testpub WITH (connect = false, binary = true);  WARNING:  tables were not \
subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to \
subscribe the tables  \dRs+
-                                                                                    \
                List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | \
Streaming | Two-phase commit | Disable on error | Synchronous commit |          \
                Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+------ \
-----+------------------+------------------+--------------------+-----------------------------+----------
                
- regress_testsub | regress_subscription_user | f       | {testpub}   | t      | f    \
| d                | f                | off                | \
dbname=regress_doesnotexist | 0/0 +                                                   \
List of subscriptions +      Name       |           Owner           | Enabled | \
Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | \
Synchronous commit |          Conninfo           | Skip LSN  \
+-----------------+---------------------------+---------+-------------+--------+------ \
-----+------------------+------------------+--------+--------------------+-----------------------------+----------
 + regress_testsub | regress_subscription_user | f       | {testpub}   | t      | f   \
| d                | f                | any    | off                | \
dbname=regress_doesnotexist | 0/0  (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (binary = false);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
-                                                                                    \
                List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | \
Streaming | Two-phase commit | Disable on error | Synchronous commit |          \
                Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+------ \
-----+------------------+------------------+--------------------+-----------------------------+----------
                
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f    \
| d                | f                | off                | \
dbname=regress_doesnotexist | 0/0 +                                                   \
List of subscriptions +      Name       |           Owner           | Enabled | \
Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | \
Synchronous commit |          Conninfo           | Skip LSN  \
+-----------------+---------------------------+---------+-------------+--------+------ \
-----+------------------+------------------+--------+--------------------+-----------------------------+----------
 + regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f   \
| d                | f                | any    | off                | \
dbname=regress_doesnotexist | 0/0  (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
@@ -202,19 +224,19 @@ ERROR:  streaming requires a Boolean value
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' \
PUBLICATION testpub WITH (connect = false, streaming = true);  WARNING:  tables were \
not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to \
subscribe the tables  \dRs+
-                                                                                    \
                List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | \
Streaming | Two-phase commit | Disable on error | Synchronous commit |          \
                Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+------ \
-----+------------------+------------------+--------------------+-----------------------------+----------
                
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t    \
| d                | f                | off                | \
dbname=regress_doesnotexist | 0/0 +                                                   \
List of subscriptions +      Name       |           Owner           | Enabled | \
Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | \
Synchronous commit |          Conninfo           | Skip LSN  \
+-----------------+---------------------------+---------+-------------+--------+------ \
-----+------------------+------------------+--------+--------------------+-----------------------------+----------
 + regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t   \
| d                | f                | any    | off                | \
dbname=regress_doesnotexist | 0/0  (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (streaming = false);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
-                                                                                    \
                List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | \
Streaming | Two-phase commit | Disable on error | Synchronous commit |          \
                Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+------ \
-----+------------------+------------------+--------------------+-----------------------------+----------
                
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f    \
| d                | f                | off                | \
dbname=regress_doesnotexist | 0/0 +                                                   \
List of subscriptions +      Name       |           Owner           | Enabled | \
Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | \
Synchronous commit |          Conninfo           | Skip LSN  \
+-----------------+---------------------------+---------+-------------+--------+------ \
-----+------------------+------------------+--------+--------------------+-----------------------------+----------
 + regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f   \
| d                | f                | any    | off                | \
dbname=regress_doesnotexist | 0/0  (1 row)
 
 -- fail - publication already exists
@@ -229,10 +251,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, \
testpub2 WITH (refr  ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, \
testpub2 WITH (refresh = false);  ERROR:  publication "testpub1" is already in \
subscription "regress_testsub"  \dRs+
-                                                                                     \
                List of subscriptions
-      Name       |           Owner           | Enabled |         Publication         \
| Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit |     \
                Conninfo           | Skip LSN 
------------------+---------------------------+---------+----------------------------- \
+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
                
- regress_testsub | regress_subscription_user | f       | {testpub,testpub1,testpub2} \
| f      | f         | d                | f                | off                | \
dbname=regress_doesnotexist | 0/0 +                                                   \
List of subscriptions +      Name       |           Owner           | Enabled |       \
Publication         | Binary | Streaming | Two-phase commit | Disable on error | \
Origin | Synchronous commit |          Conninfo           | Skip LSN  \
+-----------------+---------------------------+---------+----------------------------- \
+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
 + regress_testsub | regress_subscription_user | f       | \
{testpub,testpub1,testpub2} | f      | f         | d                | f               \
| any    | off                | dbname=regress_doesnotexist | 0/0  (1 row)
 
 -- fail - publication used more then once
@@ -247,10 +269,10 @@ ERROR:  publication "testpub3" is not in subscription \
                "regress_testsub"
 -- ok - delete publications
 ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh \
= false);  \dRs+
-                                                                                    \
                List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | \
Streaming | Two-phase commit | Disable on error | Synchronous commit |          \
                Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+------ \
-----+------------------+------------------+--------------------+-----------------------------+----------
                
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f    \
| d                | f                | off                | \
dbname=regress_doesnotexist | 0/0 +                                                   \
List of subscriptions +      Name       |           Owner           | Enabled | \
Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | \
Synchronous commit |          Conninfo           | Skip LSN  \
+-----------------+---------------------------+---------+-------------+--------+------ \
-----+------------------+------------------+--------+--------------------+-----------------------------+----------
 + regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f   \
| d                | f                | any    | off                | \
dbname=regress_doesnotexist | 0/0  (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
@@ -284,10 +306,10 @@ ERROR:  two_phase requires a Boolean value
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' \
PUBLICATION testpub WITH (connect = false, two_phase = true);  WARNING:  tables were \
not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to \
subscribe the tables  \dRs+
-                                                                                    \
                List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | \
Streaming | Two-phase commit | Disable on error | Synchronous commit |          \
                Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+------ \
-----+------------------+------------------+--------------------+-----------------------------+----------
                
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f    \
| p                | f                | off                | \
dbname=regress_doesnotexist | 0/0 +                                                   \
List of subscriptions +      Name       |           Owner           | Enabled | \
Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | \
Synchronous commit |          Conninfo           | Skip LSN  \
+-----------------+---------------------------+---------+-------------+--------+------ \
-----+------------------+------------------+--------+--------------------+-----------------------------+----------
 + regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f   \
| p                | f                | any    | off                | \
dbname=regress_doesnotexist | 0/0  (1 row)
 
 --fail - alter of two_phase option not supported.
@@ -296,10 +318,10 @@ ERROR:  unrecognized subscription parameter: "two_phase"
 -- but can alter streaming when two_phase enabled
 ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
 \dRs+
-                                                                                    \
                List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | \
Streaming | Two-phase commit | Disable on error | Synchronous commit |          \
                Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+------ \
-----+------------------+------------------+--------------------+-----------------------------+----------
                
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t    \
| p                | f                | off                | \
dbname=regress_doesnotexist | 0/0 +                                                   \
List of subscriptions +      Name       |           Owner           | Enabled | \
Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | \
Synchronous commit |          Conninfo           | Skip LSN  \
+-----------------+---------------------------+---------+-------------+--------+------ \
-----+------------------+------------------+--------+--------------------+-----------------------------+----------
 + regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t   \
| p                | f                | any    | off                | \
dbname=regress_doesnotexist | 0/0  (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -308,10 +330,10 @@ DROP SUBSCRIPTION regress_testsub;
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' \
PUBLICATION testpub WITH (connect = false, streaming = true, two_phase = true);  \
WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... \
REFRESH PUBLICATION to subscribe the tables  \dRs+
-                                                                                    \
                List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | \
Streaming | Two-phase commit | Disable on error | Synchronous commit |          \
                Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+------ \
-----+------------------+------------------+--------------------+-----------------------------+----------
                
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t    \
| p                | f                | off                | \
dbname=regress_doesnotexist | 0/0 +                                                   \
List of subscriptions +      Name       |           Owner           | Enabled | \
Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | \
Synchronous commit |          Conninfo           | Skip LSN  \
+-----------------+---------------------------+---------+-------------+--------+------ \
-----+------------------+------------------+--------+--------------------+-----------------------------+----------
 + regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t   \
| p                | f                | any    | off                | \
dbname=regress_doesnotexist | 0/0  (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -323,18 +345,18 @@ ERROR:  disable_on_error requires a Boolean value
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' \
PUBLICATION testpub WITH (connect = false, disable_on_error = false);  WARNING:  \
tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH \
PUBLICATION to subscribe the tables  \dRs+
-                                                                                    \
                List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | \
Streaming | Two-phase commit | Disable on error | Synchronous commit |          \
                Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+------ \
-----+------------------+------------------+--------------------+-----------------------------+----------
                
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f    \
| d                | f                | off                | \
dbname=regress_doesnotexist | 0/0 +                                                   \
List of subscriptions +      Name       |           Owner           | Enabled | \
Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | \
Synchronous commit |          Conninfo           | Skip LSN  \
+-----------------+---------------------------+---------+-------------+--------+------ \
-----+------------------+------------------+--------+--------------------+-----------------------------+----------
 + regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f   \
| d                | f                | any    | off                | \
dbname=regress_doesnotexist | 0/0  (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
 \dRs+
-                                                                                    \
                List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | \
Streaming | Two-phase commit | Disable on error | Synchronous commit |          \
                Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+------ \
-----+------------------+------------------+--------------------+-----------------------------+----------
                
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f    \
| d                | t                | off                | \
dbname=regress_doesnotexist | 0/0 +                                                   \
List of subscriptions +      Name       |           Owner           | Enabled | \
Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | \
Synchronous commit |          Conninfo           | Skip LSN  \
+-----------------+---------------------------+---------+-------------+--------+------ \
-----+------------------+------------------+--------+--------------------+-----------------------------+----------
 + regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f   \
| d                | t                | any    | off                | \
dbname=regress_doesnotexist | 0/0  (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
diff --git a/src/test/regress/sql/subscription.sql \
b/src/test/regress/sql/subscription.sql index 74c38ead5d..fff63ce538 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -54,7 +54,17 @@ CREATE SUBSCRIPTION regress_testsub3 CONNECTION \
'dbname=regress_doesnotexist' PU  ALTER SUBSCRIPTION regress_testsub3 ENABLE;
 ALTER SUBSCRIPTION regress_testsub3 REFRESH PUBLICATION;
 
+-- fail - origin must be either local or any
+CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' \
PUBLICATION testpub WITH (slot_name = NONE, connect = false, origin = foo); +
+-- now it works
+CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' \
PUBLICATION testpub WITH (slot_name = NONE, connect = false, origin = local); +\dRs+ \
regress_testsub4 +ALTER SUBSCRIPTION regress_testsub4 SET (origin = any);
+\dRs+ regress_testsub4
+
 DROP SUBSCRIPTION regress_testsub3;
+DROP SUBSCRIPTION regress_testsub4;
 
 -- fail - invalid connection string
 ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
diff --git a/src/test/subscription/t/032_origin.pl \
b/src/test/subscription/t/032_origin.pl new file mode 100644
index 0000000000..3a22f4efe8
--- /dev/null
+++ b/src/test/subscription/t/032_origin.pl
@@ -0,0 +1,146 @@
+
+# Copyright (c) 2021-2022, PostgreSQL Global Development Group
+
+# Test the CREATE SUBSCRIPTION 'origin' parameter.
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+###############################################################################
+# Setup a bidirectional logical replication between node_A & node_B
+###############################################################################
+
+# Initialize nodes
+# node_A
+my $node_A = PostgreSQL::Test::Cluster->new('node_A');
+$node_A->init(allows_streaming => 'logical');
+$node_A->start;
+# node_B
+my $node_B = PostgreSQL::Test::Cluster->new('node_B');
+$node_B->init(allows_streaming => 'logical');
+$node_B->start;
+
+# Create tables on node_A
+$node_A->safe_psql('postgres', "CREATE TABLE tab_full (a int PRIMARY KEY)");
+
+# Create the same tables on node_B
+$node_B->safe_psql('postgres', "CREATE TABLE tab_full (a int PRIMARY KEY)");
+
+# Setup logical replication
+# node_A (pub) -> node_B (sub)
+my $node_A_connstr = $node_A->connstr . ' dbname=postgres';
+$node_A->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_A FOR TABLE tab_full");
+my $appname_B1 = 'tap_sub_B1';
+$node_B->safe_psql(
+	'postgres', "
+	CREATE SUBSCRIPTION tap_sub_B1
+	CONNECTION '$node_A_connstr application_name=$appname_B1'
+	PUBLICATION tap_pub_A
+	WITH (origin = local)");
+
+# node_B (pub) -> node_A (sub)
+my $node_B_connstr = $node_B->connstr . ' dbname=postgres';
+$node_B->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_B FOR TABLE tab_full");
+my $appname_A = 'tap_sub_A';
+$node_A->safe_psql(
+	'postgres', "
+	CREATE SUBSCRIPTION tap_sub_A
+	CONNECTION '$node_B_connstr application_name=$appname_A'
+	PUBLICATION tap_pub_B
+	WITH (origin = local, copy_data = off)");
+
+# Wait for subscribers to finish initialization
+$node_A->wait_for_catchup($appname_B1);
+$node_B->wait_for_catchup($appname_A);
+
+# Also wait for initial table sync to finish
+my $synced_query =
+  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', \
's');"; +$node_A->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+$node_B->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+is(1, 1, 'Bidirectional replication setup is complete');
+
+my $result;
+
+###############################################################################
+# Check that bidirectional logical replication setup does not cause infinite
+# recursive insertion.
+###############################################################################
+
+# insert a record
+$node_A->safe_psql('postgres', "INSERT INTO tab_full VALUES (11);");
+$node_B->safe_psql('postgres', "INSERT INTO tab_full VALUES (12);");
+
+$node_A->wait_for_catchup($appname_B1);
+$node_B->wait_for_catchup($appname_A);
+
+# check that transaction was committed on subscriber(s)
+$result = $node_A->safe_psql('postgres', "SELECT * FROM tab_full ORDER BY 1;");
+is($result, qq(11
+12), 'Inserted successfully without leading to infinite recursion in bidirectional \
replication setup' +);
+$result = $node_B->safe_psql('postgres', "SELECT * FROM tab_full ORDER BY 1;");
+is($result, qq(11
+12), 'Inserted successfully without leading to infinite recursion in bidirectional \
replication setup' +);
+
+$node_A->safe_psql('postgres', "DELETE FROM tab_full;");
+
+###############################################################################
+# Check that remote data of node_B (that originated from node_C) is not
+# published to node_A.
+###############################################################################
+# Initialize node node_C
+my $node_C = PostgreSQL::Test::Cluster->new('node_C');
+$node_C->init(allows_streaming => 'logical');
+$node_C->start;
+
+$node_C->safe_psql('postgres', "CREATE TABLE tab_full (a int PRIMARY KEY)");
+
+# Setup logical replication
+# node_C (pub) -> node_B (sub)
+my $node_C_connstr = $node_C->connstr . ' dbname=postgres';
+$node_C->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_C FOR TABLE tab_full");
+
+my $appname_B2 = 'tap_sub_B2';
+$node_B->safe_psql(
+	'postgres', "
+	CREATE SUBSCRIPTION tap_sub_B2
+	CONNECTION '$node_C_connstr application_name=$appname_B2'
+	PUBLICATION tap_pub_C
+	WITH (origin = local)");
+
+$node_C->wait_for_catchup($appname_B2);
+
+$node_C->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+# insert a record
+$node_C->safe_psql('postgres', "INSERT INTO tab_full VALUES (13);");
+
+$node_C->wait_for_catchup($appname_B2);
+$node_B->wait_for_catchup($appname_A);
+
+$result = $node_B->safe_psql('postgres', "SELECT * FROM tab_full ORDER BY 1;");
+is($result, qq(13), 'The node_C data replicated to node_B'
+);
+
+# check that the data published from node_C to node_B is not sent to node_A
+$result = $node_A->safe_psql('postgres', "SELECT * FROM tab_full ORDER BY 1;");
+is($result, qq(), 'Remote data originating from another node (not the publisher) is \
not replicated when origin parameter is local' +);
+
+# shutdown
+$node_B->stop('fast');
+$node_A->stop('fast');
+$node_C->stop('fast');
+
+done_testing();
-- 
2.32.0


["v26-0003-Check-and-throw-an-error-if-publication-tables-w.patch" (text/x-patch)]

From 01e75588cd4679ef6967fb0f2d14cc660b02ed69 Mon Sep 17 00:00:00 2001
From: Vigneshwaran C <vignesh21@gmail.com>
Date: Thu, 30 Jun 2022 16:50:09 +0530
Subject: [PATCH v26 3/3] Check and throw an error if publication tables were 
 also subscribing from other publishers and support force value for copy_data 
 parameter.

This patch does a couple of things:
1) Checks and throws an error if 'copy_data = on' and 'origin =
local' but the publication tables were also replicated from other publishers.
2) Adds 'force' value for copy_data parameter.

-------------------------------------------------------------------------------
The steps below help to demonstrate how the new exception is useful:

The initial copy phase has no way to know the origin of the row data,
so if 'copy_data = on' in the step 4 below, then an error will be
thrown to prevent any potentially non-local data from being copied:

e.g.
CREATE SUBSCRIPTION sub_node2_node1 CONNECTION '<node1 details>'
PUBLICATION pub_node1 WITH (copy_data = on, origin = local);
ERROR:  CREATE/ALTER SUBSCRIPTION with origin = local and copy_data = on is
not allowed when the publisher might have replicated data.

-------------------------------------------------------------------------------
The following steps help to demonstrate how the 'copy_data = force'
change will be useful:

Let's take a scenario where the user wants to set up bidirectional
logical replication between node1 and node2 where the same table on
node1 has pre-existing data and node2 has no pre-existing data.

e.g.
node1: Table t1 (c1 int) has data 11, 12, 13, 14
node2: Table t1 (c1 int) has no pre-existing data

The following steps are required in this case:
step 1:
node1=# CREATE PUBLICATION pub_node1 FOR TABLE t1;
CREATE PUBLICATION

step 2:
node2=# CREATE PUBLICATION pub_node2 FOR TABLE t1;
CREATE PUBLICATION

step 3:
node1=# CREATE SUBSCRIPTION sub_node1_node2 CONNECTION '<node2 details>'
node1-# PUBLICATION pub_node2;
CREATE SUBSCRIPTION

step 4:
node2=# CREATE SUBSCRIPTION sub_node2_node1 CONNECTION '<node1 details>'
node2-# PUBLICATION pub_node1;
CREATE SUBSCRIPTION

After the subscription is created on node2, node1 will be synced to
node2 and the newly synced data will be sent to node2. This process of
node1 sending data to node2 and node2 sending data to node1 will repeat
infinitely. If table t1 has a unique key, this will lead to a unique key
violation and replication won't proceed.

This problem can be avoided by using origin and copy_data parameters as given
below:
Step 1 & Step 2 are same as above.

step 3: Create a subscription on node1 to subscribe to node2:
node1=# CREATE SUBSCRIPTION sub_node1_node2 CONNECTION '<node2 details>'
node1-# PUBLICATION pub_node2 WITH (copy_data = off, origin = local);
CREATE SUBSCRIPTION

step 4: Create a subscription on node2 to subscribe to node1. Use
'copy_data = force' when creating a subscription to node1 so that the
existing table data is copied during initial sync:
node2=# CREATE SUBSCRIPTION sub_node2_node1 CONNECTION '<node1 details>'
node2-# PUBLICATION pub_node1 WITH (copy_data = force, origin = local);
CREATE SUBSCRIPTION
---
 doc/src/sgml/ref/alter_subscription.sgml   |  16 +-
 doc/src/sgml/ref/create_subscription.sgml  |  32 +-
 src/backend/catalog/pg_subscription.c      |  53 ++++
 src/backend/commands/subscriptioncmds.c    | 202 +++++++++++-
 src/include/catalog/pg_subscription_rel.h  |   7 +
 src/test/regress/expected/subscription.out |  22 +-
 src/test/regress/sql/subscription.sql      |  14 +
 src/test/subscription/t/032_origin.pl      | 337 ++++++++++++++++++---
 src/tools/pgindent/typedefs.list           |   2 +
 9 files changed, 625 insertions(+), 60 deletions(-)

diff --git a/doc/src/sgml/ref/alter_subscription.sgml \
b/doc/src/sgml/ref/alter_subscription.sgml index 64efc21f53..04e526fb80 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -161,12 +161,22 @@ ALTER SUBSCRIPTION <replaceable \
class="parameter">name</replaceable> RENAME TO <  
       <variablelist>
        <varlistentry>
-        <term><literal>copy_data</literal> (<type>boolean</type>)</term>
+        <term><literal>copy_data</literal> (<type>enum</type>)</term>
         <listitem>
          <para>
           Specifies whether to copy pre-existing data in the publications
-          that are being subscribed to when the replication starts.
-          The default is <literal>true</literal>.
+          that are being subscribed to when the replication starts. This
+          parameter may be either <literal>true</literal>,
+          <literal>false</literal> or <literal>force</literal>. The default is
+          <literal>true</literal>.
+         </para>
+         <para>
+          There is some interaction between the <literal>origin</literal>
+          parameter and the <literal>copy_data</literal> parameter. Refer to
+          the <command>CREATE SUBSCRIPTION</command>
+          <xref linkend="sql-createsubscription-notes" /> for interaction
+          details and usage of <literal>force</literal> for
+          <literal>copy_data</literal> parameter.
          </para>
          <para>
           Previously subscribed tables are not copied, even if a table's row
diff --git a/doc/src/sgml/ref/create_subscription.sgml \
b/doc/src/sgml/ref/create_subscription.sgml index b0a6ebcb7d..f0bc2ba63d 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -115,7 +115,8 @@ CREATE SUBSCRIPTION <replaceable \
class="parameter">subscription_name</replaceabl  (You cannot combine setting \
<literal>connect</literal>  to <literal>false</literal> with
           setting <literal>create_slot</literal>, <literal>enabled</literal>,
-          or <literal>copy_data</literal> to <literal>true</literal>.)
+          or <literal>copy_data</literal> to
+          <literal>true</literal>/<literal>force</literal>.)
          </para>
 
          <para>
@@ -201,18 +202,27 @@ CREATE SUBSCRIPTION <replaceable \
class="parameter">subscription_name</replaceabl  </varlistentry>
 
        <varlistentry>
-        <term><literal>copy_data</literal> (<type>boolean</type>)</term>
+        <term><literal>copy_data</literal> (<type>enum</type>)</term>
         <listitem>
          <para>
           Specifies whether to copy pre-existing data in the publications
-          that are being subscribed to when the replication starts.
-          The default is <literal>true</literal>.
+          that are being subscribed to when the replication starts. This
+          parameter may be either <literal>true</literal>,
+          <literal>false</literal> or <literal>force</literal>. The default is
+          <literal>true</literal>.
          </para>
          <para>
           If the publications contain <literal>WHERE</literal> clauses, it
           will affect what data is copied. Refer to the
           <xref linkend="sql-createsubscription-notes" /> for details.
          </para>
+         <para>
+          There is some interaction between the <literal>origin</literal>
+          parameter and the <literal>copy_data</literal> parameter. Refer to
+          the <xref linkend="sql-createsubscription-notes" /> for interaction
+          details and usage of <literal>force</literal> for
+          <literal>copy_data</literal> parameter.
+         </para>
         </listitem>
        </varlistentry>
 
@@ -316,6 +326,11 @@ CREATE SUBSCRIPTION <replaceable \
                class="parameter">subscription_name</replaceabl
           publisher sends changes regardless of their origin. The default is
           <literal>any</literal>.
          </para>
+         <para>
+          There is some interaction between the <literal>origin</literal>
+          parameter and the <literal>copy_data</literal> parameter. Refer to
+          the <xref linkend="sql-createsubscription-notes" /> for details.
+         </para>
         </listitem>
        </varlistentry>
       </variablelist></para>
@@ -387,6 +402,15 @@ CREATE SUBSCRIPTION <replaceable \
class="parameter">subscription_name</replaceabl  can have non-existent publications.
   </para>
 
+  <para>
+   If the subscription is created with <literal>origin = local</literal> and
+   <literal>copy_data = true</literal>, it will check if the publisher has
+   subscribed to the same table from other publishers and, if so, throw an
+   error to prevent possible non-local data from being copied. The user can
+   override this check and continue with the copy operation by specifying
+   <literal>copy_data = force</literal>.
+  </para>
+
  </refsect1>
 
  <refsect1>
diff --git a/src/backend/catalog/pg_subscription.c \
b/src/backend/catalog/pg_subscription.c index 8ea58b30ba..f42710f7d2 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -645,3 +645,56 @@ GetSubscriptionNotReadyRelations(Oid subid)
 
 	return res;
 }
+
+/*
+ * Get all relations for subscription that are in a ready state.
+ *
+ * Returned list is palloc'ed in current memory context.
+ */
+List *
+GetSubscriptionReadyRelations(Oid subid)
+{
+	List	   *res = NIL;
+	Relation	rel;
+	HeapTuple	tup;
+	int			nkeys = 0;
+	ScanKeyData skey[2];
+	SysScanDesc scan;
+
+	rel = table_open(SubscriptionRelRelationId, AccessShareLock);
+
+	ScanKeyInit(&skey[nkeys++],
+				Anum_pg_subscription_rel_srsubid,
+				BTEqualStrategyNumber, F_OIDEQ,
+				ObjectIdGetDatum(subid));
+
+	ScanKeyInit(&skey[nkeys++],
+				Anum_pg_subscription_rel_srsubstate,
+				BTEqualStrategyNumber, F_CHAREQ,
+				CharGetDatum(SUBREL_STATE_READY));
+
+	scan = systable_beginscan(rel, InvalidOid, false,
+							  NULL, nkeys, skey);
+
+	while (HeapTupleIsValid(tup = systable_getnext(scan)))
+	{
+		Form_pg_subscription_rel subrel;
+		SubscriptionRel *subtbl;
+		Oid				relid;
+
+		subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
+
+		subtbl = (SubscriptionRel *) palloc(sizeof(SubscriptionRel));
+		relid = subrel->srrelid;
+		subtbl->relname = get_rel_name(relid);
+		subtbl->nspname = get_namespace_name(get_rel_namespace(relid));
+
+		res = lappend(res, subtbl);
+	}
+
+	/* Cleanup */
+	systable_endscan(scan);
+	table_close(rel, AccessShareLock);
+
+	return res;
+}
diff --git a/src/backend/commands/subscriptioncmds.c \
b/src/backend/commands/subscriptioncmds.c index 3070ebc7e7..ba5096d8e2 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -69,6 +69,16 @@
 /* check if the 'val' has 'bits' set */
 #define IsSet(val, bits)  (((val) & (bits)) == (bits))
 
+/*
+ * Represents whether copy_data parameter is specified with off, on or force.
+ */
+typedef enum CopyData
+{
+	COPY_DATA_OFF = 0,
+	COPY_DATA_ON,
+	COPY_DATA_FORCE
+} CopyData;
+
 /*
  * Structure to hold a bitmap representing the user-provided CREATE/ALTER
  * SUBSCRIPTION command options and the parsed/default values of each of them.
@@ -81,7 +91,7 @@ typedef struct SubOpts
 	bool		connect;
 	bool		enabled;
 	bool		create_slot;
-	bool		copy_data;
+	CopyData	copy_data;
 	bool		refresh;
 	bool		binary;
 	bool		streaming;
@@ -92,10 +102,67 @@ typedef struct SubOpts
 } SubOpts;
 
 static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
+static void check_pub_table_subscribed(WalReceiverConn *wrconn,
+									   List *publications, CopyData copydata,
+									   char *origin, Oid subid);
 static void check_duplicates_in_publist(List *publist, Datum *datums);
 static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, \
const char *subname);  static void ReportSlotConnectionError(List *rstates, Oid \
subid, char *slotname, char *err);  
+/*
+ * Validate the value specified for copy_data parameter.
+ */
+static CopyData
+DefGetCopyData(DefElem *def)
+{
+	/*
+	 * If no parameter given, assume "true" is meant.
+	 */
+	if (def->arg == NULL)
+		return COPY_DATA_ON;
+
+	/*
+	 * Allow 0, 1, "true", "false", "on", "off" or "force".
+	 */
+	switch (nodeTag(def->arg))
+	{
+		case T_Integer:
+			switch (intVal(def->arg))
+			{
+				case 0:
+					return COPY_DATA_OFF;
+				case 1:
+					return COPY_DATA_ON;
+				default:
+					/* otherwise, error out below */
+					break;
+			}
+			break;
+		default:
+			{
+				char	   *sval = defGetString(def);
+
+				/*
+				 * The set of strings accepted here should match up with the
+				 * grammar's opt_boolean_or_string production.
+				 */
+				if (pg_strcasecmp(sval, "false") == 0 ||
+					pg_strcasecmp(sval, "off") == 0)
+					return COPY_DATA_OFF;
+				if (pg_strcasecmp(sval, "true") == 0 ||
+					pg_strcasecmp(sval, "on") == 0)
+					return COPY_DATA_ON;
+				if (pg_strcasecmp(sval, "force") == 0)
+					return COPY_DATA_FORCE;
+			}
+			break;
+	}
+
+	ereport(ERROR,
+			errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+			errmsg("%s requires a boolean or \"force\"", def->defname));
+	return COPY_DATA_OFF;		/* keep compiler quiet */
+}
 
 /*
  * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
@@ -128,7 +195,7 @@ parse_subscription_options(ParseState *pstate, List \
*stmt_options,  if (IsSet(supported_opts, SUBOPT_CREATE_SLOT))
 		opts->create_slot = true;
 	if (IsSet(supported_opts, SUBOPT_COPY_DATA))
-		opts->copy_data = true;
+		opts->copy_data = COPY_DATA_ON;
 	if (IsSet(supported_opts, SUBOPT_REFRESH))
 		opts->refresh = true;
 	if (IsSet(supported_opts, SUBOPT_BINARY))
@@ -196,7 +263,7 @@ parse_subscription_options(ParseState *pstate, List \
*stmt_options,  errorConflictingDefElem(defel, pstate);
 
 			opts->specified_opts |= SUBOPT_COPY_DATA;
-			opts->copy_data = defGetBoolean(defel);
+			opts->copy_data = DefGetCopyData(defel);
 		}
 		else if (IsSet(supported_opts, SUBOPT_SYNCHRONOUS_COMMIT) &&
 				 strcmp(defel->defname, "synchronous_commit") == 0)
@@ -352,12 +419,12 @@ parse_subscription_options(ParseState *pstate, List \
*stmt_options,  ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
 					 errmsg("%s and %s are mutually exclusive options",
-							"connect = false", "copy_data = true")));
+							"connect = false", "copy_data = true/force")));
 
 		/* Change the defaults of other options. */
 		opts->enabled = false;
 		opts->create_slot = false;
-		opts->copy_data = false;
+		opts->copy_data = COPY_DATA_OFF;
 	}
 
 	/*
@@ -681,6 +748,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt \
*stmt,  PG_TRY();
 		{
 			check_publications(wrconn, publications);
+			check_pub_table_subscribed(wrconn, publications, opts.copy_data,
+									   opts.origin, InvalidOid);
 
 			/*
 			 * Set sync state based on if we were asked to do data copy or
@@ -776,7 +845,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt \
*stmt,  }
 
 static void
-AlterSubscription_refresh(Subscription *sub, bool copy_data,
+AlterSubscription_refresh(Subscription *sub, CopyData copy_data,
 						  List *validate_publications)
 {
 	char	   *err;
@@ -811,6 +880,9 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 		if (validate_publications)
 			check_publications(wrconn, validate_publications);
 
+		check_pub_table_subscribed(wrconn, sub->publications, copy_data,
+								   sub->origin, sub->oid);
+
 		/* Get the table list from publisher. */
 		pubrel_names = fetch_table_list(wrconn, sub->publications);
 
@@ -1788,6 +1860,124 @@ AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
 	table_close(rel, RowExclusiveLock);
 }
 
+/*
+ * Check and throw an error if the publisher has subscribed to the same table
+ * from some other publisher. This check is required only if copydata is ON and
+ * the origin is local.
+ */
+static void
+check_pub_table_subscribed(WalReceiverConn *wrconn, List *publications,
+						   CopyData copydata, char *origin, Oid subid)
+{
+	WalRcvExecResult *res;
+	StringInfoData cmd;
+	TupleTableSlot *slot;
+	Oid			tableRow[2] = {TEXTOID, TEXTOID};
+	List	   *subreadyrels = NIL;
+	ListCell   *lc;
+
+	if (copydata != COPY_DATA_ON || !origin || (strcmp(origin, "local") != 0))
+		return;
+
+	initStringInfo(&cmd);
+	appendStringInfoString(&cmd,
+						   "SELECT DISTINCT N.nspname AS schemaname,\n"
+						   "				C.relname AS tablename\n"
+						   "FROM pg_publication P,\n"
+						   "	 LATERAL pg_get_publication_tables(P.pubname) GPT\n"
+						   "	 LEFT JOIN pg_subscription_rel PS ON (GPT.relid = PS.srrelid),\n"
+						   "	 pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
+						   "WHERE C.oid = GPT.relid AND PS.srrelid IS NOT NULL AND P.pubname IN (");
+	get_publications_str(publications, &cmd, true);
+	appendStringInfoChar(&cmd, ')');
+
+	res = walrcv_exec(wrconn, cmd.data, 2, tableRow);
+	pfree(cmd.data);
+
+	if (res->status != WALRCV_OK_TUPLES)
+		ereport(ERROR,
+				(errcode(ERRCODE_CONNECTION_FAILURE),
+				 errmsg("could not receive list of replicated tables from the publisher: %s",
+						res->err)));
+
+	/*
+	 * The subid will be valid only for ALTER SUBSCRIPTION ... REFRESH
+	 * PUBLICATION. Get the ready relations for the subscription only in case
+	 * of ALTER SUBSCRIPTION case as there will be no relations in ready state
+	 * while the subscription is created.
+	 */
+	if (subid != InvalidOid)
+		subreadyrels = GetSubscriptionReadyRelations(subid);
+
+	/* Process tables. */
+	slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+	while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+	{
+		char	   *nspname;
+		char	   *relname;
+		bool		isnull;
+		bool		isreadytable = false;
+
+		nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
+		Assert(!isnull);
+		relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
+		Assert(!isnull);
+
+		foreach(lc, subreadyrels)
+		{
+			SubscriptionRel *subrel = (SubscriptionRel *) lfirst(lc);
+
+			if ((strcmp(nspname, subrel->nspname) == 0) &&
+				(strcmp(relname, subrel->relname) == 0))
+			{
+				isreadytable = true;
+				break;
+			}
+		}
+
+		/*
+		 * No need to throw an error for the tables that are in ready state,
+		 * as the walsender will send the changes from WAL in case of tables
+		 * in ready state.
+		 */
+		if (isreadytable)
+			continue;
+
+		/*
+		 * Throw an error if the publisher has subscribed to the same table
+		 * from some other publisher. We cannot differentiate between the
+		 * local and non-local data that is present in the HEAP during the
+		 * initial sync. Identification of local data can be done only from
+		 * the WAL by using the origin id.
+		 *
+		 * XXX: For simplicity, we don't check whether the table has any data
+		 * or not. If the table doesn't have any data then we don't need to
+		 * distinguish between local and non-local data so we can avoid
+		 * throwing error in that case.
+		 */
+		ereport(ERROR,
+				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				errmsg("table: \"%s.%s\" might have replicated data in the publisher",
+					   nspname, relname),
+				errdetail("CREATE/ALTER SUBSCRIPTION with origin = local and copy_data = on is \
not allowed when the publisher might have replicated data."), +				errhint("Use \
CREATE/ALTER SUBSCRIPTION with copy_data = off/force.")); +
+		ExecClearTuple(slot);
+	}
+
+	foreach(lc, subreadyrels)
+	{
+		SubscriptionRel *subrel = (SubscriptionRel *) lfirst(lc);
+		pfree(subrel->nspname);
+		pfree(subrel->relname);
+		pfree(subrel);
+	}
+
+	ExecDropSingleTupleTableSlot(slot);
+
+	walrcv_clear_result(res);
+}
+
 /*
  * Get the list of tables which belong to specified publications on the
  * publisher connection.
diff --git a/src/include/catalog/pg_subscription_rel.h \
b/src/include/catalog/pg_subscription_rel.h index 9df99c3418..6c2e208bf7 100644
--- a/src/include/catalog/pg_subscription_rel.h
+++ b/src/include/catalog/pg_subscription_rel.h
@@ -80,6 +80,12 @@ typedef struct SubscriptionRelState
 	char		state;
 } SubscriptionRelState;
 
+typedef struct SubscriptionRel
+{
+	char	   *relname;
+	char	   *nspname;
+} SubscriptionRel;
+
 extern void AddSubscriptionRelState(Oid subid, Oid relid, char state,
 									XLogRecPtr sublsn);
 extern void UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
@@ -90,5 +96,6 @@ extern void RemoveSubscriptionRel(Oid subid, Oid relid);
 extern bool HasSubscriptionRelations(Oid subid);
 extern List *GetSubscriptionRelations(Oid subid);
 extern List *GetSubscriptionNotReadyRelations(Oid subid);
+extern List *GetSubscriptionReadyRelations(Oid subid);
 
 #endif							/* PG_SUBSCRIPTION_REL_H */
diff --git a/src/test/regress/expected/subscription.out \
b/src/test/regress/expected/subscription.out index d46c3f8d6a..49c87240a6 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -46,8 +46,18 @@ CREATE SUBSCRIPTION regress_testsub2 CONNECTION \
'dbname=regress_doesnotexist' PU  ERROR:  must be superuser to create subscriptions
 SET SESSION AUTHORIZATION 'regress_subscription_user';
 -- fail - invalid option combinations
+CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' \
PUBLICATION testpub WITH (connect = false, copy_data); +ERROR:  connect = false and \
copy_data = true/force are mutually exclusive options  CREATE SUBSCRIPTION \
regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH \
                (connect = false, copy_data = true);
-ERROR:  connect = false and copy_data = true are mutually exclusive options
+ERROR:  connect = false and copy_data = true/force are mutually exclusive options
+CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' \
PUBLICATION testpub WITH (connect = false, copy_data = on); +ERROR:  connect = false \
and copy_data = true/force are mutually exclusive options +CREATE SUBSCRIPTION \
regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH \
(connect = false, copy_data = 1); +ERROR:  connect = false and copy_data = true/force \
are mutually exclusive options +CREATE SUBSCRIPTION regress_testsub2 CONNECTION \
'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, copy_data = \
force); +ERROR:  connect = false and copy_data = true/force are mutually exclusive \
options +CREATE SUBSCRIPTION regress_testsub2 CONNECTION \
'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, copy_data = \
2); +ERROR:  copy_data requires a boolean or "force"
 CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' \
PUBLICATION testpub WITH (connect = false, enabled = true);  ERROR:  connect = false \
and enabled = true are mutually exclusive options  CREATE SUBSCRIPTION \
regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH \
(connect = false, create_slot = true); @@ -93,6 +103,16 @@ ALTER SUBSCRIPTION \
regress_testsub4 SET (origin = any);  
 DROP SUBSCRIPTION regress_testsub3;
 DROP SUBSCRIPTION regress_testsub4;
+-- ok - valid copy_data options
+CREATE SUBSCRIPTION regress_testsub3 CONNECTION 'dbname=regress_doesnotexist' \
PUBLICATION testpub WITH (slot_name = NONE, connect = false, copy_data = false); \
+WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... \
REFRESH PUBLICATION to subscribe the tables +CREATE SUBSCRIPTION regress_testsub4 \
CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, \
connect = false, copy_data = off); +WARNING:  tables were not subscribed, you will \
have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables \
+CREATE SUBSCRIPTION regress_testsub5 CONNECTION 'dbname=regress_doesnotexist' \
PUBLICATION testpub WITH (slot_name = NONE, connect = false, copy_data = 0); \
+WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... \
REFRESH PUBLICATION to subscribe the tables +DROP SUBSCRIPTION regress_testsub3;
+DROP SUBSCRIPTION regress_testsub4;
+DROP SUBSCRIPTION regress_testsub5;
 -- fail - invalid connection string
 ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
 ERROR:  invalid connection string syntax: missing "=" after "foobar" in connection \
                info string
diff --git a/src/test/regress/sql/subscription.sql \
b/src/test/regress/sql/subscription.sql index fff63ce538..597d56f561 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -39,7 +39,12 @@ CREATE SUBSCRIPTION regress_testsub2 CONNECTION \
'dbname=regress_doesnotexist' PU  SET SESSION AUTHORIZATION \
'regress_subscription_user';  
 -- fail - invalid option combinations
+CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' \
PUBLICATION testpub WITH (connect = false, copy_data);  CREATE SUBSCRIPTION \
regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH \
(connect = false, copy_data = true); +CREATE SUBSCRIPTION regress_testsub2 CONNECTION \
'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, copy_data = \
on); +CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' \
PUBLICATION testpub WITH (connect = false, copy_data = 1); +CREATE SUBSCRIPTION \
regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH \
(connect = false, copy_data = force); +CREATE SUBSCRIPTION regress_testsub2 \
CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, \
copy_data = 2);  CREATE SUBSCRIPTION regress_testsub2 CONNECTION \
'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, enabled = \
true);  CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' \
PUBLICATION testpub WITH (connect = false, create_slot = true);  CREATE SUBSCRIPTION \
regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH \
(slot_name = NONE, enabled = true); @@ -66,6 +71,15 @@ ALTER SUBSCRIPTION \
regress_testsub4 SET (origin = any);  DROP SUBSCRIPTION regress_testsub3;
 DROP SUBSCRIPTION regress_testsub4;
 
+-- ok - valid copy_data options
+CREATE SUBSCRIPTION regress_testsub3 CONNECTION 'dbname=regress_doesnotexist' \
PUBLICATION testpub WITH (slot_name = NONE, connect = false, copy_data = false); \
+CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' \
PUBLICATION testpub WITH (slot_name = NONE, connect = false, copy_data = off); \
+CREATE SUBSCRIPTION regress_testsub5 CONNECTION 'dbname=regress_doesnotexist' \
PUBLICATION testpub WITH (slot_name = NONE, connect = false, copy_data = 0); +
+DROP SUBSCRIPTION regress_testsub3;
+DROP SUBSCRIPTION regress_testsub4;
+DROP SUBSCRIPTION regress_testsub5;
+
 -- fail - invalid connection string
 ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
 
diff --git a/src/test/subscription/t/032_origin.pl \
b/src/test/subscription/t/032_origin.pl index 3a22f4efe8..70a23d5775 100644
--- a/src/test/subscription/t/032_origin.pl
+++ b/src/test/subscription/t/032_origin.pl
@@ -1,13 +1,124 @@
 
 # Copyright (c) 2021-2022, PostgreSQL Global Development Group
 
-# Test the CREATE SUBSCRIPTION 'origin' parameter.
+# Test the CREATE SUBSCRIPTION 'origin' parameter and its interaction with
+# 'copy_data' parameter.
 use strict;
 use warnings;
 use PostgreSQL::Test::Cluster;
 use PostgreSQL::Test::Utils;
 use Test::More;
 
+my $result;
+my $stdout;
+my $stderr;
+
+my $subname_AB = 'tap_sub_A_B';
+my $subname_AC = 'tap_sub_A_C';
+my $subname_BA = 'tap_sub_B_A';
+my $subname_BC = 'tap_sub_B_C';
+my $subname_CA = 'tap_sub_C_A';
+my $subname_CB = 'tap_sub_C_B';
+
+# Detach node_C from the node-group of (node_A, node_B, node_C) and clean the
+# table contents from all nodes.
+sub detach_node_clean_table_data
+{
+	my ($node_A, $node_B, $node_C) = @_;
+	$node_A->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_A_C");
+	$node_B->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_B_C");
+	$node_C->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_C_A");
+	$node_C->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_C_B");
+
+	$result =
+	  $node_A->safe_psql('postgres', "SELECT count(*) FROM pg_subscription");
+	is($result, qq(1), 'check subscription was dropped on subscriber');
+
+	$result =
+	  $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_subscription");
+	is($result, qq(1), 'check subscription was dropped on subscriber');
+
+	$result =
+	  $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_subscription");
+	is($result, qq(0), 'check subscription was dropped on subscriber');
+
+	$result = $node_A->safe_psql('postgres',
+		"SELECT count(*) FROM pg_replication_slots");
+	is($result, qq(1), 'check replication slot was dropped on publisher');
+
+	$result = $node_B->safe_psql('postgres',
+		"SELECT count(*) FROM pg_replication_slots");
+	is($result, qq(1), 'check replication slot was dropped on publisher');
+
+	$result = $node_C->safe_psql('postgres',
+		"SELECT count(*) FROM pg_replication_slots");
+	is($result, qq(0), 'check replication slot was dropped on publisher');
+
+	$node_A->safe_psql('postgres', "TRUNCATE tab_full");
+	$node_B->safe_psql('postgres', "TRUNCATE tab_full");
+	$node_C->safe_psql('postgres', "TRUNCATE tab_full");
+}
+
+# Subroutine to verify the data is replicated successfully.
+sub verify_data
+{
+	my ($node_A, $node_B, $node_C, $expect) = @_;
+
+	$node_A->wait_for_catchup($subname_BA);
+	$node_A->wait_for_catchup($subname_CA);
+	$node_B->wait_for_catchup($subname_AB);
+	$node_B->wait_for_catchup($subname_CB);
+	$node_C->wait_for_catchup($subname_AC);
+	$node_C->wait_for_catchup($subname_BC);
+
+	# check that data is replicated to all the nodes
+	$result =
+	  $node_A->safe_psql('postgres', "SELECT * FROM tab_full ORDER BY 1;");
+	is($result, qq($expect),
+	   'Data is replicated as expected'
+	);
+
+	$result =
+	  $node_B->safe_psql('postgres', "SELECT * FROM tab_full ORDER BY 1;");
+	is($result, qq($expect),
+	   'Data is replicated as expected'
+	);
+
+	$result =
+	  $node_C->safe_psql('postgres', "SELECT * FROM tab_full ORDER BY 1;");
+	is($result, qq($expect),
+	   'Data is replicated as expected'
+	);
+}
+
+my $synced_query =
+  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', \
's');"; +
+# Subroutine to create subscription and wait until the initial sync is
+# completed. Subroutine expects subscriber node, publisher node, subscription
+# name, destination connection string, publication name and the subscription
+# parameters to be passed as input parameters.
+sub create_subscription
+{
+	my ($node_subscriber, $node_publisher, $sub_name, $node_connstr,
+		$pub_name, $sub_params)
+	  = @_;
+
+	# Application_name is always assigned the same value as the subscription
+	# name.
+	$node_subscriber->safe_psql(
+		'postgres', "
+                CREATE SUBSCRIPTION $sub_name
+                CONNECTION '$node_connstr application_name=$sub_name'
+                PUBLICATION $pub_name
+                WITH ($sub_params)");
+	$node_publisher->wait_for_catchup($sub_name);
+
+	# also wait for initial table sync to finish
+	$node_subscriber->poll_query_until('postgres', $synced_query)
+	  or die "Timed out while waiting for subscriber to synchronize data";
+}
+
 ###############################################################################
 # Setup a bidirectional logical replication between node_A & node_B
 ###############################################################################
@@ -33,42 +144,18 @@ $node_B->safe_psql('postgres', "CREATE TABLE tab_full (a int \
PRIMARY KEY)");  my $node_A_connstr = $node_A->connstr . ' dbname=postgres';
 $node_A->safe_psql('postgres',
 	"CREATE PUBLICATION tap_pub_A FOR TABLE tab_full");
-my $appname_B1 = 'tap_sub_B1';
-$node_B->safe_psql(
-	'postgres', "
-	CREATE SUBSCRIPTION tap_sub_B1
-	CONNECTION '$node_A_connstr application_name=$appname_B1'
-	PUBLICATION tap_pub_A
-	WITH (origin = local)");
+create_subscription($node_B, $node_A, $subname_BA, $node_A_connstr,
+	'tap_pub_A', 'copy_data = on, origin = local');
 
 # node_B (pub) -> node_A (sub)
 my $node_B_connstr = $node_B->connstr . ' dbname=postgres';
 $node_B->safe_psql('postgres',
 	"CREATE PUBLICATION tap_pub_B FOR TABLE tab_full");
-my $appname_A = 'tap_sub_A';
-$node_A->safe_psql(
-	'postgres', "
-	CREATE SUBSCRIPTION tap_sub_A
-	CONNECTION '$node_B_connstr application_name=$appname_A'
-	PUBLICATION tap_pub_B
-	WITH (origin = local, copy_data = off)");
-
-# Wait for subscribers to finish initialization
-$node_A->wait_for_catchup($appname_B1);
-$node_B->wait_for_catchup($appname_A);
-
-# Also wait for initial table sync to finish
-my $synced_query =
-  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', \
                's');";
-$node_A->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
-$node_B->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+create_subscription($node_A, $node_B, $subname_AB, $node_B_connstr,
+	'tap_pub_B', 'copy_data = off, origin = local');
 
 is(1, 1, 'Bidirectional replication setup is complete');
 
-my $result;
-
 ###############################################################################
 # Check that bidirectional logical replication setup does not cause infinite
 # recursive insertion.
@@ -78,8 +165,8 @@ my $result;
 $node_A->safe_psql('postgres', "INSERT INTO tab_full VALUES (11);");
 $node_B->safe_psql('postgres', "INSERT INTO tab_full VALUES (12);");
 
-$node_A->wait_for_catchup($appname_B1);
-$node_B->wait_for_catchup($appname_A);
+$node_A->wait_for_catchup($subname_BA);
+$node_B->wait_for_catchup($subname_AB);
 
 # check that transaction was committed on subscriber(s)
 $result = $node_A->safe_psql('postgres', "SELECT * FROM tab_full ORDER BY 1;");
@@ -109,25 +196,14 @@ $node_C->safe_psql('postgres', "CREATE TABLE tab_full (a int \
PRIMARY KEY)");  my $node_C_connstr = $node_C->connstr . ' dbname=postgres';
 $node_C->safe_psql('postgres',
 	"CREATE PUBLICATION tap_pub_C FOR TABLE tab_full");
-
-my $appname_B2 = 'tap_sub_B2';
-$node_B->safe_psql(
-	'postgres', "
-	CREATE SUBSCRIPTION tap_sub_B2
-	CONNECTION '$node_C_connstr application_name=$appname_B2'
-	PUBLICATION tap_pub_C
-	WITH (origin = local)");
-
-$node_C->wait_for_catchup($appname_B2);
-
-$node_C->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+create_subscription($node_B, $node_C, $subname_BC, $node_C_connstr,
+	'tap_pub_C', 'copy_data = on, origin = local');
 
 # insert a record
 $node_C->safe_psql('postgres', "INSERT INTO tab_full VALUES (13);");
 
-$node_C->wait_for_catchup($appname_B2);
-$node_B->wait_for_catchup($appname_A);
+$node_C->wait_for_catchup($subname_BC);
+$node_B->wait_for_catchup($subname_AB);
 
 $result = $node_B->safe_psql('postgres', "SELECT * FROM tab_full ORDER BY 1;");
 is($result, qq(13), 'The node_C data replicated to node_B'
@@ -138,6 +214,175 @@ $result = $node_A->safe_psql('postgres', "SELECT * FROM \
tab_full ORDER BY 1;");  is($result, qq(), 'Remote data originating from another node \
(not the publisher) is not replicated when origin parameter is local'  );
 
+# clear the operations done by this test
+$node_B->safe_psql(
+       'postgres', "
+        DROP SUBSCRIPTION $subname_BC");
+$node_C->safe_psql(
+	    'postgres', "
+        DELETE FROM tab_full");
+$node_B->safe_psql(
+	    'postgres', "
+        DELETE FROM tab_full where a = 13");
+
+###############################################################################
+# Specify origin as 'local' which indicates that the publisher should only
+# replicate the changes that are generated locally from node_B, but in
+# this case since the node_B is also subscribing data from node_A, node_B can
+# have remotely originated data from node_A. We throw an error, in this case,
+# to draw attention to there being possible remote data.
+###############################################################################
+($result, $stdout, $stderr) = $node_A->psql(
+       'postgres', "
+        CREATE SUBSCRIPTION tap_sub_A2
+        CONNECTION '$node_B_connstr application_name=$subname_AB'
+        PUBLICATION tap_pub_B
+        WITH (origin = local, copy_data = on)");
+like(
+       $stderr,
+       qr/ERROR: ( [A-Z0-9]+:)? table: "public.tab_full" might have replicated data \
in the publisher/, +       "Create subscription with origin and copy_data having \
replicated table in publisher" +);
+
+# Creating subscription with origin as local and copy_data as force should be
+# successful when the publisher has replicated data
+$node_A->safe_psql(
+       'postgres', "
+        CREATE SUBSCRIPTION tap_sub_A2
+        CONNECTION '$node_B_connstr application_name=$subname_AC'
+        PUBLICATION tap_pub_B
+        WITH (origin = local, copy_data = force)");
+
+$node_B->wait_for_catchup($subname_AC);
+
+# also wait for initial table sync to finish
+$node_A->poll_query_until('postgres', $synced_query)
+	or die "Timed out while waiting for subscriber to synchronize data";
+
+# Alter subscription ... refresh publication should be successful in this case
+$node_A->safe_psql(
+       'postgres', "
+        ALTER SUBSCRIPTION tap_sub_A2 REFRESH PUBLICATION");
+
+$node_A->safe_psql(
+       'postgres', "
+        DROP SUBSCRIPTION tap_sub_A2");
+
+###############################################################################
+# Join 3rd node (node_C) to the existing 2 nodes(node_A & node_B) bidirectional
+# replication setup when the existing nodes (node_A & node_B) has pre-existing
+# data and the new node (node_C) does not have any data.
+###############################################################################
+$result = $node_A->safe_psql('postgres', "SELECT * FROM tab_full ORDER BY 1;");
+is( $result, qq(), 'Check existing data');
+
+$result = $node_B->safe_psql('postgres', "SELECT * FROM tab_full ORDER BY 1;");
+is( $result, qq(), 'Check existing data');
+
+$result =
+	$node_C->safe_psql('postgres', "SELECT * FROM tab_full ORDER BY 1;");
+is( $result, qq(), 'Check existing data');
+
+create_subscription($node_A, $node_C, $subname_AC, $node_C_connstr,
+       'tap_pub_C', 'copy_data = off, origin = local');
+create_subscription($node_B, $node_C, $subname_BC, $node_C_connstr,
+       'tap_pub_C', 'copy_data = off, origin = local');
+create_subscription($node_C, $node_A, $subname_CA, $node_A_connstr,
+       'tap_pub_A', 'copy_data = force, origin = local');
+create_subscription($node_C, $node_B, $subname_CB, $node_B_connstr,
+       'tap_pub_B', 'copy_data = off, origin = local');
+
+# insert some data in all the nodes
+$node_A->safe_psql('postgres', "INSERT INTO tab_full VALUES (11);");
+$node_B->safe_psql('postgres', "INSERT INTO tab_full VALUES (21);");
+$node_C->safe_psql('postgres', "INSERT INTO tab_full VALUES (31);");
+
+verify_data($node_A, $node_B, $node_C, '11
+21
+31');
+
+detach_node_clean_table_data($node_A, $node_B, $node_C);
+
+###############################################################################
+# Join 3rd node (node_C) to the existing 2 nodes(node_A & node_B) bidirectional
+# replication setup when the existing nodes (node_A & node_B) and the new node
+# (node_C) does not have any data.
+###############################################################################
+$result = $node_A->safe_psql('postgres', "SELECT * FROM tab_full ORDER BY 1;");
+is( $result, qq(), 'Check existing data');
+
+$result = $node_B->safe_psql('postgres', "SELECT * FROM tab_full ORDER BY 1;");
+is( $result, qq(), 'Check existing data');
+
+$result = $node_C->safe_psql('postgres', "SELECT * FROM tab_full ORDER BY 1;");
+is( $result, qq(), 'Check existing data');
+
+create_subscription($node_A, $node_C, $subname_AC, $node_C_connstr,
+       'tap_pub_C', 'copy_data = off, origin = local');
+create_subscription($node_B, $node_C, $subname_BC, $node_C_connstr,
+       'tap_pub_C', 'copy_data = off, origin = local');
+create_subscription($node_C, $node_A, $subname_CA, $node_A_connstr,
+       'tap_pub_A', 'copy_data = off, origin = local');
+create_subscription($node_C, $node_B, $subname_CB, $node_B_connstr,
+       'tap_pub_B', 'copy_data = off, origin = local');
+
+# insert some data in all the nodes
+$node_A->safe_psql('postgres', "INSERT INTO tab_full VALUES (11);");
+$node_B->safe_psql('postgres', "INSERT INTO tab_full VALUES (21);");
+$node_C->safe_psql('postgres', "INSERT INTO tab_full VALUES (31);");
+
+verify_data($node_A, $node_B, $node_C, '11
+21
+31');
+
+detach_node_clean_table_data($node_A, $node_B, $node_C);
+
+###############################################################################
+# Join 3rd node (node_C) to the existing 2 nodes(node_A & node_B) bidirectional
+# replication setup when the existing nodes (node_A & node_B) has no data and
+# the new node (node_C) some pre-existing data.
+###############################################################################
+$node_C->safe_psql('postgres', "INSERT INTO tab_full VALUES (31);");
+
+$result = $node_A->safe_psql('postgres', "SELECT * FROM tab_full ORDER BY 1;");
+is( $result, qq(), 'Check existing data');
+
+$result = $node_B->safe_psql('postgres', "SELECT * FROM tab_full ORDER BY 1;");
+is( $result, qq(), 'Check existing data');
+
+$result = $node_C->safe_psql('postgres', "SELECT * FROM tab_full ORDER BY 1;");
+is($result, qq(31), 'Check existing data');
+
+create_subscription($node_A, $node_C, $subname_AC, $node_C_connstr,
+       'tap_pub_C', 'copy_data = on, origin = local');
+create_subscription($node_B, $node_C, $subname_BC, $node_C_connstr,
+       'tap_pub_C', 'copy_data = on, origin = local');
+
+$node_C->safe_psql('postgres',
+       "ALTER PUBLICATION tap_pub_C SET (publish='insert,update,delete');");
+
+$node_C->safe_psql('postgres', "TRUNCATE tab_full");
+
+# include truncates now
+$node_C->safe_psql('postgres',
+       "ALTER PUBLICATION tap_pub_C SET (publish='insert,update,delete,truncate');"
+);
+
+create_subscription($node_C, $node_A, $subname_CA, $node_A_connstr,
+       'tap_pub_A', 'copy_data = force, origin = local');
+create_subscription($node_C, $node_B, $subname_CB, $node_B_connstr,
+       'tap_pub_B', 'copy_data = off, origin = local');
+
+# insert some data in all the nodes
+$node_A->safe_psql('postgres', "INSERT INTO tab_full VALUES (12);");
+$node_B->safe_psql('postgres', "INSERT INTO tab_full VALUES (22);");
+$node_C->safe_psql('postgres', "INSERT INTO tab_full VALUES (32);");
+
+verify_data($node_A, $node_B, $node_C, '12
+22
+31
+32');
+
 # shutdown
 $node_B->stop('fast');
 $node_A->stop('fast');
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 4fb746930a..49ddbae4ed 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -456,6 +456,7 @@ ConvProcInfo
 ConversionLocation
 ConvertRowtypeExpr
 CookedConstraint
+CopyData
 CopyDest
 CopyFormatOptions
 CopyFromState
@@ -2638,6 +2639,7 @@ SubscriptingRef
 SubscriptingRefState
 Subscription
 SubscriptionInfo
+SubscriptionRel
 SubscriptionRelState
 SupportRequestCost
 SupportRequestIndexCondition
-- 
2.32.0



[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic