[prev in list] [next in list] [prev in thread] [next in thread] 

List:       cyrus-devel
Subject:    [RFC] multiplexing cyrus replication with log/log-run sharding & multiple sync_client
From:       Thomas Cataldo <thomas.cataldo () bluemind ! net>
Date:       2019-11-21 10:50:46
Message-ID: 62498D72-A5C0-4D9C-B675-3E263B6C66B7 () bluemind ! net
[Download RAW message or body]

Hi,

In our workload, cyrus replication latency is pretty critical as we serve most read \
requests from the replica. Having a single network channel between master & replica \
is a big issue for us.

Trying to improve our latency, we implemented the following approach : instead of \
writing "channel/log" we write "channel/log.<shard_index>". We compute our shard key \
this way :

# cat log.0 
APPEND devenv.blue!user.tom.Sent
MAILBOX devenv.blue!user.tom.Sent

# cat log.2 
SEEN tom@devenv.blue 9f799278-a6cd-45b7-9546-0e861d5e15d6
root@bm1804:/var/lib/cyrus/sync/core# cat log.3 
…
APPEND devenv.blue!user.sga
MAILBOX devenv.blue!user.sga

We compute an hashcode of the first argument. We normalize it so \
devenv.blue!user.tom.Sent and devenv.blue!user.tom have the same hashcode then we \
"hashcode % shard_count" to figure out which log file to use. We patched sync_client \
to add a "-i <shard_index>". sync_client -i 0 will process log.0 and use log-run.0, \
etc.

We don't spawn sync_client from cyrus.conf but we prefer systemd tricks :

/lib/systemd/system/bm-cyrus-syncclient@.service which is a template and we then \
enable : systemctl enable bm-cyrus-syncclient@{0..3} to spawn 4 sync_client.


Attached diff of what we changed. 

As a side note, our usage forbids moving a mailbox folder into another mailbox (ie. \
moving user.tom.titi into user.sga.stuff is forbidden in our setup). I guess this \
approach would be problematic we moving a mailbox subfolder to another mailbox as \
they might be sharded to separate log files.

Any feedback on this approach ? I read that you planned to turn sync_client into a \
sync daemon. Any schedule estimate on that ?

Regards,
Thomas.


sync_client systemd configuration template :
/lib/systemd/system/bm-cyrus-syncclient@.service (%i is expanded to 42 by systemd \
when you enable syncclient@42) [Unit]
Description=BlueMind Cyrus sync_client service
After=bm-cyrus-imapd.service
PartOf=bm-cyrus-imapd.service
ConditionPathExists=!/etc/bm/bm-cyrus-imapd.disabled

[Service]
Type=forking
Environment=CONF=/etc/imapd.conf
ExecStartPre=/usr/bin/find /var/lib/cyrus/sync -name ‘log*.%i' -type f -exec rm -f \
{} \; ExecStart=/usr/sbin/sync_client -C $CONF -t 1800 -n core -i %i -l -r
SuccessExitStatus=75
RemainAfterExit=no
Restart=always
RestartSec=5s
TimeoutStopSec=20s

[Install]
WantedBy=bm-cyrus-imapd.service





Thomas Cataldo
Directeur Technique

(+33) 6 42 25 91 38

BlueMind
+33 (0)5 81 91 55 60
Hotel des Télécoms, 40 rue du village d'entreprises
31670 Labège, France
www.bluemind.net / https://blog.bluemind.net/fr/


["replication_multiplexing.diff" (replication_multiplexing.diff)]

commit 99c0965ffe737f1ccb4f8f10d584cbc63cfa8cdf
Author: Thomas Cataldo <thomas.cataldo@blue-mind.net>
Date:   Mon Nov 18 18:55:54 2019 +0100

    [replication] FEATBL-971 Feat: shard cyrus replication using a hashcode on mailbox names

diff --git a/imap/cyr_synclog.c b/imap/cyr_synclog.c
index c1e151cbf..b0e50e36d 100644
--- a/imap/cyr_synclog.c
+++ b/imap/cyr_synclog.c
@@ -82,6 +82,7 @@ int main(int argc, char *argv[])
     char *alt_config = NULL;
     char cmd = '\0';
     int opt;
+    int shard = "dummy";
 
     if ((geteuid()) == 0 && (become_cyrus(/*is_master*/0) != 0)) {
         fatal("must run as the Cyrus user", EC_USAGE);
@@ -173,7 +174,7 @@ int main(int argc, char *argv[])
             break;
         default:
             /* just as is! */
-            sync_log(argv[optind]);
+            sync_log("dummy", argv[optind]);
             break;
     }
 
diff --git a/imap/sync_client.c b/imap/sync_client.c
index 882928026..94425c569 100644
--- a/imap/sync_client.c
+++ b/imap/sync_client.c
@@ -584,7 +584,7 @@ enum {
     RESTART_RECONNECT
 };
 
-static int do_daemon_work(const char *channel, const char *sync_shutdown_file,
+static int do_daemon_work(const char *channel, int shard, const char *sync_shutdown_file,
                    unsigned long timeout, unsigned long min_delta,
                    int *restartp)
 {
@@ -596,7 +596,7 @@ static int do_daemon_work(const char *channel, const char *sync_shutdown_file,
     sync_log_reader_t *slr;
 
     *restartp = RESTART_NONE;
-    slr = sync_log_reader_create_with_channel(channel);
+    slr = sync_log_reader_create_with_channel(channel, shard);
 
     session_start = time(NULL);
 
@@ -776,7 +776,7 @@ static void replica_disconnect(void)
     backend_disconnect(sync_backend);
 }
 
-static void do_daemon(const char *channel, const char *sync_shutdown_file,
+static void do_daemon(const char *channel, int shard, const char *sync_shutdown_file,
                       unsigned long timeout, unsigned long min_delta)
 {
     int r = 0;
@@ -786,7 +786,7 @@ static void do_daemon(const char *channel, const char *sync_shutdown_file,
 
     while (restart) {
         replica_connect(channel);
-        r = do_daemon_work(channel, sync_shutdown_file,
+        r = do_daemon_work(channel, shard, sync_shutdown_file,
                            timeout, min_delta, &restart);
         if (r) {
             /* See if we're still connected to the server.
@@ -883,6 +883,7 @@ int main(int argc, char **argv)
     int   mode = MODE_UNKNOWN;
     int   wait     = 0;
     int   timeout  = 600;
+    int   shard = 0;
     int   min_delta = 0;
     const char *channel = NULL;
     const char *sync_shutdown_file = NULL;
@@ -898,7 +899,7 @@ int main(int argc, char **argv)
 
     setbuf(stdout, NULL);
 
-    while ((opt = getopt(argc, argv, "C:vlLS:F:f:w:t:d:n:rRumsozOAp:")) != EOF) {
+    while ((opt = getopt(argc, argv, "C:vlLS:F:f:w:t:i:d:n:rRumsozOAp:")) != EOF) {
         switch (opt) {
         case 'C': /* alt config file */
             alt_config = optarg;
@@ -941,6 +942,10 @@ int main(int argc, char **argv)
             wait = atoi(optarg);
             break;
 
+        case 'i': /* shard index */
+            shard = atoi(optarg);
+            break;
+
         case 't':
             timeout = atoi(optarg);
             break;
@@ -1201,7 +1206,8 @@ int main(int argc, char **argv)
             if (!min_delta)
                 min_delta = sync_get_intconfig(channel, "sync_repeat_interval");
 
-            do_daemon(channel, sync_shutdown_file, timeout, min_delta);
+            syslog(LOG_INFO, "Running in daemon mode for channel %s shard-index %d", channel, shard);
+            do_daemon(channel, shard, sync_shutdown_file, timeout, min_delta);
         }
 
         break;
diff --git a/imap/sync_log.c b/imap/sync_log.c
index 4e4764be1..166044ccc 100644
--- a/imap/sync_log.c
+++ b/imap/sync_log.c
@@ -75,17 +75,21 @@ static int sync_log_suppressed = 0;
 static strarray_t *channels = NULL;
 static strarray_t *unsuppressable = NULL;
 
+static int shards = 4;
+
 EXPORTED void sync_log_init(void)
 {
     const char *conf;
     int i;
-
+    int shards;
 
     /* sync_log_init() may be called more than once */
-    if (channels) strarray_free(channels);
+    if (channels)
+        strarray_free(channels);
 
     conf = config_getstring(IMAPOPT_SYNC_LOG_CHANNELS);
-    if (!conf) conf = "\"\"";
+    if (!conf)
+        conf = "\"\"";
     channels = strarray_split(conf, " ", 0);
     /*
      * The sysadmin can specify "" in the value of sync_log_channels to
@@ -102,6 +106,8 @@ EXPORTED void sync_log_init(void)
     conf = config_getstring(IMAPOPT_SYNC_LOG_UNSUPPRESSABLE_CHANNELS);
     if (conf)
         unsuppressable = strarray_split(conf, " ", 0);
+
+    shards = config_getint(IMAPOPT_SYNC_LOG_SHARDS);
 }
 
 EXPORTED void sync_log_suppress(void)
@@ -118,16 +124,66 @@ EXPORTED void sync_log_done(void)
     unsuppressable = NULL;
 }
 
-static char *sync_log_fname(const char *channel)
+static int hashcode(const char *shard)
+{
+    int len = strlen(shard);
+    int hash = 0;
+    int i;
+    for (i = 0; i < len; i++)
+    {
+        hash = 31 * hash + shard[i];
+    }
+    return hash;
+}
+
+static char *normalize_shard(const char *shard)
+{
+    int suffix_cut = 0;
+    int total_len = strlen(shard);
+    char *mark = strrchr(shard, '!');
+    if (mark)
+    {
+        if (strncmp("user.", mark + 1, 5) == 0)
+        {
+            // prefixed by user.
+            char *sec_dot = strchr(mark + 6, '.');
+            if (sec_dot != NULL)
+            {
+                suffix_cut = strlen(sec_dot);
+            }
+        }
+        else
+        {
+            // not prefixed by user. aka mailshare
+            char *first_dot = strchr(mark + 1, '.');
+            if (first_dot)
+            {
+                suffix_cut = strlen(first_dot);
+            }
+        }
+    }
+    return strndup(shard, total_len - suffix_cut);
+}
+
+static int shard_index(const char* shard) {
+    char* norm = normalize_shard(shard);
+    int ret = abs(hashcode(norm) % shards);
+    free(norm);
+    return ret;
+}
+
+
+
+static char *sync_log_fname(const char *channel, int shard)
 {
     static char buf[MAX_MAILBOX_PATH];
 
     if (channel)
         snprintf(buf, MAX_MAILBOX_PATH,
-                 "%s/sync/%s/log", config_dir, channel);
+                 "%s/sync/%s/log.%d", config_dir, channel, shard);
     else
         snprintf(buf, MAX_MAILBOX_PATH,
-                 "%s/sync/log", config_dir);
+                 "%s/sync/log.%d", config_dir, shard);
 
     return buf;
 }
@@ -135,37 +191,42 @@ static char *sync_log_fname(const char *channel)
 static int sync_log_enabled(const char *channel)
 {
     if (!config_getswitch(IMAPOPT_SYNC_LOG))
-        return 0;       /* entire mechanism is disabled */
+        return 0; /* entire mechanism is disabled */
     if (!sync_log_suppressed)
-        return 1;       /* _suppress() wasn't called */
+        return 1; /* _suppress() wasn't called */
     if (unsuppressable && strarray_find(unsuppressable, channel, 0) >= 0)
-        return 1;       /* channel is unsuppressable */
-    return 0;           /* suppressed */
+        return 1; /* channel is unsuppressable */
+    return 0;     /* suppressed */
 }
 
-static void sync_log_base(const char *channel, const char *string)
+static void sync_log_base(const char *channel, int shard, const char *string)
 {
     int fd;
     struct stat sbuffile, sbuffd;
     int retries = 0;
     const char *fname;
 
-    fname = sync_log_fname(channel);
+    fname = sync_log_fname(channel, shard);
 
-    while (retries++ < SYNC_LOG_RETRIES) {
-        fd = open(fname, O_WRONLY|O_APPEND|O_CREAT, 0640);
-        if (fd < 0 && errno == ENOENT) {
-            if (!cyrus_mkdir(fname, 0755)) {
-                fd = open(fname, O_WRONLY|O_APPEND|O_CREAT, 0640);
+    while (retries++ < SYNC_LOG_RETRIES)
+    {
+        fd = open(fname, O_WRONLY | O_APPEND | O_CREAT, 0640);
+        if (fd < 0 && errno == ENOENT)
+        {
+            if (!cyrus_mkdir(fname, 0755))
+            {
+                fd = open(fname, O_WRONLY | O_APPEND | O_CREAT, 0640);
             }
         }
-        if (fd < 0) {
+        if (fd < 0)
+        {
             syslog(LOG_ERR, "sync_log(): Unable to write to log file %s: %s",
                    fname, strerror(errno));
             return;
         }
 
-        if (lock_blocking(fd, fname) == -1) {
+        if (lock_blocking(fd, fname) == -1)
+        {
             syslog(LOG_ERR, "sync_log(): Failed to lock %s for %s: %m",
                    fname, string);
             xclose(fd);
@@ -181,7 +242,8 @@ static void sync_log_base(const char *channel, const char *string)
         lock_unlock(fd, fname);
         xclose(fd);
     }
-    if (retries >= SYNC_LOG_RETRIES) {
+    if (retries >= SYNC_LOG_RETRIES)
+    {
         xclose(fd);
         syslog(LOG_ERR,
                "sync_log(): Failed to lock %s for %s after %d attempts",
@@ -200,7 +262,7 @@ static void sync_log_base(const char *channel, const char *string)
 
 static const char *sync_quote_name(const char *name)
 {
-    static char buf[MAX_MAILBOX_BUFFER+3]; /* "x2 plus \0 */
+    static char buf[MAX_MAILBOX_BUFFER + 3]; /* "x2 plus \0 */
     char c;
     int src;
     int dst = 0;
@@ -210,24 +272,28 @@ static const char *sync_quote_name(const char *name)
     buf[dst++] = '"';
 
     /* degenerate case - no name is the empty string, quote it */
-    if (!name || !*name) {
+    if (!name || !*name)
+    {
         need_quote = 1;
         goto end;
     }
 
-    for (src = 0; name[src]; src++) {
+    for (src = 0; name[src]; src++)
+    {
         c = name[src];
         if ((c == '\r') || (c == '\n'))
             fatal("Illegal line break in folder name", EC_IOERR);
 
         /* quoteable characters */
-        if ((c == '\\') || (c == '\"') || (c == '{') || (c == '}')) {
+        if ((c == '\\') || (c == '\"') || (c == '{') || (c == '}'))
+        {
             need_quote = 1;
             buf[dst++] = '\\';
         }
 
         /* non-atom characters */
-        else if ((c == ' ') || (c == '\t') || (c == '(') || (c == ')')) {
+        else if ((c == ' ') || (c == '\t') || (c == '(') || (c == ')'))
+        {
             need_quote = 1;
         }
 
@@ -238,12 +304,14 @@ static const char *sync_quote_name(const char *name)
     }
 
 end:
-    if (need_quote) {
+    if (need_quote)
+    {
         buf[dst++] = '\"';
         buf[dst] = '\0';
         return buf;
     }
-    else {
+    else
+    {
         buf[dst] = '\0';
         return buf + 1; /* skip initial quote */
     }
@@ -253,26 +321,29 @@ end:
 
 static char *va_format(const char *fmt, va_list ap)
 {
-    static char buf[BUFSIZE+1];
+    static char buf[BUFSIZE + 1];
     size_t len;
     int ival;
     const char *sval;
     const char *p;
 
-    for (len = 0, p = fmt; *p && len < BUFSIZE; p++) {
-        if (*p != '%') {
+    for (len = 0, p = fmt; *p && len < BUFSIZE; p++)
+    {
+        if (*p != '%')
+        {
             buf[len++] = *p;
             continue;
         }
-        switch (*++p) {
+        switch (*++p)
+        {
         case 'd':
             ival = va_arg(ap, int);
-            len += snprintf(buf+len, BUFSIZE-len, "%d", ival);
+            len += snprintf(buf + len, BUFSIZE - len, "%d", ival);
             break;
         case 's':
             sval = va_arg(ap, const char *);
             sval = sync_quote_name(sval);
-            strlcpy(buf+len, sval, BUFSIZE-len);
+            strlcpy(buf + len, sval, BUFSIZE - len);
             len += strlen(sval);
             break;
         default:
@@ -281,32 +352,35 @@ static char *va_format(const char *fmt, va_list ap)
         }
     }
 
-    if (buf[len-1] != '\n') buf[len++] = '\n';
+    if (buf[len - 1] != '\n')
+        buf[len++] = '\n';
     buf[len] = '\0';
 
     return buf;
 }
 
-EXPORTED void sync_log(const char *fmt, ...)
+EXPORTED void sync_log(const char *shard, const char *fmt, ...)
 {
     va_list ap;
     const char *val;
     int i;
 
-    if (!channels) return;
+    if (!channels)
+        return;
 
     va_start(ap, fmt);
     val = va_format(fmt, ap);
     va_end(ap);
 
-    for (i = 0 ; i < channels->count ; i++) {
+    for (i = 0; i < channels->count; i++)
+    {
         const char *channel = channels->data[i];
         if (sync_log_enabled(channel))
-            sync_log_base(channel, val);
+	  sync_log_base(channel, shard_index(shard), val);
     }
 }
 
-EXPORTED void sync_log_channel(const char *channel, const char *fmt, ...)
+EXPORTED void sync_log_channel(const char *shard, const char *channel, const char *fmt, ...)
 {
     va_list ap;
     const char *val;
@@ -315,7 +389,7 @@ EXPORTED void sync_log_channel(const char *channel, const char *fmt, ...)
     val = va_format(fmt, ap);
     va_end(ap);
 
-    sync_log_base(channel, val);
+    sync_log_base(channel, shard_index(shard), val);
 }
 
 /*
@@ -368,16 +442,16 @@ static sync_log_reader_t *sync_log_reader_alloc(void)
  * Returns a new object which must be freed with sync_log_reader_free().
  * Does not return NULL.
  */
-EXPORTED sync_log_reader_t *sync_log_reader_create_with_channel(const char *channel)
+EXPORTED sync_log_reader_t *sync_log_reader_create_with_channel(const char *channel, int shard)
 {
     sync_log_reader_t *slr = sync_log_reader_alloc();
     struct buf buf = BUF_INITIALIZER;
 
-    slr->log_file = xstrdup(sync_log_fname(channel));
+    slr->log_file = xstrdup(sync_log_fname(channel, shard));
 
     /* Create a work log filename.  We will process this
      * first if it exists */
-    buf_printf(&buf, "%s-run", slr->log_file);
+    buf_printf(&buf, "%s-run.%d", slr->log_file, shard);
     slr->work_file = buf_release(&buf);
 
     return slr;
@@ -416,9 +490,12 @@ EXPORTED sync_log_reader_t *sync_log_reader_create_with_fd(int fd)
  */
 EXPORTED void sync_log_reader_free(sync_log_reader_t *slr)
 {
-    if (!slr) return;
-    if (slr->input) prot_free(slr->input);
-    if (slr->fd_is_ours && slr->fd >= 0) close(slr->fd);
+    if (!slr)
+        return;
+    if (slr->input)
+        prot_free(slr->input);
+    if (slr->fd_is_ours && slr->fd >= 0)
+        close(slr->fd);
     free(slr->log_file);
     free(slr->work_file);
     buf_free(&slr->type);
@@ -446,47 +523,57 @@ EXPORTED int sync_log_reader_begin(sync_log_reader_t *slr)
     struct stat sbuf;
     int r;
 
-    if (slr->input) {
+    if (slr->input)
+    {
         r = sync_log_reader_end(slr);
-        if (r) return r;
+        if (r)
+            return r;
     }
 
-    if (stat(slr->work_file, &sbuf) == 0) {
+    if (stat(slr->work_file, &sbuf) == 0)
+    {
         /* Existing work log file - process this first */
         syslog(LOG_NOTICE,
                "Reprocessing sync log file %s", slr->work_file);
     }
-    else if (!slr->log_file) {
+    else if (!slr->log_file)
+    {
         syslog(LOG_ERR, "Failed to stat %s: %m",
                slr->log_file);
         return IMAP_IOERROR;
     }
-    else {
+    else
+    {
         /* Check for sync_log file */
-        if (stat(slr->log_file, &sbuf) < 0) {
+        if (stat(slr->log_file, &sbuf) < 0)
+        {
             if (errno == ENOENT)
-                return IMAP_AGAIN;  /* no problem, try again later */
+                return IMAP_AGAIN; /* no problem, try again later */
             syslog(LOG_ERR, "Failed to stat %s: %m",
                    slr->log_file);
             return IMAP_IOERROR;
         }
 
         /* Move sync_log to our work file */
-        if (rename(slr->log_file, slr->work_file) < 0) {
+        if (rename(slr->log_file, slr->work_file) < 0)
+        {
             syslog(LOG_ERR, "Rename %s -> %s failed: %m",
                    slr->log_file, slr->work_file);
             return IMAP_IOERROR;
         }
     }
 
-    if (slr->fd < 0) {
+    if (slr->fd < 0)
+    {
         int fd = open(slr->work_file, O_RDWR, 0);
-        if (fd < 0) {
+        if (fd < 0)
+        {
             syslog(LOG_ERR, "Failed to open %s: %m", slr->work_file);
             return IMAP_IOERROR;
         }
 
-        if (lock_blocking(fd, slr->work_file) < 0) {
+        if (lock_blocking(fd, slr->work_file) < 0)
+        {
             syslog(LOG_ERR, "Failed to lock %s: %m", slr->work_file);
             close(fd);
             return IMAP_IOERROR;
@@ -502,7 +589,7 @@ EXPORTED int sync_log_reader_begin(sync_log_reader_t *slr)
         lock_unlock(slr->fd, slr->work_file);
     }
 
-    slr->input = prot_new(slr->fd, /*write*/0);
+    slr->input = prot_new(slr->fd, /*write*/ 0);
 
     return 0;
 }
@@ -522,23 +609,27 @@ EXPORTED int sync_log_reader_end(sync_log_reader_t *slr)
     if (!slr->input)
         return 0;
 
-    if (slr->input) {
+    if (slr->input)
+    {
         prot_free(slr->input);
         slr->input = NULL;
     }
 
-    if (slr->fd_is_ours && slr->fd >= 0) {
+    if (slr->fd_is_ours && slr->fd >= 0)
+    {
         lock_unlock(slr->fd, slr->work_file);
         close(slr->fd);
         slr->fd = -1;
     }
 
-    if (slr->log_file) {
+    if (slr->log_file)
+    {
         /* We were initialised with a sync log channel, whose
          * log file we rename()d to the work file.  Now that
          * we've done with the work file we can unlink it.
          * Further checks at this point are just paranoia. */
-        if (slr->work_file && unlink(slr->work_file) < 0) {
+        if (slr->work_file && unlink(slr->work_file) < 0)
+        {
             syslog(LOG_ERR, "Unlink %s failed: %m", slr->work_file);
             return IMAP_IOERROR;
         }
@@ -566,32 +657,40 @@ EXPORTED int sync_log_reader_getitem(sync_log_reader_t *slr,
     if (!slr->input)
         return EOF;
 
-    for (;;) {
+    for (;;)
+    {
         if ((c = getword(slr->input, &slr->type)) == EOF)
             return EOF;
 
         /* Ignore blank lines */
-        if (c == '\r') c = prot_getc(slr->input);
+        if (c == '\r')
+            c = prot_getc(slr->input);
         if (c == '\n')
             continue;
 
-        if (c != ' ') {
+        if (c != ' ')
+        {
             syslog(LOG_ERR, "Invalid input");
             eatline(slr->input, c);
             continue;
         }
 
-        if ((c = getastring(slr->input, 0, &slr->arg1)) == EOF) return EOF;
+        if ((c = getastring(slr->input, 0, &slr->arg1)) == EOF)
+            return EOF;
         arg1s = slr->arg1.s;
 
         arg2s = NULL;
-        if (c == ' ') {
-            if ((c = getastring(slr->input, 0, &slr->arg2)) == EOF) return EOF;
+        if (c == ' ')
+        {
+            if ((c = getastring(slr->input, 0, &slr->arg2)) == EOF)
+                return EOF;
             arg2s = slr->arg2.s;
         }
 
-        if (c == '\r') c = prot_getc(slr->input);
-        if (c != '\n') {
+        if (c == '\r')
+            c = prot_getc(slr->input);
+        if (c != '\n')
+        {
             syslog(LOG_ERR, "Garbage at end of input line");
             eatline(slr->input, c);
             continue;
diff --git a/imap/sync_log.h b/imap/sync_log.h
index 0dcc0a203..70a8be507 100644
--- a/imap/sync_log.h
+++ b/imap/sync_log.h
@@ -52,79 +52,79 @@ void sync_log_init(void);
 void sync_log_suppress(void);
 void sync_log_done(void);
 
-void sync_log(const char *fmt, ...);
-void sync_log_channel(const char *channel, const char *fmt, ...);
+void sync_log(const char* shard, const char *fmt, ...);
+void sync_log_channel(const char* shard, const char *channel, const char *fmt, ...);
 
 #define sync_log_user(user) \
-    sync_log("USER %s\n", user)
+    sync_log(user, "USER %s\n", user)
 
 #define sync_log_unuser(user) \
-    sync_log("UNUSER %s\n", user)
+    sync_log(user, "UNUSER %s\n", user)
 
 #define sync_log_sieve(user) \
-    sync_log("META %s\n", user)
+    sync_log(user, "META %s\n", user)
 
 #define sync_log_append(name) \
-    sync_log("APPEND %s\n", name)
+    sync_log(name, "APPEND %s\n", name)
 
 #define sync_log_mailbox(name) \
-    sync_log("MAILBOX %s\n", name)
+    sync_log(name, "MAILBOX %s\n", name)
 
 #define sync_log_unmailbox(name) \
-    sync_log("UNMAILBOX %s\n", name)
+    sync_log(name, "UNMAILBOX %s\n", name)
 
 #define sync_log_mailbox_double(name1, name2) \
-    sync_log("MAILBOX %s\nMAILBOX %s\n", name1, name2)
+    sync_log(name1, "MAILBOX %s\nMAILBOX %s\n", name1, name2)
 
 #define sync_log_quota(name) \
-    sync_log("QUOTA %s\n", name)
+    sync_log(name, "QUOTA %s\n", name)
 
 #define sync_log_annotation(name) \
-    sync_log("ANNOTATION %s\n", name)
+    sync_log(name, "ANNOTATION %s\n", name)
 
 #define sync_log_seen(user, name) \
-    sync_log("SEEN %s %s\n", user, name)
+    sync_log(user, "SEEN %s %s\n", user, name)
 
 #define sync_log_subscribe(user, name) \
-    sync_log("SUB %s %s\n", user, name)
+    sync_log(user, "SUB %s %s\n", user, name)
 
 #define sync_log_channel_user(channel, user) \
-    sync_log_channel(channel, "USER %s\n", user)
+    sync_log_channel(user, channel, "USER %s\n", user)
 
 #define sync_log_channel_unuser(channel, user) \
-    sync_log_channel(channel, "UNUSER %s\n", user)
+    sync_log_channel(user, channel, "UNUSER %s\n", user)
 
 #define sync_log_channel_sieve(channel, user) \
-    sync_log_channel(channel, "META %s\n", user)
+    sync_log_channel(user, channel, "META %s\n", user)
 
 #define sync_log_channel_append(channel, name) \
-    sync_log_channel(channel, "APPEND %s\n", name)
+    sync_log_channel(user, channel, "APPEND %s\n", name)
 
 #define sync_log_channel_mailbox(channel, name) \
-    sync_log_channel(channel, "MAILBOX %s\n", name)
+    sync_log_channel(name, channel, "MAILBOX %s\n", name)
 
 #define sync_log_channel_unmailbox(channel, name) \
-    sync_log_channel(channel, "UNMAILBOX %s\n", name)
+    sync_log_channel(name, channel, "UNMAILBOX %s\n", name)
 
 #define sync_log_channel_mailbox_double(channel, name1, name2) \
-    sync_log_channel(channel, "MAILBOX %s\nMAILBOX %s\n", name1, name2)
+    sync_log_channel(name1, channel, "MAILBOX %s\nMAILBOX %s\n", name1, name2)
 
 #define sync_log_channel_quota(channel, name) \
-    sync_log_channel(channel, "QUOTA %s\n", name)
+    sync_log_channel(name, channel, "QUOTA %s\n", name)
 
 #define sync_log_channel_annotation(channel, name) \
-    sync_log_channel(channel, "ANNOTATION %s\n", name)
+    sync_log_channel(name, channel, "ANNOTATION %s\n", name)
 
 #define sync_log_channel_seen(channel, user, name) \
-    sync_log_channel(channel, "SEEN %s %s\n", user, name)
+    sync_log_channel(user, channel, "SEEN %s %s\n", user, name)
 
 #define sync_log_channel_subscribe(channel, user, name) \
-    sync_log_channel(channel, "SUB %s %s\n", user, name)
+    sync_log_channel(user, channel, "SUB %s %s\n", user, name)
 
 /* read-side sync log code */
 typedef struct sync_log_reader sync_log_reader_t;
 
-sync_log_reader_t *sync_log_reader_create_with_channel(const char *channel);
+sync_log_reader_t *sync_log_reader_create_with_channel(const char *channel, int shard);
 sync_log_reader_t *sync_log_reader_create_with_filename(const char *filename);
 sync_log_reader_t *sync_log_reader_create_with_fd(int fd);
 void sync_log_reader_free(sync_log_reader_t *slr);
diff --git a/lib/imapoptions b/lib/imapoptions
index e86300061..60d838e43 100644
--- a/lib/imapoptions
+++ b/lib/imapoptions
@@ -2107,6 +2107,9 @@ product version in the capabilities
 /* Enable replication action logging by sync_server as well, allowing
    chaining of replicas.  Use this on 'B' for A => B => C replication layout */
 
+{ "sync_log_shards", 4, INT }
+/* Specifies how much shards of the replication log files will be created. */
+
 { "sync_log_channels", NULL, STRING }
 /* If specified, log all events to multiple log files in directories
    specified by each "channel".  Each channel can then be processed




[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic