[prev in list] [next in list] [prev in thread] [next in thread] 

List:       slony1-commit
Subject:    [Slony1-commit] By cbbrowne: Factored log shipping code that was
From:       cvsuser () gborg ! postgresql ! org (CVS User Account)
Date:       2005-02-25 20:59:07
Message-ID: 20050225205905.56436B1CE95 () gborg ! postgresql ! org
[Download RAW message or body]

Log Message:
-----------
Factored the log shipping code that was embedded in the function sync_event()
out into separate functions so that it may be used for other events as well.

Further functional changes:

 - archive header contains comment lines to indicate where the "header"
   ends and the "body" of queries begins.  That will be useful for splitting
   the logs apart so that you can test-apply the header (to see if it's
   valid) before applying the Whole Log.  Blindly applying the Whole Thing
   is a bad idea because it introduces a boatload of parsing work for any
   log that is applied in an inappropriate order.

 - more error checking has been introduced

   The "archive log" processing functions check return codes from
   fprintf() and such, and if they encounter errors, this will abort
   processing of the SYNC.

   There's still more to do in this regard; close_log_archive() does not
   yet check return codes...

Modified Files:
--------------
    slony1-engine/src/slon:
        remote_worker.c (r1.74 -> r1.75)

-------------- next part --------------
Index: remote_worker.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/remote_worker.c,v
retrieving revision 1.74
retrieving revision 1.75
diff -Lsrc/slon/remote_worker.c -Lsrc/slon/remote_worker.c -u -w -r1.74 -r1.75
--- src/slon/remote_worker.c
+++ src/slon/remote_worker.c
@@ -240,7 +240,18 @@
 		   WorkerGroupData * wd, SlonWorkMsg_event * event);
 static void *sync_helper(void *cdata);
 
-#define TERMINATE_QUERY_AND_ARCHIVE dstring_free(&query); if (archive_fp) { \
fclose(archive_fp); unlink(archive_tmp); } +static char archive_name[SLON_MAX_PATH];
+static char archive_tmp[SLON_MAX_PATH];
+static FILE *archive_fp = NULL;
+static int open_log_archive (int node_id, char *seqbuf);
+static void close_log_archive ();
+static void terminate_log_archive ();
+static int generate_archive_header (int node_id, char *seqbuf);
+static int submit_query_to_archive(SlonDString *ds);
+static int submit_string_to_archive (const char *s);
+static int logarchive_tracking (const char *namespace, int sub_set, const char \
*firstseq, const char *seqbuf); +
+#define TERMINATE_QUERY_AND_ARCHIVE dstring_free(&query); terminate_log_archive();
 
 /*
  * ---------- slon_remoteWorkerThread
@@ -2931,6 +2942,7 @@
 	int			num_errors;
 	WorkerGroupLine *wgline;
 	int			i;
+	int rc;
 	char		seqbuf [64];
 	struct timeval tv_start;
 	struct timeval tv_now;
@@ -2939,10 +2951,6 @@
 	SlonDString query;
 	SlonDString *provider_qual;
 
-	char		archive_name[SLON_MAX_PATH];
-	char		archive_tmp[SLON_MAX_PATH];
-	FILE	   *archive_fp = NULL;
-
 	gettimeofday(&tv_start, NULL);
 	slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: SYNC " INT64_FORMAT
 			 " processing\n",
@@ -2957,28 +2965,21 @@
 	 */
 	if (archive_dir)
 	{
-		int			i;
-
-		sprintf(archive_name, "%s/slony1_log_%d_", archive_dir, node->no_id);
-		for (i = strlen(seqbuf); i < 20; i++)
-			strcat(archive_name, "0");
-		strcat(archive_name, seqbuf);
-		strcat(archive_name, ".sql");
-		strcpy(archive_tmp, archive_name);
-		strcat(archive_tmp, ".tmp");
-
-		if ((archive_fp = fopen(archive_tmp, "w")) == NULL)
-		{
+	  rc = open_log_archive(node->no_id, seqbuf);
+	  if (rc == -1) {
 			slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
 						"Cannot open archive file %s - %s\n",
 						node->no_id, archive_tmp, strerror(errno));
 			dstring_free(&query);
 			return 60;
 		}
-		fprintf(archive_fp, "-- Slony-I sync log\n"
-				"-- Event %d,%s\n"
-				"start transaction;\n",
-				node->no_id, seqbuf);
+	  rc = generate_archive_header(node->no_id, seqbuf);
+	  if (rc < 0) {
+	    slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
+		     "Cannot write to archive file %s - %s",
+		     node->no_id, archive_tmp, strerror(errno));
+	    return 60;
+	  }
 	}
 
 	/*
@@ -3286,11 +3287,16 @@
 			 * the archive log. This function ensures that all
 			 * archive log files are applied in the right order.
 			 */
-			if (archive_fp)
+			if (archive_dir)
 			{
-				fprintf(archive_fp, "select %s.setsyncTracking_offline(%d, '%s', '%s');\n",
-						rtcfg_namespace,
-						sub_set, PQgetvalue(res1, tupno1, 1), seqbuf);
+			  rc = logarchive_tracking(rtcfg_namespace, sub_set, 
+						   PQgetvalue(res1, tupno1, 1), seqbuf);
+			  if (rc < 0) {
+			    slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
+				     "Cannot write to archive file %s - %s",
+				     node->no_id, archive_tmp, strerror(errno));
+			    return 60;
+			  }
 			}
 		}
 		PQclear(res1);
@@ -3321,12 +3327,8 @@
 				 "no sets need syncing for this event\n",
 				 node->no_id);
 		dstring_free(&query);
-		if (archive_fp)
-		{
-			fprintf(archive_fp, "commit;\n");
-			fclose(archive_fp);
-			rename(archive_tmp, archive_name);
-		}
+		if (archive_dir)
+		  close_log_archive();
 		return 0;
 	}
 
@@ -3441,11 +3443,16 @@
 					 * Add the user data modification part to
 					 * the archive log.
 					 */
-					if (archive_fp)
-					{
-						fprintf(archive_fp, "%s", dstring_data(&(wgline->data)));
+					if (archive_dir) {
+					  rc = submit_string_to_archive(dstring_data(&(wgline->data)));
+					  /* rc = fprintf(archive_fp, "%s", dstring_data(&(wgline->data))); */
+					  if (rc < 0) {
+					    slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
+						     "Cannot write to archive file %s - %s",
+						     node->no_id, archive_tmp, strerror(errno));
+					    return 60;
+					  }
 					}
-
 					break;
 
 				case SLON_WGLC_DONE:
@@ -3587,13 +3594,19 @@
 			/*
 			 * Add the sequence number adjust call to the archive log.
 			 */
-			if (archive_fp)
+			if (archive_dir)
 			{
 				slon_mkquery(&query,
 						 "select %s.sequenceSetValue_offline(%s,'%s');\n",
 						 rtcfg_namespace,
 						 seql_seqid, seql_last_value);
-				fprintf(archive_fp, dstring_data(&query));
+			  rc = submit_query_to_archive(&query);
+			  if (rc < 0) {
+			    slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
+				     "Cannot write to archive file %s - %s",
+				     node->no_id, archive_tmp, strerror(errno));
+			    return 60;
+			  }
 			}
 		}
 		PQclear(res1);
@@ -3716,11 +3729,9 @@
 	 * Add the final commit to the archive log, close it and rename
 	 * the temporary file to the real log chunk filename.
 	 */
-	if (archive_fp)
+	if (archive_dir)
 	{
-		fprintf(archive_fp, "commit;\n");
-		fclose(archive_fp);
-		rename(archive_tmp, archive_name);
+	  close_log_archive();
 	}
 
 	/*
@@ -4014,7 +4025,7 @@
 				pthread_mutex_unlock(&(wd->workdata_lock));
 
 				slon_log(SLON_DEBUG3,
-					   "remoteHelperThread_%d_%d: %d log buffers deliverd\n",
+					   "remoteHelperThread_%d_%d: %d log buffers delivered\n",
 						 node->no_id, provider->no_id, line_no);
 
 				if (line_no < alloc_lines)
@@ -4113,6 +4124,83 @@
 	}
 }
 
+/* Functions for processing log archives...
+
+   - First, you open the log archive using open_log_archive()
+
+   - Second, you generate the header using generate_archive_header()
+
+   - Third, you need to set up the sync tracking function in the log
+     using logarchive_tracking()
+
+   =============  Here Ends The Header of the Log Shipping Archive \
================== +
+   Then come the various queries (inserts/deletes/updates) that
+   comprise the "body" of the SYNC.  Probably submitted using
+   submit_query_to_archive().
+
+   =============  Here Ends The Body of the Log Shipping Archive ==================
+
+   Finally, the log ends, notably with a COMMIT statement, generated
+   using close_log_archive(), which closes the file and renames it
+   from ".tmp" form to the final name.
+*/
+   
+
+/* Stores the archive name in archive_name (as .sql name) and archive_tmp (.tmp \
file) */ +int open_log_archive (int node_id, char *seqbuf) {
+  int i;
+  sprintf(archive_name, "%s/slony1_log_%d_", archive_dir, node_id);
+  for (i = strlen(seqbuf); i < 20; i++)
+    strcat(archive_name, "0");
+  strcat(archive_name, seqbuf);
+  strcat(archive_name, ".sql");
+  strcpy(archive_tmp, archive_name);
+  strcat(archive_tmp, ".tmp");
+  archive_fp = fopen(archive_tmp, "w");
+  if (archive_fp == NULL) {
+    return -1;
+  } else {
+    return 0;
+  }
+}
+
+void close_log_archive () {
+  fprintf(archive_fp, \
"\n------------------------------------------------------------------\n-- End Of \
Archive Log\n------------------------------------------------------------------\ncommit;\n");
 +  fclose(archive_fp);
+  rename(archive_tmp, archive_name);
+}
+
+int logarchive_tracking (const char *namespace, int sub_set, const char *firstseq, \
const char *seqbuf) { +  return fprintf(archive_fp, "\nselect \
%s.setsyncTracking_offline(%d, '%s', '%s');\n-- end of log archiving \
header\n------------------------------------------------------------------\n-- start \
of Slony-I data\n------------------------------------------------------------------\n",
 +		 namespace, sub_set, firstseq, seqbuf);
+}
+
+int submit_query_to_archive(SlonDString *ds) {
+  return fprintf(archive_fp, "%s\n", *ds->data);
+}
+
+int submit_string_to_archive (const char *s) {
+  return fprintf(archive_fp, "%s\n", s);
+}
+
+void terminate_log_archive () {
+  if (archive_fp) { 
+    fclose(archive_fp); 
+  }
+}
+
+int generate_archive_header (int node_id, char *seqbuf) {
+  time_t now;
+  now = time(NULL);
+  return fprintf(archive_fp, 
+	  "-- Slony-I sync log\n"
+	  "-- Node %d, Event %s\n"
+	  "-- at... %s\n"
+	  "start transaction;\n",
+	  node_id, seqbuf, ctime(&now));
+}
+
 /*
  * Local Variables:
  *	tab-width: 4


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic