[prev in list] [next in list] [prev in thread] [next in thread]
List: slony1-commit
Subject: [Slony1-commit] By wieck: A little attempt to make the source code
From: cvsuser () gborg ! postgresql ! org (CVS User Account)
Date: 2005-11-22 5:12:06
Message-ID: 20051122051203.BF9753B2831 () gborg ! postgresql ! org
[Download RAW message or body]
Log Message:
-----------
A little attempt to make the source code readable again (reformatting
of comments that had been screwed by Darcy's pgindent run and such).
Jan
Modified Files:
--------------
slony1-engine/src/backend:
slony1_funcs.c (r1.36 -> r1.37)
slony1-engine/src/slon:
cleanup_thread.c (r1.28 -> r1.29)
confoptions.c (r1.14 -> r1.15)
confoptions.h (r1.25 -> r1.26)
dbutils.c (r1.17 -> r1.18)
local_listen.c (r1.34 -> r1.35)
misc.c (r1.21 -> r1.22)
remote_listen.c (r1.24 -> r1.25)
remote_worker.c (r1.98 -> r1.99)
runtime_config.c (r1.26 -> r1.27)
scheduler.c (r1.22 -> r1.23)
slon.c (r1.60 -> r1.61)
slon.h (r1.54 -> r1.55)
snmp_thread.c (r1.2 -> r1.3)
sync_thread.c (r1.16 -> r1.17)
slony1-engine/src/slonik:
dbutil.c (r1.8 -> r1.9)
slonik.c (r1.52 -> r1.53)
slonik.h (r1.24 -> r1.25)
-------------- next part --------------
Index: slony1_funcs.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/backend/slony1_funcs.c,v
retrieving revision 1.36
retrieving revision 1.37
diff -Lsrc/backend/slony1_funcs.c -Lsrc/backend/slony1_funcs.c -u -w -r1.36 -r1.37
--- src/backend/slony1_funcs.c
+++ src/backend/slony1_funcs.c
@@ -1085,10 +1085,9 @@
slon_quote_identifier(const char *ident)
{
/*
- * Can avoid quoting if ident starts with a lowercase letter or
- * underscore and contains only lowercase letters, digits, and
- * underscores, *and* is not any SQL keyword. Otherwise, supply
- * quotes.
+ * Can avoid quoting if ident starts with a lowercase letter or underscore
+ * and contains only lowercase letters, digits, and underscores, *and* is
+ * not any SQL keyword. Otherwise, supply quotes.
*/
int nquotes = 0;
bool safe;
@@ -1097,8 +1096,8 @@
char *optr;
/*
- * would like to use <ctype.h> macros here, but they might yield
- * unwanted locale-specific results...
+ * would like to use <ctype.h> macros here, but they might yield unwanted
+ * locale-specific results...
*/
safe = ((ident[0] >= 'a' && ident[0] <= 'z') || ident[0] == '_');
@@ -1123,13 +1122,13 @@
if (safe)
{
/*
- * Check for keyword. This test is overly strong, since many of
- * the "keywords" known to the parser are usable as column names,
- * but the parser doesn't provide any easy way to test for whether
- * an identifier is safe or not... so be safe not sorry.
+ * Check for keyword. This test is overly strong, since many of the
+ * "keywords" known to the parser are usable as column names, but the
+ * parser doesn't provide any easy way to test for whether an
+ * identifier is safe or not... so be safe not sorry.
*
- * Note: ScanKeywordLookup() does case-insensitive comparison, but
- * that's fine, since we already know we have all-lower-case.
+ * Note: ScanKeywordLookup() does case-insensitive comparison, but that's
+ * fine, since we already know we have all-lower-case.
*/
if (ScanKeywordLookup(ident) != NULL)
safe = false;
Index: confoptions.h
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/confoptions.h,v
retrieving revision 1.25
retrieving revision 1.26
diff -Lsrc/slon/confoptions.h -Lsrc/slon/confoptions.h -u -w -r1.25 -r1.26
Index: scheduler.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/scheduler.c,v
retrieving revision 1.22
retrieving revision 1.23
diff -Lsrc/slon/scheduler.c -Lsrc/slon/scheduler.c -u -w -r1.22 -r1.23
--- src/slon/scheduler.c
+++ src/slon/scheduler.c
@@ -1,4 +1,4 @@
-/*-------------------------------------------------------------------------
+/* ----------------------------------------------------------------------
* scheduler.c
*
* Event scheduling subsystem for slon.
@@ -7,7 +7,7 @@
* Author: Jan Wieck, Afilias USA INC.
*
* $Id$
- *-------------------------------------------------------------------------
+ * ----------------------------------------------------------------------
*/
@@ -36,8 +36,9 @@
#define PF_LOCAL PF_UNIX
#endif
-/*
- * ---------- Static data ----------
+/* ----------
+ * Static data
+ * ----------
*/
static int sched_status = SCHED_STATUS_OK;
@@ -54,8 +55,9 @@
static pthread_cond_t sched_master_cond;
-/*
- * ---------- Local functions ----------
+/* ----------
+ * Local functions
+ * ----------
*/
static void *sched_mainloop(void *);
static void sched_add_fdset(int fd, fd_set * fds);
@@ -63,13 +65,14 @@
static void sched_shutdown();
-/*
- * ---------- sched_start_mainloop
+/* ----------
+ * sched_start_mainloop
*
* Called from SlonMain() before starting up any worker thread.
*
* This will spawn the event scheduling thread that does the central select(2)
- * system call. ----------
+ * system call.
+ * ----------
*/
int
sched_start_mainloop(void)
@@ -155,12 +158,13 @@
}
-/*
- * ---------- sched_wait_mainloop
+/* ----------
+ * sched_wait_mainloop
*
* Called from main() after all working threads according to the initial
* configuration are started. Will wait until the scheduler mainloop
- * terminates. ----------
+ * terminates.
+ * ----------
*/
int
sched_wait_mainloop(void)
@@ -177,13 +181,14 @@
}
-/*
- * ---------- sched_wait_conn
+/* ----------
+ * sched_wait_conn
*
* Assumes that the thread holds the lock on conn->conn_lock.
*
* Adds the connection to the central wait queue and wakes up the scheduler
- * thread to reloop onto the select(2) call. ----------
+ * thread to reloop onto the select(2) call.
+ * ----------
*/
int
sched_wait_conn(SlonConn * conn, int condition)
@@ -245,8 +250,8 @@
}
-/*
- * ---------- sched_wait_time
+/* ----------
+ * sched_wait_time
*
* Assumes that the thread holds the lock on conn->conn_lock.
*
@@ -274,10 +279,11 @@
}
-/*
- * ---------- sched_msleep
+/* ----------
+ * sched_msleep
*
- * Use the schedulers event loop to sleep for msec milliseconds. ----------
+ * Use the schedulers event loop to sleep for msec milliseconds.
+ * ----------
*/
int
sched_msleep(SlonNode * node, int msec)
@@ -301,10 +307,11 @@
}
-/*
- * ---------- sched_get_status
+/* ----------
+ * sched_get_status
*
- * Return the current scheduler status in a thread safe fashion ----------
+ * Return the current scheduler status in a thread safe fashion
+ * ----------
*/
int
sched_get_status(void)
@@ -318,12 +325,13 @@
}
-/*
- * ---------- sched_wakeup_node
+/* ----------
+ * sched_wakeup_node
*
* Wakeup the threads (listen and worker) of one or all remote nodes to cause
* them rechecking the current runtime status or adjust their configuration
- * to changes. ----------
+ * to changes.
+ * ----------
*/
int
sched_wakeup_node(int no_id)
@@ -370,10 +378,11 @@
}
-/*
- * ---------- sched_mainloop
+/* ----------
+ * sched_mainloop
*
- * The thread handling the master scheduling. ----------
+ * The thread handling the master scheduling.
+ * ----------
*/
static void *
sched_mainloop(void *dummy)
@@ -626,10 +635,8 @@
*/
/*
- close(sched_wakeuppipe[0]);
- sched_wakeuppipe[0] = -1;
- close(sched_wakeuppipe[1]);
- sched_wakeuppipe[1] = -1;
+ * close(sched_wakeuppipe[0]); sched_wakeuppipe[0] = -1;
+ * close(sched_wakeuppipe[1]); sched_wakeuppipe[1] = -1;
*/
/*
@@ -664,11 +671,12 @@
}
-/*
- * ---------- sched_add_fdset
+/* ----------
+ * sched_add_fdset
*
* Add a file descriptor to one of the global scheduler sets and adjust
- * sched_numfd accordingly. ----------
+ * sched_numfd accordingly.
+ * ----------
*/
static void
sched_add_fdset(int fd, fd_set * fds)
@@ -679,11 +687,12 @@
}
-/*
- * ---------- sched_add_fdset
+/* ----------
+ * sched_add_fdset
*
* Remove a file descriptor from one of the global scheduler sets and adjust
- * sched_numfd accordingly. ----------
+ * sched_numfd accordingly.
+ * ----------
*/
static void
sched_remove_fdset(int fd, fd_set * fds)
@@ -701,3 +710,5 @@
}
}
}
+
+
Index: snmp_thread.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/snmp_thread.c,v
retrieving revision 1.2
retrieving revision 1.3
diff -Lsrc/slon/snmp_thread.c -Lsrc/slon/snmp_thread.c -u -w -r1.2 -r1.3
--- src/slon/snmp_thread.c
+++ src/slon/snmp_thread.c
@@ -17,7 +17,8 @@
extern int slon_log_level;
-void init_nstAgentSubagentObject(void)
+void
+init_nstAgentSubagentObject(void)
{
static oid nstAgentSubagentObject_oid[] =
{ 1, 3, 6, 1, 4, 1, 20366, 32, 2, 3, 32, 1 };
@@ -29,7 +30,8 @@
}
-void *snmpThread_main(void *dummy)
+void *
+snmpThread_main(void *dummy)
{
int agentx_subagent=1;
@@ -52,11 +54,12 @@
netsnmp_ds_set_boolean(NETSNMP_DS_APPLICATION_ID,
NETSNMP_DS_AGENT_ROLE, 1);
- /* ********************************************************************
+ /*
+ * ********************************************************************
* If we are running slon as root allow the snmp agent to have full
* access to it's internals, this is required to run as a master agent
- * (from my understanding)
- **********************************************************************/
+ * (from my understanding) \
******************************************************************** + */
if (getuid() !=0)
{
@@ -68,7 +71,10 @@
init_agent("slon-demon");
- /* initialize the mib code found in: init_nstAgentSubagentObject from \
nstAgentSubagentObject.C */ + /*
+ * initialize the mib code found in: init_nstAgentSubagentObject from
+ * nstAgentSubagentObject.C
+ */
init_nstAgentSubagentObject();
/* initialize vacm/usm access control */
@@ -84,7 +90,8 @@
/* If we're going to be a snmp master agent, initial the ports */
if (!agentx_subagent)
{
- init_master_agent(); /* open the port to listen on (defaults to udp:161) */
+ init_master_agent(); /* open the port to listen on (defaults to
+ * udp:161) */
}
while(true)
Index: remote_worker.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/remote_worker.c,v
retrieving revision 1.98
retrieving revision 1.99
diff -Lsrc/slon/remote_worker.c -Lsrc/slon/remote_worker.c -u -w -r1.98 -r1.99
--- src/slon/remote_worker.c
+++ src/slon/remote_worker.c
@@ -29,8 +29,9 @@
#include "confoptions.h"
-/*
- * ---------- Local definitions ----------
+/* ----------
+ * Local definitions
+ * ----------
*/
/*
@@ -224,8 +225,10 @@
int min_sync;
int quit_sync_provider;
int quit_sync_finalsync;
-/*
- * ---------- Local functions ----------
+
+/* ----------
+ * Local functions
+ * ----------
*/
static void adjust_provider_info(SlonNode * node,
WorkerGroupData * wd, int cleanup);
@@ -260,11 +263,12 @@
#define TERMINATE_QUERY_AND_ARCHIVE dstring_free(&query); terminate_log_archive();
-/*
- * ---------- slon_remoteWorkerThread
+/* ----------
+ * slon_remoteWorkerThread
*
* Listen for events on the local database connection. This means, events
- * generated by the local node only. ----------
+ * generated by the local node only.
+ * ----------
*/
void *
remoteWorkerThread_main(void *cdata)
@@ -478,9 +482,12 @@
* Estimate an "ideal" number of syncs based on how long
* they took last time
*/
- if (desired_sync_time != 0) {
+ if (desired_sync_time != 0)
+ {
ideal_sync = (last_sync_group_size * desired_sync_time) / last_sync_length;
- } else {
+ }
+ else
+ {
ideal_sync = sync_group_maxsize;
}
max_sync = ((last_sync_group_size * 110) / 100) + 1;
@@ -494,13 +501,20 @@
}
- /* Quit upon receiving event # quit_sync_number from node # quit_sync_provider \
*/
- if (quit_sync_provider != 0) {
- if (quit_sync_provider == node->no_id) {
- if ((next_sync_group_size + (event->ev_seqno)) > quit_sync_finalsync) {
+ /*
+ * Quit upon receiving event # quit_sync_number from node #
+ * quit_sync_provider
+ */
+ if (quit_sync_provider != 0)
+ {
+ if (quit_sync_provider == node->no_id)
+ {
+ if ((next_sync_group_size + (event->ev_seqno)) > quit_sync_finalsync)
+ {
next_sync_group_size = quit_sync_finalsync - event->ev_seqno;
}
- if (event->ev_seqno >= quit_sync_finalsync) {
+ if (event->ev_seqno >= quit_sync_finalsync)
+ {
slon_log(SLON_FATAL, "ABORT at sync %d per command line request%n", \
quit_sync_finalsync); slon_retry();
}
@@ -609,9 +623,11 @@
no_id, no_comment, no_spool);
need_reloadListen = true;
- if (archive_dir) {
+ if (archive_dir)
+ {
rc = write_void_log (rtcfg_nodeid, seqbuf, "-- STORE_NODE");
- if (rc < 0) {
+ if (rc < 0)
+ {
slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
node->no_id, archive_tmp, strerror(errno));
slon_retry();
@@ -633,9 +649,11 @@
need_reloadListen = true;
- if (archive_dir) {
+ if (archive_dir)
+ {
rc = write_void_log (rtcfg_nodeid, seqbuf, "-- ENABLE_NODE");
- if (rc < 0) {
+ if (rc < 0)
+ {
slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
node->no_id, archive_tmp, strerror(errno));
slon_retry();
@@ -690,9 +708,11 @@
rtcfg_cluster_name);
need_reloadListen = true;
- if (archive_dir) {
+ if (archive_dir)
+ {
rc = write_void_log (rtcfg_nodeid, seqbuf, "-- DROP_NODE");
- if (rc < 0) {
+ if (rc < 0)
+ {
slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
node->no_id, archive_tmp, strerror(errno));
slon_retry();
@@ -715,9 +735,11 @@
pa_server, pa_client, pa_conninfo, pa_connretry);
need_reloadListen = true;
- if (archive_dir) {
+ if (archive_dir)
+ {
rc = write_void_log (rtcfg_nodeid, seqbuf, "-- STORE_PATH");
- if (rc < 0) {
+ if (rc < 0)
+ {
slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
node->no_id, archive_tmp, strerror(errno));
slon_retry();
@@ -738,9 +760,11 @@
pa_server, pa_client);
need_reloadListen = true;
- if (archive_dir) {
+ if (archive_dir)
+ {
rc = write_void_log (rtcfg_nodeid, seqbuf, "-- DROP_PATH");
- if (rc < 0) {
+ if (rc < 0)
+ {
slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
node->no_id, archive_tmp, strerror(errno));
@@ -761,9 +785,11 @@
"select %s.storeListen_int(%d, %d, %d); ",
rtcfg_namespace,
li_origin, li_provider, li_receiver);
- if (archive_dir) {
+ if (archive_dir)
+ {
rc = write_void_log (rtcfg_nodeid, seqbuf, "-- STORE_LISTEN");
- if (rc < 0) {
+ if (rc < 0)
+ {
slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
node->no_id, archive_tmp, strerror(errno));
slon_retry();
@@ -783,9 +809,11 @@
"select %s.dropListen_int(%d, %d, %d); ",
rtcfg_namespace,
li_origin, li_provider, li_receiver);
- if (archive_dir) {
+ if (archive_dir)
+ {
rc = write_void_log (rtcfg_nodeid, seqbuf, "-- DROP_LISTEN");
- if (rc < 0) {
+ if (rc < 0)
+ {
slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
node->no_id, archive_tmp, strerror(errno));
slon_retry();
@@ -807,9 +835,11 @@
rtcfg_namespace,
set_id, set_origin, set_comment);
- if (archive_dir) {
+ if (archive_dir)
+ {
rc = write_void_log (rtcfg_nodeid, seqbuf, "-- STORE_SET");
- if (rc < 0) {
+ if (rc < 0)
+ {
slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
node->no_id, archive_tmp, strerror(errno));
slon_retry();
@@ -826,17 +856,21 @@
"select %s.dropSet_int(%d); ",
rtcfg_namespace, set_id);
- /* The table deleted needs to be
- * dropped from log shipping too */
- if (archive_dir) {
+ /*
+ * The table deleted needs to be dropped from log shipping too
+ */
+ if (archive_dir)
+ {
rc = open_log_archive(rtcfg_nodeid, seqbuf);
- if (rc < 0) {
+ if (rc < 0)
+ {
slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
node->no_id, archive_tmp, strerror(errno));
slon_retry();
}
rc = generate_archive_header(rtcfg_nodeid, seqbuf);
- if (rc < 0) {
+ if (rc < 0)
+ {
slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
node->no_id, archive_tmp, strerror(errno));
slon_retry();
@@ -846,13 +880,15 @@
" where ssy_setid= %d;",
rtcfg_namespace, set_id);
rc = submit_query_to_archive(&query1);
- if (rc < 0) {
+ if (rc < 0)
+ {
slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
node->no_id, archive_tmp, strerror(errno));
slon_retry();
}
rc = close_log_archive();
- if (rc < 0) {
+ if (rc < 0)
+ {
slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
node->no_id, archive_tmp, strerror(errno));
slon_retry();
@@ -863,6 +899,7 @@
{
int set_id = (int)strtol(event->ev_data1, NULL, 10);
int add_id = (int)strtol(event->ev_data2, NULL, 10);
+
rtcfg_dropSet(add_id);
slon_appendquery(&query1,
@@ -870,19 +907,22 @@
rtcfg_namespace,
set_id, add_id);
- /* Log shipping gets the change here
- * that we need to delete the table
- * being merged from the set being
- * maintained. */
- if (archive_dir) {
+ /*
+ * Log shipping gets the change here that we need to delete
+ * the table being merged from the set being maintained.
+ */
+ if (archive_dir)
+ {
rc = open_log_archive(rtcfg_nodeid, seqbuf);
- if (rc < 0) {
+ if (rc < 0)
+ {
slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
node->no_id, archive_tmp, strerror(errno));
slon_retry();
}
rc = generate_archive_header(rtcfg_nodeid, seqbuf);
- if (rc < 0) {
+ if (rc < 0)
+ {
slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
node->no_id, archive_tmp, strerror(errno));
slon_retry();
@@ -892,13 +932,15 @@
" where ssy_setid= %d;",
rtcfg_namespace, add_id);
rc = submit_query_to_archive(&query1);
- if (rc < 0) {
+ if (rc < 0)
+ {
slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
node->no_id, archive_tmp, strerror(errno));
slon_retry();
}
rc = close_log_archive();
- if (rc < 0) {
+ if (rc < 0)
+ {
slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
node->no_id, archive_tmp, strerror(errno));
slon_retry();
@@ -912,9 +954,11 @@
* subscribed sets yet and table information is not maintained
* in the runtime configuration.
*/
- if (archive_dir) {
+ if (archive_dir)
+ {
rc = write_void_log (rtcfg_nodeid, seqbuf, "-- SET_ADD_TABLE");
- if (rc < 0) {
+ if (rc < 0)
+ {
slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
node->no_id, archive_tmp, strerror(errno));
slon_retry();
@@ -928,9 +972,11 @@
* subscribed sets yet and sequences information is not
* maintained in the runtime configuration.
*/
- if (archive_dir) {
+ if (archive_dir)
+ {
rc = write_void_log (rtcfg_nodeid, seqbuf, "-- SET_ADD_SEQUENCE");
- if (rc < 0) {
+ if (rc < 0)
+ {
slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
node->no_id, archive_tmp, strerror(errno));
slon_retry();
@@ -944,9 +990,11 @@
slon_appendquery(&query1, "select %s.setDropTable_int(%d);",
rtcfg_namespace,
tab_id);
- if (archive_dir) {
+ if (archive_dir)
+ {
rc = write_void_log (rtcfg_nodeid, seqbuf, "-- SET_DROP_TABLE");
- if (rc < 0) {
+ if (rc < 0)
+ {
slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
node->no_id, archive_tmp, strerror(errno));
slon_retry();
@@ -960,9 +1008,11 @@
slon_appendquery(&query1, "select %s.setDropSequence_int(%d);",
rtcfg_namespace,
seq_id);
- if (archive_dir) {
+ if (archive_dir)
+ {
rc = write_void_log (rtcfg_nodeid, seqbuf, "-- SET_DROP_SEQUENCE");
- if (rc < 0) {
+ if (rc < 0)
+ {
slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
node->no_id, archive_tmp, strerror(errno));
slon_retry();
@@ -977,9 +1027,11 @@
slon_appendquery(&query1, "select %s.setMoveTable_int(%d, %d);",
rtcfg_namespace,
tab_id, new_set_id);
- if (archive_dir) {
+ if (archive_dir)
+ {
rc = write_void_log (rtcfg_nodeid, seqbuf, "-- SET_MOVE_TABLE");
- if (rc < 0) {
+ if (rc < 0)
+ {
slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
node->no_id, archive_tmp, strerror(errno));
slon_retry();
@@ -994,9 +1046,11 @@
slon_appendquery(&query1, "select %s.setMoveSequence_int(%d, %d);",
rtcfg_namespace,
seq_id, new_set_id);
- if (archive_dir) {
+ if (archive_dir)
+ {
rc = write_void_log (rtcfg_nodeid, seqbuf, "-- SET_MOVE_SEQUENCE");
- if (rc < 0) {
+ if (rc < 0)
+ {
slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
node->no_id, archive_tmp, strerror(errno));
slon_retry();
@@ -1012,9 +1066,11 @@
"select %s.storeTrigger_int(%d, '%q'); ",
rtcfg_namespace,
trig_tabid, trig_tgname);
- if (archive_dir) {
+ if (archive_dir)
+ {
rc = write_void_log (rtcfg_nodeid, seqbuf, "-- STORE_TRIGGER");
- if (rc < 0) {
+ if (rc < 0)
+ {
slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
node->no_id, archive_tmp, strerror(errno));
slon_retry();
@@ -1030,9 +1086,11 @@
"select %s.dropTrigger_int(%d, '%q'); ",
rtcfg_namespace,
trig_tabid, trig_tgname);
- if (archive_dir) {
+ if (archive_dir)
+ {
rc = write_void_log (rtcfg_nodeid, seqbuf, "-- DROP_TRIGGER");
- if (rc < 0) {
+ if (rc < 0)
+ {
slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
node->no_id, archive_tmp, strerror(errno));
slon_retry();
@@ -1041,8 +1099,10 @@
}
else if (strcmp(event->ev_type, "ACCEPT_SET") == 0)
{
- int set_id, old_origin,
- new_origin, event_no;
+ int set_id,
+ old_origin,
+ new_origin,
+ event_no;
PGresult *res;
slon_log(SLON_DEBUG2, "start processing ACCEPT_SET\n");
@@ -1057,17 +1117,17 @@
slon_log(SLON_DEBUG2, "got parms ACCEPT_SET\n");
- /* If we're a remote node, and haven't yet
- * received the MOVE/FAILOVER_SET event from the
- * new origin, then we'll need to sleep a
- * bit... This avoids a race condition
- * where new SYNCs take place on the new
- * origin, and are ignored on some
- * subscribers (and their children)
- * because the MOVE_SET wasn't yet
- * received and processed */
+ /*
+ * If we're a remote node, and haven't yet received the
+ * MOVE/FAILOVER_SET event from the new origin, then we'll
+ * need to sleep a bit... This avoids a race condition where
+ * new SYNCs take place on the new origin, and are ignored on
+ * some subscribers (and their children) because the MOVE_SET
+ * wasn't yet received and processed
+ */
- if ((rtcfg_nodeid != old_origin) && (rtcfg_nodeid != new_origin)) {
+ if ((rtcfg_nodeid != old_origin) && (rtcfg_nodeid != new_origin))
+ {
slon_log(SLON_DEBUG2, "ACCEPT_SET - node not origin\n");
slon_mkquery(&query2,
"select 1 from %s.sl_event "
@@ -1109,7 +1169,9 @@
slon_retry();
need_reloadListen = true;
- } else {
+ }
+ else
+ {
slon_log(SLON_DEBUG2, "ACCEPT_SET - on origin node...\n");
}
@@ -1178,9 +1240,11 @@
rtcfg_namespace,
failed_node, backup_node, set_id);
- if (archive_dir) {
+ if (archive_dir)
+ {
rc = write_void_log (rtcfg_nodeid, seqbuf, "-- FAILOVER_SET");
- if (rc < 0) {
+ if (rc < 0)
+ {
slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
node->no_id, archive_tmp, strerror(errno));
slon_retry();
@@ -1202,9 +1266,11 @@
"select %s.subscribeSet_int(%d, %d, %d, '%q'); ",
rtcfg_namespace,
sub_set, sub_provider, sub_receiver, sub_forward);
- if (archive_dir) {
+ if (archive_dir)
+ {
rc = write_void_log (rtcfg_nodeid, seqbuf, "-- SUBSCRIBE_SET");
- if (rc < 0) {
+ if (rc < 0)
+ {
slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
node->no_id, archive_tmp, strerror(errno));
slon_retry();
@@ -1316,9 +1382,11 @@
rtcfg_namespace,
sub_set, sub_provider, sub_receiver);
}
- /* Note: No need to do anything based
- on archive_dir here; copy_set does
- that nicely already. */
+
+ /*
+ * Note: No need to do anything based on archive_dir here;
+ * copy_set does that nicely already.
+ */
need_reloadListen = true;
}
else if (strcmp(event->ev_type, "UNSUBSCRIBE_SET") == 0)
@@ -1336,15 +1404,18 @@
sub_set, sub_receiver);
need_reloadListen = true;
- if (archive_dir) {
+ if (archive_dir)
+ {
rc = open_log_archive(rtcfg_nodeid, seqbuf);
- if (rc < 0) {
+ if (rc < 0)
+ {
slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
node->no_id, archive_tmp, strerror(errno));
slon_retry();
}
rc = generate_archive_header(rtcfg_nodeid, seqbuf);
- if (rc < 0) {
+ if (rc < 0)
+ {
slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
node->no_id, archive_tmp, strerror(errno));
slon_retry();
@@ -1354,13 +1425,15 @@
" where ssy_setid= %d;",
rtcfg_namespace, sub_set);
rc = submit_query_to_archive(&query1);
- if (rc < 0) {
+ if (rc < 0)
+ {
slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
node->no_id, archive_tmp, strerror(errno));
slon_retry();
}
rc = close_log_archive();
- if (rc < 0) {
+ if (rc < 0)
+ {
slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
node->no_id, archive_tmp, strerror(errno));
slon_retry();
@@ -1380,32 +1453,38 @@
ddl_setid, ddl_script, ddl_only_on_node);
/* DDL_SCRIPT needs to be turned into a log shipping script */
- if (archive_dir) {
- if ((ddl_only_on_node < 1) || (ddl_only_on_node == rtcfg_nodeid)) {
+ if (archive_dir)
+ {
+ if ((ddl_only_on_node < 1) || (ddl_only_on_node == rtcfg_nodeid))
+ {
rc = open_log_archive(node->no_id, seqbuf);
- if (rc < 0) {
+ if (rc < 0)
+ {
slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
"Could not open DDL archive file %s - %s",
node->no_id, archive_tmp, strerror(errno));
slon_retry();
}
generate_archive_header(node->no_id, seqbuf);
- if (rc < 0) {
+ if (rc < 0)
+ {
slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
"Could not generate DDL archive header %s - %s",
node->no_id, archive_tmp, strerror(errno));
slon_retry();
}
rc = logarchive_tracking(rtcfg_namespace, ddl_setid, seqbuf, seqbuf, \
event->ev_timestamp_c);
- if (rc < 0) {
+ if (rc < 0)
+ {
slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
"Could not generate DDL archive tracker %s - %s",
node->no_id, archive_tmp, strerror(errno));
slon_retry();
}
rc = submit_string_to_archive(ddl_script);
- if (rc < 0) {
+ if (rc < 0)
+ {
slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
"Could not submit DDL Script %s - %s",
node->no_id, archive_tmp, strerror(errno));
@@ -1413,7 +1492,8 @@
}
rc = close_log_archive();
- if (rc < 0) {
+ if (rc < 0)
+ {
slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
"Could not close DDL Script %s - %s",
node->no_id, archive_tmp, strerror(errno));
@@ -1500,6 +1580,10 @@
}
+/* ----------
+ * adjust_provider_info
+ * ----------
+ */
static void
adjust_provider_info(SlonNode * node, WorkerGroupData * wd, int cleanup)
{
@@ -1772,11 +1856,12 @@
}
-/*
- * ---------- remoteWorker_event
+/* ----------
+ * remoteWorker_event
*
* Used by the remoteListeThread to forward events selected from the event
- * provider database to the remote nodes worker thread. ----------
+ * provider database to the remote nodes worker thread.
+ * ----------
*/
void
remoteWorker_event(int event_provider,
@@ -1971,11 +2056,12 @@
}
-/*
- * ---------- remoteWorker_wakeup
+/* ----------
+ * remoteWorker_wakeup
*
* Send a special WAKEUP message to a worker, causing it to recheck the runmode
- * and the runtime configuration. ----------
+ * and the runtime configuration.
+ * ----------
*/
void
remoteWorker_wakeup(int no_id)
@@ -2020,10 +2106,11 @@
}
-/*
- * ---------- remoteWorker_confirm
+/* ----------
+ * remoteWorker_confirm
*
- * Add a confirm message to the remote worker message queue ----------
+ * Add a confirm message to the remote worker message queue
+ * ----------
*/
void
remoteWorker_confirm(int no_id,
@@ -2121,10 +2208,11 @@
}
-/*
- * ---------- query_execute
+/* ----------
+ * query_execute
*
- * Execute a query string that does not return a result set. ----------
+ * Execute a query string that does not return a result set.
+ * ----------
*/
static int
query_execute(SlonNode * node, PGconn *dbconn, SlonDString * dsp)
@@ -2151,8 +2239,8 @@
}
-/*
- * ---------- query_append_event
+/* ----------
+ * query_append_event
*
* Add queries to a dstring that notify for Event and Confirm and that insert a
* duplicate of an event record as well as the confirmation for it.
@@ -2220,10 +2308,11 @@
}
-/*
- * ---------- store_confirm_forward
+/* ----------
+ * store_confirm_forward
*
- * Call the forwardConfirm() stored procedure. ----------
+ * Call the forwardConfirm() stored procedure.
+ * ----------
*/
static void
store_confirm_forward(SlonNode * node, SlonConn * conn,
@@ -2314,11 +2403,12 @@
}
-/*
- * ---------- get_last_forwarded_confirm
+/* ----------
+ * get_last_forwarded_confirm
*
* Look what confirmed event seqno we forwarded last for a given origin+receiver
- * pair. ----------
+ * pair.
+ * ----------
*/
static int64
get_last_forwarded_confirm(int origin, int receiver)
@@ -2348,6 +2438,10 @@
}
+/* ----------
+ * copy_set
+ * ----------
+ */
static int
copy_set(SlonNode * node, SlonConn * local_conn, int set_id,
SlonWorkMsg_event * event)
@@ -2385,7 +2479,6 @@
#ifdef HAVE_PQPUTCOPYDATA
char *copydata = NULL;
-
#else
char copybuf[8192];
int copydone;
@@ -2465,12 +2558,16 @@
/* Log Shipping Support begins... */
- /* - Open the log, put the header in
- Isn't it convenient that seqbuf was just populated??? :-)
+
+ /*
+ * - Open the log, put the header in Isn't it convenient that seqbuf was
+ * just populated??? :-)
*/
- if (archive_dir) {
+ if (archive_dir)
+ {
rc = open_log_archive(rtcfg_nodeid, seqbuf);
- if (rc < 0) {
+ if (rc < 0)
+ {
slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
"Could not open COPY SET archive file %s - %s",
node->no_id, archive_tmp, strerror(errno));
@@ -2483,7 +2580,8 @@
return -1;
}
rc = generate_archive_header(rtcfg_nodeid, seqbuf);
- if (rc < 0) {
+ if (rc < 0)
+ {
slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
"Could not generate COPY SET archive header %s - %s",
node->no_id, archive_tmp, strerror(errno));
@@ -2496,6 +2594,7 @@
return -1;
}
}
+
/*
* Register this connection in sl_nodelock
*/
@@ -2578,12 +2677,16 @@
}
}
- /* check tables/sequences in set to make sure they are there
- * and in good order. Don't copy any data yet; we want to
- * just do a first pass that finds "bozo errors" */
+ /*
+ * check tables/sequences in set to make sure they are there and in good
+ * order. Don't copy any data yet; we want to just do a first pass that
+ * finds "bozo errors"
+ */
- /* Check tables and sequences in set to make sure they are all
- * appropriately configured... */
+ /*
+ * Check tables and sequences in set to make sure they are all
+ * appropriately configured...
+ */
/*
* Select the list of all tables the provider currently has in the set.
@@ -2700,7 +2803,8 @@
slon_mkquery(&query3, "select * from %s limit 0;",
tab_fqname);
res2 = PQexec(loc_dbconn, dstring_data(&query3));
- if (PQresultStatus(res2) != PGRES_TUPLES_OK) {
+ if (PQresultStatus(res2) != PGRES_TUPLES_OK)
+ {
slon_log (SLON_ERROR, "remoteWorkerThread_%d: Could not find table %s "
"on subscriber\n", node->no_id, tab_fqname);
PQclear(res2);
@@ -2718,6 +2822,7 @@
slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: "
"all tables for set %d found on subscriber\n",
node->no_id, set_id);
+
/*
* Add in the sequences contained in the set
*/
@@ -3002,7 +3107,8 @@
res3 = PQexec(pro_dbconn, dstring_data(&query2));
- if (PQresultStatus(res3) != PGRES_TUPLES_OK) {
+ if (PQresultStatus(res3) != PGRES_TUPLES_OK)
+ {
slon_log(SLON_ERROR, "remoteWorkerThread_%d: \"%s\" %s\n",
node->no_id, dstring_data(&query2),
PQresultErrorMessage(res3));
@@ -3021,7 +3127,8 @@
rtcfg_namespace);
res4 = PQexec(loc_dbconn, dstring_data(&query2));
- if (PQresultStatus(res4) != PGRES_TUPLES_OK) {
+ if (PQresultStatus(res4) != PGRES_TUPLES_OK)
+ {
slon_log(SLON_ERROR, "remoteWorkerThread_%d: \"%s\" %s\n",
node->no_id, dstring_data(&query2),
PQresultErrorMessage(res4));
@@ -3068,12 +3175,14 @@
terminate_log_archive();
return -1;
}
- if (archive_dir) {
+ if (archive_dir)
+ {
slon_mkquery(&query1,
"delete from %s;copy %s %s from stdin;", tab_fqname, tab_fqname,
nodeon73 ? "" : PQgetvalue(res3, 0, 0));
rc = submit_query_to_archive(&query1);
- if (rc < 0) {
+ if (rc < 0)
+ {
slon_log(SLON_ERROR, "remoteWorkerThread_d: "
"Could not generate copy_set request for %s - %s",
node->no_id, tab_fqname, strerror(errno));
@@ -3087,6 +3196,7 @@
return -1;
}
}
+
/*
* Begin a COPY to stdout for the table on the provider DB
*/
@@ -3148,9 +3258,11 @@
terminate_log_archive();
return -1;
}
- if (archive_dir) {
+ if (archive_dir)
+ {
rc = fwrite(copydata, 1, len, archive_fp);
- if (rc != len) {
+ if (rc != len)
+ {
slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
"PQputCopyData() - log shipping - %s",
node->no_id, strerror(errno));
@@ -3255,7 +3367,8 @@
terminate_log_archive();
return -1;
}
- if (archive_dir) {
+ if (archive_dir)
+ {
rc = submit_string_to_archive("\\.");
}
#else /* ! HAVE_PQPUTCOPYDATA */
@@ -3293,9 +3406,11 @@
}
}
PQputline(loc_dbconn, "\\.\n");
- if (archive_dir) {
+ if (archive_dir)
+ {
rc = submit_string_to_archive("\\.");
}
+
/*
* End the COPY to stdout on the provider
*/
@@ -3366,7 +3481,8 @@
terminate_log_archive();
return -1;
}
- if (archive_dir) {
+ if (archive_dir)
+ {
submit_query_to_archive(&query1);
}
@@ -3437,7 +3553,8 @@
"select \"pg_catalog\".setval('%q', '%s'); ",
seq_fqname, seql_last_value);
- if (archive_dir) {
+ if (archive_dir)
+ {
submit_query_to_archive(&query1);
}
}
@@ -3722,13 +3839,15 @@
terminate_log_archive();
return -1;
}
- if (archive_dir) {
+ if (archive_dir)
+ {
slon_mkquery(&query1,
"insert into %s.sl_setsync_offline () "
"values ('%d', '%d');",
rtcfg_namespace, set_id, ssy_seqno);
rc = submit_query_to_archive(&query1);
- if (rc < 0) {
+ if (rc < 0)
+ {
slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
" could not insert to sl_setsync_offline",
node->no_id);
@@ -3747,9 +3866,11 @@
node->no_id,
TIMEVAL_DIFF(&tv_start2, &tv_now));
- if (archive_dir) {
+ if (archive_dir)
+ {
rc = close_log_archive();
- if (rc < 0) {
+ if (rc < 0)
+ {
slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
" could not close archive log %s - %s",
node->no_id, archive_tmp, strerror(errno));
@@ -3795,6 +3916,11 @@
return 0;
}
+
+/* ----------
+ * sync_event
+ * ----------
+ */
static int
sync_event(SlonNode * node, SlonConn * local_conn,
WorkerGroupData * wd, SlonWorkMsg_event * event)
@@ -3832,13 +3958,14 @@
dstring_init(&query);
/*
- * If this slon is running in log archiving mode, open a
- * temporary file for it.
+ * If this slon is running in log archiving mode, open a temporary file
+ * for it.
*/
if (archive_dir)
{
rc = open_log_archive(node->no_id, seqbuf);
- if (rc == -1) {
+ if (rc == -1)
+ {
slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
"Cannot open archive file %s - %s\n",
node->no_id, archive_tmp, strerror(errno));
@@ -3846,7 +3973,8 @@
return 60;
}
rc = generate_archive_header(node->no_id, seqbuf);
- if (rc < 0) {
+ if (rc < 0)
+ {
slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
"Cannot write to archive file %s - %s",
node->no_id, archive_tmp, strerror(errno));
@@ -4169,9 +4297,12 @@
slon_log(SLON_DEBUG2, " ssy_action_list value: %s length: %d\n",
ssy_action_list, actionlist_len);
- if (actionlist_len == 0) {
+ if (actionlist_len == 0)
+ {
slon_appendquery(provider_qual, "\n) ");
- } else {
+ }
+ else
+ {
dstring_init(&actionseq_subquery);
compress_actionseq(ssy_action_list, &actionseq_subquery);
slon_appendquery(provider_qual,
@@ -4183,9 +4314,9 @@
PQclear(res2);
/*
- * Add a call to the setsync tracking function to
- * the archive log. This function ensures that all
- * archive log files are applied in the right order.
+ * Add a call to the setsync tracking function to the archive log.
+ * This function ensures that all archive log files are applied in
+ * the right order.
*/
if (archive_dir)
{
@@ -4195,7 +4326,8 @@
rc = logarchive_tracking(rtcfg_namespace, sub_set,
PQgetvalue(res1, tupno1, 1), seqbuf,
event->ev_timestamp_c);
- if (rc < 0) {
+ if (rc < 0)
+ {
slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
"Cannot write to archive file %s - %s",
node->no_id, archive_tmp, strerror(errno));
@@ -4231,9 +4363,11 @@
"no sets need syncing for this event\n",
node->no_id);
dstring_free(&query);
- if (archive_dir) {
+ if (archive_dir)
+ {
rc = close_log_archive();
- if (rc < 0) {
+ if (rc < 0)
+ {
slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
"Could not close out archive file %s - %s",
node->no_id, archive_tmp, strerror(errno));
@@ -4351,13 +4485,18 @@
PQclear(res1);
/*
- * Add the user data modification part to
- * the archive log.
+ * Add the user data modification part to the archive log.
*/
- if (archive_dir) {
+ if (archive_dir)
+ {
rc = submit_string_to_archive(dstring_data(&(wgline->data)));
- /* rc = fprintf(archive_fp, "%s", dstring_data(&(wgline->data))); */
- if (rc < 0) {
+
+ /*
+ * rc = fprintf(archive_fp, "%s",
+ * dstring_data(&(wgline->data)));
+ */
+ if (rc < 0)
+ {
slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
"Cannot write to archive file %s - %s",
node->no_id, archive_tmp, strerror(errno));
@@ -4512,7 +4651,8 @@
rtcfg_namespace,
seql_seqid, seql_last_value);
rc = submit_query_to_archive(&query);
- if (rc < 0) {
+ if (rc < 0)
+ {
slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
"Cannot write to archive file %s - %s",
node->no_id, archive_tmp, strerror(errno));
@@ -4609,8 +4749,8 @@
PQclear(res1);
/*
- * Add the final commit to the archive log, close it and rename
- * the temporary file to the real log chunk filename.
+ * Add the final commit to the archive log, close it and rename the
+ * temporary file to the real log chunk filename.
*/
if (archive_dir)
{
@@ -4630,6 +4770,10 @@
}
+/* ----------
+ * sync_helper
+ * ----------
+ */
static void *
sync_helper(void *cdata)
{
@@ -5007,32 +5151,43 @@
}
}
-/* Functions for processing log archives...
-
- - First, you open the log archive using open_log_archive()
-
- - Second, you generate the header using generate_archive_header()
-
- - Third, you need to set up the sync tracking function in the log
- using logarchive_tracking()
-
- ============= Here Ends The Header of the Log Shipping Archive \
==================
-
- Then come the various queries (inserts/deletes/updates) that
- comprise the "body" of the SYNC. Probably submitted using
- submit_query_to_archive().
-
- ============= Here Ends The Body of the Log Shipping Archive ==================
-
- Finally, the log ends, notably with a COMMIT statement, generated
- using close_log_archive(), which closes the file and renames it
- from ".tmp" form to the final name.
+/* ----------
+ * Functions for processing log archives...
+ *
+ * - First, you open the log archive using open_log_archive()
+ *
+ * - Second, you generate the header using generate_archive_header()
+ *
+ * - Third, you need to set up the sync tracking function in the log
+ * using logarchive_tracking()
+ *
+ * ======== Here Ends The Header of the Log Shipping Archive ========
+ *
+ * Then come the various queries (inserts/deletes/updates) that
+ * comprise the "body" of the SYNC. Probably submitted using
+ * submit_query_to_archive().
+ *
+ * ======== Here Ends The Body of the Log Shipping Archive ========
+ *
+ * Finally, the log ends, notably with a COMMIT statement, generated
+ * using close_log_archive(), which closes the file and renames it
+ * from ".tmp" form to the final name.
+ * ----------
*/
-/* Stores the archive name in archive_name (as .sql name) and archive_tmp (.tmp \
file) */
-int open_log_archive (int node_id, char *seqbuf) {
+/* ----------
+ * open_log_archive
+ *
+ * Stores the archive name in archive_name (as .sql name) and
+ * archive_tmp (.tmp file)
+ * ----------
+ */
+int
+open_log_archive(int node_id, char *seqbuf)
+{
int i;
+
sprintf(archive_name, "%s/slony1_log_%d_", archive_dir, node_id);
for (i = strlen(seqbuf); i < 20; i++)
strcat(archive_name, "0");
@@ -5041,16 +5196,27 @@
strcpy(archive_tmp, archive_name);
strcat(archive_tmp, ".tmp");
archive_fp = fopen(archive_tmp, "w");
- if (archive_fp == NULL) {
+ if (archive_fp == NULL)
+ {
return -1;
- } else {
+ }
+ else
+ {
return 0;
}
}
-int close_log_archive () {
+/* ----------
+ * close_log_archive
+ * ----------
+ */
+int
+close_log_archive()
+{
int rc = 0;
- if (archive_dir) {
+
+ if (archive_dir)
+ {
rc = fprintf(archive_fp,
"\n------------------------------------------------------------------\n"
"-- End Of Archive Log\n"
@@ -5069,9 +5235,16 @@
return rc;
}
-int logarchive_tracking (const char *namespace, int sub_set, const char *firstseq,
- const char *seqbuf, const char *timestamp) {
- return fprintf(archive_fp, "\nselect %s.setsyncTracking_offline(%d, '%s', '%s', \
'%s');\n" +/* ----------
+ * logarchive_tracking
+ * ----------
+ */
+int
+logarchive_tracking(const char *namespace, int sub_set, const char *firstseq,
+ const char *seqbuf, const char *timestamp)
+{
+ return fprintf(archive_fp,
+ "\nselect %s.setsyncTracking_offline(%d, '%s', '%s', '%s');\n"
"-- end of log archiving header\n"
"------------------------------------------------------------------\n"
"-- start of Slony-I data\n"
@@ -5079,26 +5252,58 @@
namespace, sub_set, firstseq, seqbuf, timestamp);
}
-int submit_query_to_archive(SlonDString *ds) {
+/* ----------
+ * submit_query_to_archive
+ * ----------
+ */
+int
+submit_query_to_archive(SlonDString * ds)
+{
return fprintf(archive_fp, "%s\n", ds->data);
}
-int submit_string_to_archive (const char *s) {
+/* ----------
+ * submit_string_to_archive
+ * ----------
+ */
+int
+submit_string_to_archive(const char *s)
+{
return fprintf(archive_fp, "%s\n", s);
}
-/* Raw form used for COPY where we don't want any extra cr/lf output */
-int submit_raw_data_to_archive (const char *s) {
+/* ----------
+ * submit_raw_data_to_archive
+ *
+ * Raw form used for COPY where we don't want any extra cr/lf output
+ * ----------
+ */
+int
+submit_raw_data_to_archive(const char *s)
+{
return fprintf(archive_fp, "%s", s);
}
-void terminate_log_archive () {
- if (archive_fp) {
+/* ----------
+ * terminate_log_archive
+ * ----------
+ */
+void
+terminate_log_archive()
+{
+ if (archive_fp)
+ {
fclose(archive_fp);
}
}
-int generate_archive_header (int node_id, const char *seqbuf) {
+/* ----------
+ * generate_archive_header
+ * ----------
+ */
+int
+generate_archive_header(int node_id, const char *seqbuf)
+{
return fprintf(archive_fp,
"-- Slony-I log shipping archive\n"
"-- Node %d, Event %s\n"
@@ -5106,11 +5311,18 @@
node_id, seqbuf);
}
-/* write_void_log() writes out a "void" log consisting of the message
- * which must either be a valid SQL query or a SQL comment. */
-
-int write_void_log (int node_id, char *seqbuf, const char *message) {
+/* ----------
+ * write_void_log
+ *
+ * writes out a "void" log consisting of the message which must either
+ * be a valid SQL query or a SQL comment.
+ * ----------
+ */
+int
+write_void_log(int node_id, char *seqbuf, const char *message)
+{
int rc;
+
rc = open_log_archive(node_id, seqbuf);
if (rc < 0)
return rc;
@@ -5124,20 +5336,23 @@
return rc;
}
-/* given a string consisting of a list of actionseq values, return a
- string that compresses this into a set of log_actionseq ranges
-
- Thus, "'13455','13456','13457','13458','13459','13460','13462'"
- compresses into...
-
- log_actionseq not between '13455' and '13460' and
- log_actionseq <> '13462'
-
- There is an expectation that the actionseq values are being
- returned more or less in order; if that is even somewhat loosely
- the case, this will lead to a pretty spectacular compression of
- values if the SUBSCRIBE_SET runs for a long time thereby leading to
- there being Really A Lot of log entries to exclude. */
+/* ----------
+ * given a string consisting of a list of actionseq values, return a
+ * string that compresses this into a set of log_actionseq ranges
+ *
+ * Thus, "'13455','13456','13457','13458','13459','13460','13462'"
+ * compresses into...
+ *
+ * log_actionseq not between '13455' and '13460' and
+ * log_actionseq <> '13462'
+ *
+ * There is an expectation that the actionseq values are being
+ * returned more or less in order; if that is even somewhat loosely
+ * the case, this will lead to a pretty spectacular compression of
+ * values if the SUBSCRIBE_SET runs for a long time thereby leading to
+ * there being Really A Lot of log entries to exclude.
+ * ----------
+ */
#define START_STATE 1
#define COLLECTING_DIGITS 2
@@ -5146,12 +5361,21 @@
#define MINMAXINITIAL -1
-void compress_actionseq (const char *ssy_actionlist, SlonDString *action_subquery) {
+/* ----------
+ * compress_actionseq
+ * ----------
+ */
+void
+compress_actionseq(const char *ssy_actionlist, SlonDString * action_subquery)
+{
int state;
- int curr_number, curr_min, curr_max;
+ int curr_number,
+ curr_min,
+ curr_max;
int curr_digit;
int first_subquery;
char curr_char;
+
curr_min = MINMAXINITIAL;
curr_max = MINMAXINITIAL;
first_subquery = 1;
@@ -5159,143 +5383,196 @@
slon_mkquery(action_subquery, " ");
slon_log(SLON_DEBUG3, "compress_actionseq(list,subquery) Action list: %s\n", \
ssy_actionlist);
- while (state != DONE) {
+ while (state != DONE)
+ {
curr_char = *ssy_actionlist;
- switch (curr_char) {
+ switch (curr_char)
+ {
case '\0':
state = DONE;
break;
case '0':
curr_digit = 0;
- if (state == COLLECTING_DIGITS) {
+ if (state == COLLECTING_DIGITS)
+ {
curr_number = curr_number * 10 + curr_digit;
- } else {
+ }
+ else
+ {
state = COLLECTING_DIGITS;
curr_number = curr_digit;
}
break;
case '1':
curr_digit = 1;
- if (state == COLLECTING_DIGITS) {
+ if (state == COLLECTING_DIGITS)
+ {
curr_number = curr_number * 10 + curr_digit;
- } else {
+ }
+ else
+ {
state = COLLECTING_DIGITS;
curr_number = curr_digit;
}
break;
case '2':
curr_digit = 2;
- if (state == COLLECTING_DIGITS) {
+ if (state == COLLECTING_DIGITS)
+ {
curr_number = curr_number * 10 + curr_digit;
- } else {
+ }
+ else
+ {
state = COLLECTING_DIGITS;
curr_number = curr_digit;
}
break;
case '3':
curr_digit = 3;
- if (state == COLLECTING_DIGITS) {
+ if (state == COLLECTING_DIGITS)
+ {
curr_number = curr_number * 10 + curr_digit;
- } else {
+ }
+ else
+ {
state = COLLECTING_DIGITS;
curr_number = curr_digit;
}
break;
case '4':
curr_digit = 4;
- if (state == COLLECTING_DIGITS) {
+ if (state == COLLECTING_DIGITS)
+ {
curr_number = curr_number * 10 + curr_digit;
- } else {
+ }
+ else
+ {
state = COLLECTING_DIGITS;
curr_number = curr_digit;
}
break;
case '5':
curr_digit = 5;
- if (state == COLLECTING_DIGITS) {
+ if (state == COLLECTING_DIGITS)
+ {
curr_number = curr_number * 10 + curr_digit;
- } else {
+ }
+ else
+ {
state = COLLECTING_DIGITS;
curr_number = curr_digit;
}
break;
case '6':
curr_digit = 6;
- if (state == COLLECTING_DIGITS) {
+ if (state == COLLECTING_DIGITS)
+ {
curr_number = curr_number * 10 + curr_digit;
- } else {
+ }
+ else
+ {
state = COLLECTING_DIGITS;
curr_number = curr_digit;
}
break;
case '7':
curr_digit = 7;
- if (state == COLLECTING_DIGITS) {
+ if (state == COLLECTING_DIGITS)
+ {
curr_number = curr_number * 10 + curr_digit;
- } else {
+ }
+ else
+ {
state = COLLECTING_DIGITS;
curr_number = curr_digit;
}
break;
case '8':
curr_digit = 8;
- if (state == COLLECTING_DIGITS) {
+ if (state == COLLECTING_DIGITS)
+ {
curr_number = curr_number * 10 + curr_digit;
- } else {
+ }
+ else
+ {
state = COLLECTING_DIGITS;
curr_number = curr_digit;
}
break;
case '9':
curr_digit = 9;
- if (state == COLLECTING_DIGITS) {
+ if (state == COLLECTING_DIGITS)
+ {
curr_number = curr_number * 10 + curr_digit;
- } else {
+ }
+ else
+ {
state = COLLECTING_DIGITS;
curr_number = curr_digit;
}
break;
case '\'':
case ',':
- if (state == COLLECTING_DIGITS) {
+ if (state == COLLECTING_DIGITS)
+ {
/* Finished another number... Fold it into the ranges... */
slon_log(SLON_DEBUG4, "Finished number: %d\n", curr_number);
- /* If we haven't a range, then the range is the current
- number */
- if (curr_min == MINMAXINITIAL) {
+
+ /*
+ * If we haven't a range, then the range is the current
+ * number
+ */
+ if (curr_min == MINMAXINITIAL)
+ {
curr_min = curr_number;
}
- if (curr_max == MINMAXINITIAL) {
+ if (curr_max == MINMAXINITIAL)
+ {
curr_max = curr_number;
}
- /* If the number pushes the range outwards by 1,
- then shift the range by 1... */
- if (curr_number == curr_min - 1) {
+
+ /*
+ * If the number pushes the range outwards by 1, then
+ * shift the range by 1...
+ */
+ if (curr_number == curr_min - 1)
+ {
curr_min --;
}
- if (curr_number == curr_max + 1) {
+ if (curr_number == curr_max + 1)
+ {
curr_max ++;
}
/* If the number is inside the range, do nothing */
- if ((curr_number >= curr_min) && (curr_number <= curr_max)) {
+ if ((curr_number >= curr_min) && (curr_number <= curr_max))
+ {
/* Do nothing - inside the range */
}
- /* If the number is outside the range, then
- generate a subquery based on the range, and
- have the new number become the new range */
- if ((curr_number < curr_min - 1) || (curr_number > curr_max + 1)) {
- if (first_subquery) {
+ /*
+ * If the number is outside the range, then generate a
+ * subquery based on the range, and have the new number
+ * become the new range
+ */
+ if ((curr_number < curr_min - 1) || (curr_number > curr_max + 1))
+ {
+ if (first_subquery)
+ {
first_subquery = 0;
- } else {
+ }
+ else
+ {
slon_appendquery(action_subquery, " and ");
}
- if (curr_max == curr_min) {
+ if (curr_max == curr_min)
+ {
slon_log(SLON_DEBUG4, "simple entry - %d\n", curr_max);
slon_appendquery(action_subquery,
" log_actionseq <> '%d' ", curr_max);
- } else {
+ }
+ else
+ {
slon_log(SLON_DEBUG4, "between entry - %d %d\n",
curr_min, curr_max);
slon_appendquery(action_subquery,
@@ -5313,17 +5590,24 @@
ssy_actionlist++;
}
/* process last range, if it exists */
- if (curr_min || curr_max) {
- if (first_subquery) {
+ if (curr_min || curr_max)
+ {
+ if (first_subquery)
+ {
first_subquery = 0;
- } else {
+ }
+ else
+ {
slon_appendquery(action_subquery, " and ");
}
- if (curr_max == curr_min) {
+ if (curr_max == curr_min)
+ {
slon_log(SLON_DEBUG4, "simple entry - %d\n", curr_max);
slon_appendquery(action_subquery,
" log_actionseq <> '%d' ", curr_max);
- } else {
+ }
+ else
+ {
slon_log(SLON_DEBUG4, "between entry - %d %d\n",
curr_min, curr_max);
slon_appendquery(action_subquery,
@@ -5335,3 +5619,5 @@
}
slon_log(SLON_DEBUG3, " compressed actionseq subquery... %s\n", \
dstring_data(action_subquery)); }
+
+
Index: remote_listen.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/remote_listen.c,v
retrieving revision 1.24
retrieving revision 1.25
diff -Lsrc/slon/remote_listen.c -Lsrc/slon/remote_listen.c -u -w -r1.24 -r1.25
--- src/slon/remote_listen.c
+++ src/slon/remote_listen.c
@@ -1,4 +1,4 @@
-/*-------------------------------------------------------------------------
+/* ----------------------------------------------------------------------
* remote_listen.c
*
* Implementation of the thread listening for events on
@@ -8,7 +8,7 @@
* Author: Jan Wieck, Afilias USA INC.
*
* $Id$
- *-------------------------------------------------------------------------
+ * ----------------------------------------------------------------------
*/
@@ -29,8 +29,8 @@
#include "slon.h"
-/*
- * ---------- struct listat
+/* ----------
+ * struct listat
*
* local data structure for nodes we are currently listening for events from.
* ----------
@@ -44,8 +44,9 @@
};
-/*
- * ---------- Local functions ----------
+/* ----------
+ * Local functions
+ * ----------
*/
static void remoteListen_adjust_listat(SlonNode * node,
struct listat **listat_head,
@@ -59,11 +60,12 @@
extern char *lag_interval;
-/*
- * ---------- slon_remoteListenThread
+/* ----------
+ * slon_remoteListenThread
*
* Listen for events on a remote database connection. This means, events
- * generated by every other node we listen for on this one. ----------
+ * generated by every other node we listen for on this one.
+ * ----------
*/
void *
remoteListenThread_main(void *cdata)
@@ -223,8 +225,8 @@
dbconn = conn->dbconn;
/*
- * Listen on the connection for events and confirmations
- * and register the node connection.
+ * Listen on the connection for events and confirmations and
+ * register the node connection.
*/
slon_mkquery(&query1,
"listen \"_%s_Event\"; "
@@ -385,8 +387,8 @@
}
-/*
- * ---------- remoteListen_adjust_listat
+/* ----------
+ * remoteListen_adjust_listat
*
* local function to (re)adjust the known nodes to the global configuration.
* ----------
@@ -489,10 +491,11 @@
}
-/*
- * ---------- remoteListen_cleanup
+/* ----------
+ * remoteListen_cleanup
*
- * Free resources used by the remoteListen thread ----------
+ * Free resources used by the remoteListen thread
+ * ----------
*/
static void
remoteListen_cleanup(struct listat **listat_head, struct listat **listat_tail)
@@ -510,13 +513,14 @@
}
-/*
- * ---------- remoteListen_forward_confirm
+/* ----------
+ * remoteListen_forward_confirm
*
- * Read the last confirmed event sequence for all nodes from the remote database
- * and forward it to the local database so that the cleanup process can know
- * when all nodes have confirmed an event so it may be safely thrown away (together
- * with its log data). ----------
+ * Read the last confirmed event sequence for all nodes from the remote
+ * database and forward it to the local database so that the cleanup
+ * process can know when all nodes have confirmed an event so it may
+ * be safely thrown away (together with its log data).
+ * ----------
*/
static int
remoteListen_forward_confirm(SlonNode * node, SlonConn * conn)
@@ -574,8 +578,8 @@
}
-/*
- * ---------- remoteListen_receive_events
+/* ----------
+ * remoteListen_receive_events
*
* Retrieve all new events that origin from nodes for which we listen on this
* node as provider and add them to the node specific worker message queue.
@@ -622,7 +626,8 @@
rtcfg_lock();
where_or_or = "where";
- if (lag_interval) {
+ if (lag_interval)
+ {
dstring_init(&q2);
slon_mkquery(&q2, "where ev_timestamp < now() - '%s'::interval and (", \
lag_interval); where_or_or = dstring_data(&q2);
@@ -646,7 +651,8 @@
where_or_or = "or";
listat = listat->next;
}
- if (lag_interval) {
+ if (lag_interval)
+ {
slon_appendquery(&query, ")");
}
slon_appendquery(&query, " order by e.ev_origin, e.ev_seqno");
Index: sync_thread.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/sync_thread.c,v
retrieving revision 1.16
retrieving revision 1.17
diff -Lsrc/slon/sync_thread.c -Lsrc/slon/sync_thread.c -u -w -r1.16 -r1.17
--- src/slon/sync_thread.c
+++ src/slon/sync_thread.c
@@ -11,9 +11,6 @@
*/
-/* Note that in 1.1, generate_sync_event() is a stored procedure that
- does roughly the same thing as this... */
-
#include <pthread.h>
#include <stdio.h>
@@ -31,15 +28,16 @@
#include "slon.h"
-/*
- * ---------- Global variables ----------
+/* ----------
+ * Global variables
+ * ----------
*/
int sync_interval;
int sync_interval_timeout;
-/*
- * ---------- slon_localSyncThread
+/* ----------
+ * slon_localSyncThread
*
* Generate SYNC event if local database activity created new log info.
* ----------
Index: slon.h
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/slon.h,v
retrieving revision 1.54
retrieving revision 1.55
diff -Lsrc/slon/slon.h -Lsrc/slon/slon.h -u -w -r1.54 -r1.55
--- src/slon/slon.h
+++ src/slon/slon.h
@@ -324,6 +324,7 @@
* ----------
*/
extern pid_t slon_pid;
+
#ifndef WIN32
extern pthread_mutex_t slon_watchdog_lock;
extern pid_t slon_watchdog_pid;
@@ -575,7 +576,6 @@
#define pipewrite(a,b,c) write(a,b,c)
#endif
-
#endif /* SLON_H_INCLUDED */
Index: runtime_config.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/runtime_config.c,v
retrieving revision 1.26
retrieving revision 1.27
diff -Lsrc/slon/runtime_config.c -Lsrc/slon/runtime_config.c -u -w -r1.26 -r1.27
--- src/slon/runtime_config.c
+++ src/slon/runtime_config.c
@@ -29,10 +29,12 @@
#include "slon.h"
-/*
- * ---------- Global data ----------
+/* ----------
+ * Global data
+ * ----------
*/
pid_t slon_pid;
+
#ifndef WIN32
pthread_mutex_t slon_watchdog_lock;
pid_t slon_watchdog_pid;
@@ -52,8 +54,9 @@
SlonNode *rtcfg_node_list_tail = NULL;
-/*
- * ---------- Local data ----------
+/* ----------
+ * Local data
+ * ----------
*/
static pthread_mutex_t config_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t cfgseq_lock = PTHREAD_MUTEX_INITIALIZER;
@@ -70,14 +73,16 @@
static struct to_activate *to_activate_tail = NULL;
-/*
- * ---------- Local functions ----------
+/* ----------
+ * Local functions
+ * ----------
*/
static void rtcfg_startStopNodeThread(SlonNode * node);
-/*
- * ---------- rtcfg_lock ----------
+/* ----------
+ * rtcfg_lock
+ * ----------
*/
void
rtcfg_lock(void)
@@ -86,8 +91,9 @@
}
-/*
- * ---------- rtcfg_unlock ----------
+/* ----------
+ * rtcfg_unlock
+ * ----------
*/
void
rtcfg_unlock(void)
@@ -96,8 +102,9 @@
}
-/*
- * ---------- rtcfg_storeNode ----------
+/* ----------
+ * rtcfg_storeNode
+ * ----------
*/
void
rtcfg_storeNode(int no_id, char *no_comment)
@@ -151,13 +158,14 @@
}
-/*
- * ---------- rtcfg_setNodeLastEvent()
+/* ----------
+ * rtcfg_setNodeLastEvent()
*
* Set the last_event field in the node runtime structure.
*
* Returns: 0 if the event_seq is <= the known value -1 if the node is
- * not known event_seq otherwise ----------
+ * not known event_seq otherwise
+ * ----------
*/
int64
rtcfg_setNodeLastEvent(int no_id, int64 event_seq)
@@ -189,10 +197,11 @@
}
-/*
- * ---------- rtcfg_getNodeLastEvent
+/* ----------
+ * rtcfg_getNodeLastEvent
*
- * Read the nodes last_event field ----------
+ * Read the nodes last_event field
+ * ----------
*/
int64
rtcfg_getNodeLastEvent(int no_id)
@@ -214,8 +223,9 @@
}
-/*
- * ---------- rtcfg_enableNode ----------
+/* ----------
+ * rtcfg_enableNode
+ * ----------
*/
void
rtcfg_enableNode(int no_id)
@@ -249,8 +259,9 @@
}
-/*
- * ---------- slon_disableNode ----------
+/* ----------
+ * slon_disableNode
+ * ----------
*/
void
rtcfg_disableNode(int no_id)
@@ -286,8 +297,9 @@
}
-/*
- * ---------- rtcfg_findNode ----------
+/* ----------
+ * rtcfg_findNode
+ * ----------
*/
SlonNode *
rtcfg_findNode(int no_id)
@@ -304,8 +316,9 @@
}
-/*
- * ---------- rtcfg_storePath ----------
+/* ----------
+ * rtcfg_storePath
+ * ----------
*/
void
rtcfg_storePath(int pa_server, char *pa_conninfo, int pa_connretry)
@@ -350,8 +363,9 @@
}
-/*
- * ---------- rtcfg_dropPath ----------
+/* ----------
+ * rtcfg_dropPath
+ * ----------
*/
void
rtcfg_dropPath(int pa_server)
@@ -406,8 +420,9 @@
}
-/*
- * ---------- rtcfg_storeListen ----------
+/* ----------
+ * rtcfg_storeListen
+ * ----------
*/
void
rtcfg_reloadListen(PGconn *db)
@@ -470,8 +485,9 @@
}
-/*
- * ---------- rtcfg_storeListen ----------
+/* ----------
+ * rtcfg_storeListen
+ * ----------
*/
void
rtcfg_storeListen(int li_origin, int li_provider)
@@ -535,8 +551,9 @@
}
-/*
- * ---------- rtcfg_dropListen ----------
+/* ----------
+ * rtcfg_dropListen
+ * ----------
*/
void
rtcfg_dropListen(int li_origin, int li_provider)
@@ -593,6 +610,10 @@
}
+/* ----------
+ * rtcfg_storeSet
+ * ----------
+ */
void
rtcfg_storeSet(int set_id, int set_origin, char *set_comment)
{
@@ -656,6 +677,10 @@
}
+/* ----------
+ * rtcfg_dropSet
+ * ----------
+ */
void
rtcfg_dropSet(int set_id)
{
@@ -690,6 +715,10 @@
rtcfg_unlock();
}
+/* ----------
+ * rtcfg_moveSet
+ * ----------
+ */
void
rtcfg_moveSet(int set_id, int old_origin, int new_origin, int sub_provider)
{
@@ -738,6 +767,10 @@
}
+/* ----------
+ * rtcfg_storeSubscribe
+ * ----------
+ */
void
rtcfg_storeSubscribe(int sub_set, int sub_provider, char *sub_forward)
{
@@ -783,6 +816,10 @@
}
+/* ----------
+ * rtcfg_enableSubscription
+ * ----------
+ */
void
rtcfg_enableSubscription(int sub_set, int sub_provider, char *sub_forward)
{
@@ -825,6 +862,10 @@
}
+/* ----------
+ * rtcfg_unsubscribeSet
+ * ----------
+ */
void
rtcfg_unsubscribeSet(int sub_set)
{
@@ -866,8 +907,9 @@
}
-/*
- * ---------- rtcfg_startStopNodeThread ----------
+/* ----------
+ * rtcfg_startStopNodeThread
+ * ----------
*/
static void
rtcfg_startStopNodeThread(SlonNode * node)
@@ -1011,6 +1053,10 @@
}
+/* ----------
+ * rtcfg_needActivate
+ * ----------
+ */
void
rtcfg_needActivate(int no_id)
{
@@ -1027,6 +1073,10 @@
}
+/* ----------
+ * rtcfg_doActivate
+ * ----------
+ */
void
rtcfg_doActivate(void)
{
@@ -1041,6 +1091,10 @@
}
+/* ----------
+ * rtcfg_joinAllRemoteThreads
+ * ----------
+ */
void
rtcfg_joinAllRemoteThreads(void)
{
@@ -1096,6 +1150,10 @@
}
+/* ----------
+ * rtcfg_seq_bump
+ * ----------
+ */
void
rtcfg_seq_bump(void)
{
@@ -1105,6 +1163,10 @@
}
+/* ----------
+ * rtcfg_seq_get
+ * ----------
+ */
int64
rtcfg_seq_get(void)
{
Index: misc.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/misc.c,v
retrieving revision 1.21
retrieving revision 1.22
diff -Lsrc/slon/misc.c -Lsrc/slon/misc.c -u -w -r1.21 -r1.22
--- src/slon/misc.c
+++ src/slon/misc.c
@@ -63,13 +63,16 @@
extern char *Syslog_ident;
static void write_syslog(int level, const char *line);
-
#else
#define Use_syslog 0
#endif /* HAVE_SYSLOG */
+/* ----------
+ * slon_log
+ * ----------
+ */
void
slon_log(Slon_Log_Level level, char *fmt,...)
{
@@ -196,11 +199,12 @@
}
-/*
+/* ----------
* scanint8 --- try to parse a string into an int8.
*
* If errorOK is false, ereport a useful error message if the string is bad. If
* errorOK is true, just return "false" for bad input.
+ * ----------
*/
int
slon_scanint64(char *str, int64 * result)
@@ -263,6 +267,10 @@
}
#if HAVE_SYSLOG
+/* ----------
+ * write_syslog
+ * ----------
+ */
static void
write_syslog(int level, const char *line)
{
@@ -365,3 +373,5 @@
}
#endif /* HAVE_SYSLOG */
+
+
Index: local_listen.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/local_listen.c,v
retrieving revision 1.34
retrieving revision 1.35
diff -Lsrc/slon/local_listen.c -Lsrc/slon/local_listen.c -u -w -r1.34 -r1.35
Index: cleanup_thread.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/cleanup_thread.c,v
retrieving revision 1.28
retrieving revision 1.29
diff -Lsrc/slon/cleanup_thread.c -Lsrc/slon/cleanup_thread.c -u -w -r1.28 -r1.29
--- src/slon/cleanup_thread.c
+++ src/slon/cleanup_thread.c
@@ -28,8 +28,9 @@
#include "slon.h"
-/*
- * ---------- Global data ----------
+/* ----------
+ * Global data
+ * ----------
*/
int vac_frequency = SLON_VACUUM_FREQUENCY;
static int vac_bias = 0;
@@ -49,15 +50,18 @@
"pg_catalog.pg_listener"
};
-#define MAX_VAC_TABLE 8 /* Add to this if additional tables are added \
above */ +#define MAX_VAC_TABLE 8 /* Add to this if additional tables are added
+ * above */
-static char tstring[255]; /* string used to store table names for the VACUUM \
statements */ +static char tstring[255]; /* string used to store table names for the
+ * VACUUM statements */
-/*
- * ---------- cleanupThread_main
+/* ----------
+ * cleanupThread_main
*
* Periodically calls the stored procedure to remove old events and log data and
- * vacuums those tables. ----------
+ * vacuums those tables.
+ * ----------
*/
void *
cleanupThread_main(void *dummy)
@@ -79,9 +83,12 @@
slon_log(SLON_DEBUG1, "cleanupThread: thread starts\n");
- /* Want the vacuum time bias to be between 0 and 100 seconds,
- * hence between 0 and 100000 */
- if (vac_bias == 0) {
+ /*
+ * Want the vacuum time bias to be between 0 and 100 seconds, hence
+ * between 0 and 100000
+ */
+ if (vac_bias == 0)
+ {
vac_bias = rand() % ( SLON_CLEANUP_SLEEP * 166 );
}
slon_log(SLON_DEBUG4, "cleanupThread: bias = %d\n", vac_bias);
@@ -111,10 +118,10 @@
/*
* Loop until shutdown time arrived
*
- * Note the introduction of vac_bias and an up-to-100s random
- * "fuzz"; this reduces the likelihood that having multiple
- * slons hitting the same cluster will run into conflicts due
- * to trying to vacuum pg_listener concurrently
+ * Note the introduction of vac_bias and an up-to-100s random "fuzz"; this
+ * reduces the likelihood that having multiple slons hitting the same
+ * cluster will run into conflicts due to trying to vacuum pg_listener
+ * concurrently
*/
while (sched_wait_time(conn, SCHED_WAIT_SOCK_READ, SLON_CLEANUP_SLEEP * 1000 + \
vac_bias + (rand() % (SLON_CLEANUP_SLEEP * 166))) == SCHED_STATUS_OK) {
@@ -205,23 +212,30 @@
if (vac_frequency != 0 && ++vac_count >= vac_frequency)
{
unsigned long latest_xid;
+
vac_count = 0;
latest_xid = get_earliest_xid(dbconn);
- if (earliest_xid != latest_xid) {
+ if (earliest_xid != latest_xid)
+ {
vacuum_action = "vacuum analyze";
- } else {
+ }
+ else
+ {
vacuum_action = "analyze";
- slon_log(SLON_DEBUG4, "cleanupThread: xid %d still active - analyze instead\n",
+ slon_log(SLON_DEBUG4,
+ "cleanupThread: xid %d still active - analyze instead\n",
earliest_xid);
}
earliest_xid = latest_xid;
+
/*
* Build the query string for vacuuming replication runtime data
* and event tables
*/
dstring_init(&query3);
gettimeofday(&tv_start, NULL);
- for (t=0; t < MAX_VAC_TABLE; t++) {
+ for (t = 0; t < MAX_VAC_TABLE; t++)
+ {
sprintf(tstring, table_list[t], rtcfg_namespace);
slon_mkquery(&query3,
"%s %s;",
@@ -235,8 +249,10 @@
"cleanupThread: \"%s\" - %s",
dstring_data(&query3), PQresultErrorMessage(res));
PQclear(res);
- /* slon_retry();
- break; */
+
+ /*
+ * slon_retry(); break;
+ */
}
}
PQclear(res);
@@ -271,22 +287,30 @@
pthread_exit(NULL);
}
-/* get_earliest_xid() reads the earliest XID that is still active.
- The idea is that if, between cleanupThread iterations, this XID has
- not changed, then an old transaction is still in progress,
- PostgreSQL is holding onto the tuples, and there is no value in
- doing VACUUMs of the various Slony-I tables.
+/* ----------
+ * get_earliest_xid()
+ *
+ * reads the earliest XID that is still active.
+ *
+ * The idea is that if, between cleanupThread iterations, this XID has
+ * not changed, then an old transaction is still in progress,
+ * PostgreSQL is holding onto the tuples, and there is no value in
+ * doing VACUUMs of the various Slony-I tables.
+ * ----------
*/
-
-static unsigned long get_earliest_xid (PGconn *dbconn) {
+static unsigned long
+get_earliest_xid(PGconn *dbconn)
+{
long long xid;
PGresult *res;
SlonDString query1;
+
dstring_init(&query1);
slon_mkquery(&query1, "select %s.getMinXid();", rtcfg_namespace);
res = PQexec(dbconn, dstring_data(&query1));
- if (PQresultStatus(res) != PGRES_TUPLES_OK) {
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ {
slon_log(SLON_FATAL, "cleanupThread: could not getMinXid()!\n");
PQclear(res);
slon_retry();
@@ -296,3 +320,5 @@
slon_log(SLON_DEBUG3, "cleanupThread: minxid: %d\n", xid);
return (unsigned long) xid;
}
+
+
Index: confoptions.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/confoptions.c,v
retrieving revision 1.14
retrieving revision 1.15
diff -Lsrc/slon/confoptions.c -Lsrc/slon/confoptions.c -u -w -r1.14 -r1.15
Index: dbutils.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/dbutils.c,v
retrieving revision 1.17
retrieving revision 1.18
diff -Lsrc/slon/dbutils.c -Lsrc/slon/dbutils.c -u -w -r1.17 -r1.18
--- src/slon/dbutils.c
+++ src/slon/dbutils.c
@@ -1,4 +1,4 @@
-/*-------------------------------------------------------------------------
+/* ----------------------------------------------------------------------
* dbutils.c
*
* Database utility functions for Slony-I
@@ -7,7 +7,7 @@
* Author: Jan Wieck, Afilias USA INC.
*
* $Id$
- *-------------------------------------------------------------------------
+ * ----------------------------------------------------------------------
*/
@@ -31,20 +31,21 @@
static int slon_appendquery_int(SlonDString * dsp, char *fmt, va_list ap);
-/*
+/* ----
* This mutex is used to wrap around PQconnectdb. There's a problem that
* occurs when your libpq is compiled with libkrb (kerberos) which is not
* threadsafe. It is especially odd because I'm not using kerberos.
*
* This is fixed in libpq in 8.0, but for now (and for older versions we'll just
* use this mutex.
- *
+ * ----
*/
static pthread_mutex_t slon_connect_lock = PTHREAD_MUTEX_INITIALIZER;
-/*
- * ---------- slon_connectdb ----------
+/* ----------
+ * slon_connectdb
+ * ----------
*/
SlonConn *
slon_connectdb(char *conninfo, char *symname)
@@ -103,8 +104,9 @@
}
-/*
- * ---------- slon_disconnectdb ----------
+/* ----------
+ * slon_disconnectdb
+ * ----------
*/
void
slon_disconnectdb(SlonConn * conn)
@@ -124,8 +126,9 @@
}
-/*
- * ---------- slon_make_dummyconn ----------
+/* ----------
+ * slon_make_dummyconn
+ * ----------
*/
SlonConn *
slon_make_dummyconn(char *symname)
@@ -155,8 +158,9 @@
}
-/*
- * ---------- slon_free_dummyconn ----------
+/* ----------
+ * slon_free_dummyconn
+ * ----------
*/
void
slon_free_dummyconn(SlonConn * conn)
@@ -179,10 +183,11 @@
}
-/*
- * ---------- db_getLocalNodeId
+/* ----------
+ * db_getLocalNodeId
*
- * Query a connection for the value of sequence sl_local_node_id ----------
+ * Query a connection for the value of sequence sl_local_node_id
+ * ----------
*/
int
db_getLocalNodeId(PGconn *conn)
@@ -323,13 +328,15 @@
}
-/*
- * ---------- slon_mkquery
+/* ----------
+ * slon_mkquery
*
* A simple query formatting and quoting function using dynamic string buffer
- * allocation. Similar to sprintf() it uses formatting symbols: %s
- * tring argument %q Quoted literal (\ and ' will be escaped) %d
- * nteger argument ----------
+ * allocation. Similar to sprintf() it uses formatting symbols:
+ * %s String argument
+ * %q Quoted literal (\ and ' will be escaped)
+ * %d Integer argument
+ * ----------
*/
int
slon_mkquery(SlonDString * dsp, char *fmt,...)
@@ -348,10 +355,11 @@
}
-/*
- * ---------- slon_appendquery
+/* ----------
+ * slon_appendquery
*
- * Append query string material to an existing dynamic string. ----------
+ * Append query string material to an existing dynamic string.
+ * ----------
*/
int
slon_appendquery(SlonDString * dsp, char *fmt,...)
@@ -368,10 +376,11 @@
}
-/*
- * ---------- slon_appendquery_int
+/* ----------
+ * slon_appendquery_int
*
- * Implementation of slon_mkquery() and slon_appendquery(). ----------
+ * Implementation of slon_mkquery() and slon_appendquery().
+ * ----------
*/
static int
slon_appendquery_int(SlonDString * dsp, char *fmt, va_list ap)
Index: slon.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/slon.c,v
retrieving revision 1.60
retrieving revision 1.61
diff -Lsrc/slon/slon.c -Lsrc/slon/slon.c -u -w -r1.60 -r1.61
--- src/slon/slon.c
+++ src/slon/slon.c
@@ -36,8 +36,9 @@
#include "confoptions.h"
-/*
- * ---------- Global data ----------
+/* ----------
+ * Global data
+ * ----------
*/
#ifndef WIN32
#define SLON_WATCHDOG_NORMAL 0
@@ -51,8 +52,9 @@
pthread_mutex_t slon_wait_listen_lock;
pthread_cond_t slon_wait_listen_cond;
-/*
- * ---------- Local data ----------
+/* ----------
+ * Local data
+ * ----------
*/
static pthread_t local_event_thread;
static pthread_t local_cleanup_thread;
@@ -66,6 +68,7 @@
static char *const *main_argv;
static void SlonMain(void);
+
#ifndef WIN32
static void SlonWatchdog(void);
static void sighandler(int signo);
@@ -78,7 +81,12 @@
int child_status;
-void Usage(char * const argv[])
+/* ----------
+ * Usage
+ * ----------
+ */
+void
+Usage(char *const argv[])
{
fprintf(stderr, "usage: %s [options] clustername conninfo\n", argv[0]);
fprintf(stderr, "\n");
@@ -106,8 +114,9 @@
}
-/*
- * ---------- main ----------
+/* ----------
+ * main
+ * ----------
*/
int
main(int argc, char *const argv[])
@@ -296,11 +305,13 @@
}
#ifdef WIN32
+
/*
* Startup the network subsystem, in case our libpq doesn't
*/
err = WSAStartup(MAKEWORD(1, 1), &wsaData);
- if (err != 0) {
+ if (err != 0)
+ {
slon_log(SLON_FATAL, "main: Cannot start the network subsystem - %d\n", err);
exit(-1);
}
@@ -331,9 +342,11 @@
slon_exit(-1);
}
- /* There is no watchdog process on win32. We delegate restarting and
- * other such tasks to the Service Control Manager. And win32 doesn't
- * support signals, so we don't need to catch them... */
+ /*
+ * There is no watchdog process on win32. We delegate restarting and other
+ * such tasks to the Service Control Manager. And win32 doesn't support
+ * signals, so we don't need to catch them...
+ */
#ifndef WIN32
SlonWatchdog();
#else
@@ -343,11 +356,17 @@
}
-static void SlonMain(void)
+/* ----------
+ * SlonMain
+ * ----------
+ */
+static void
+SlonMain(void)
{
PGresult *res;
SlonDString query;
- int i,n;
+ int i,
+ n;
char pipe_c;
PGconn *startup_conn;
@@ -431,7 +450,6 @@
slon_log(SLON_FATAL, "main: SIGQUIT signal handler setup failed -(%d) %s\n", \
errno,strerror(errno)); slon_abort();
}
-
#endif
slon_log(SLON_DEBUG2, "main: main process started\n");
@@ -573,7 +591,8 @@
PQclear(res);
/*
- * Read configuration table sl_subscribe - only subscriptions for local node
+ * Read configuration table sl_subscribe - only subscriptions for local
+ * node
*/
slon_mkquery(&query,
"select sub_set, sub_provider, sub_forward, sub_active "
@@ -651,10 +670,10 @@
slon_log(SLON_CONFIG, "main: configuration complete - starting threads\n");
/*
- * Create the local event thread that monitors the local node
- * for administrative events to adjust the configuration at
- * runtime. We wait here until the local listen thread has
- * checked that there is no other slon daemon running.
+ * Create the local event thread that monitors the local node for
+ * administrative events to adjust the configuration at runtime. We wait
+ * here until the local listen thread has checked that there is no other
+ * slon daemon running.
*/
pthread_mutex_lock(&slon_wait_listen_lock);
if (pthread_create(&local_event_thread, NULL, localListenThread_main, NULL) < 0)
@@ -700,6 +719,7 @@
slon_retry();
}
#endif
+
/*
* Wait until the scheduler has shut down all remote connections
*/
@@ -746,9 +766,15 @@
}
#ifndef WIN32
-static void SlonWatchdog(void)
+/* ----------
+ * SlonWatchdog
+ * ----------
+ */
+static void
+SlonWatchdog(void)
{
pid_t pid;
+
#if !defined(CYGWIN) && !defined(WIN32)
struct sigaction act;
#endif
@@ -860,6 +886,10 @@
}
+/* ----------
+ * sighandler
+ * ----------
+ */
static void
sighandler(int signo)
{
@@ -900,6 +930,10 @@
}
}
+/* ----------
+ * slon_terminate_worker
+ * ----------
+ */
void
slon_terminate_worker()
{
@@ -916,6 +950,10 @@
}
#endif
+/* ----------
+ * slon_exit
+ * ----------
+ */
void
slon_exit(int code)
{
Index: slonik.h
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slonik/slonik.h,v
retrieving revision 1.24
retrieving revision 1.25
diff -Lsrc/slonik/slonik.h -Lsrc/slonik/slonik.h -u -w -r1.24 -r1.25
--- src/slonik/slonik.h
+++ src/slonik/slonik.h
@@ -556,7 +556,6 @@
#ifdef HAVE_PQSETNOTICERECEIVER
void db_notice_recv(void *arg, const PGresult *res);
-
#else
void db_notice_recv(void *arg, const char *msg);
#endif
Index: dbutil.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slonik/dbutil.c,v
retrieving revision 1.8
retrieving revision 1.9
diff -Lsrc/slonik/dbutil.c -Lsrc/slonik/dbutil.c -u -w -r1.8 -r1.9
--- src/slonik/dbutil.c
+++ src/slonik/dbutil.c
@@ -70,7 +70,6 @@
PQresultErrorMessage(res));
}
}
-
#else /* !HAVE_PQSETNOTICERECEIVER */
/* ----------
Index: slonik.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slonik/slonik.c,v
retrieving revision 1.52
retrieving revision 1.53
diff -Lsrc/slonik/slonik.c -Lsrc/slonik/slonik.c -u -w -r1.52 -r1.53
--- src/slonik/slonik.c
+++ src/slonik/slonik.c
@@ -81,7 +81,8 @@
replace_token(char *resout, char *lines, const char *token, const char *replacement)
{
int numlines = 1;
- int i,o;
+ int i,
+ o;
char result_set[4096];
int toklen,
replen;
@@ -123,13 +124,15 @@
* expanded? If so there is more complete code available in the
* PostgreSQL backend that could be adapted.
*/
-char *get_sharepath(const char *path)
+char *
+get_sharepath(const char *path)
{
int i;
char *result;
result = (char *)malloc(MAX_PATH+1);
- if (!result) {
+ if (!result)
+ {
printf("memory allocation failure.\n");
exit(1);
}
@@ -1918,7 +1921,8 @@
#define ROWIDBITS "_Slony-I__rowID"
- if (strlen(stmt->script->clustername) + strlen("ROWIDBITS") > NAMEDATALEN) {
+ if (strlen(stmt->script->clustername) + strlen("ROWIDBITS") > NAMEDATALEN)
+ {
printf ("Cluster name %s too long to permit creation of columns containing %s - \
maximum name length: %d\n", stmt->script->clustername, ROWIDBITS, NAMEDATALEN);
return -1;
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic