List: cyrus-devel
Subject: [RFC] multiplexing cyrus replication with log/log-run sharding & multiple sync_client
From: Thomas Cataldo <thomas.cataldo () bluemind ! net>
Date: 2019-11-21 10:50:46
Message-ID: 62498D72-A5C0-4D9C-B675-3E263B6C66B7 () bluemind ! net
Hi,
In our workload, Cyrus replication latency is critical because we serve most read
requests from the replica. Having a single network channel between master and replica
is a big issue for us.
To improve our latency, we implemented the following approach: instead of
writing "channel/log", we write "channel/log.<shard_index>". The resulting log
files look like this:
# cat log.0
APPEND devenv.blue!user.tom.Sent
MAILBOX devenv.blue!user.tom.Sent
# cat log.2
SEEN tom@devenv.blue 9f799278-a6cd-45b7-9546-0e861d5e15d6
root@bm1804:/var/lib/cyrus/sync/core# cat log.3
…
APPEND devenv.blue!user.sga
MAILBOX devenv.blue!user.sga
The shard key is a hash code of the first argument of each log entry. We normalize
the argument first, so that devenv.blue!user.tom.Sent and devenv.blue!user.tom have
the same hash code, then compute "hashcode % shard_count" to pick the log file.
We patched sync_client to add a "-i <shard_index>" option: sync_client -i 0
processes log.0 and uses log-run.0, and so on.
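For readers who want to experiment with the sharding rule outside of Cyrus, here is a minimal standalone sketch of the normalize-then-hash step described above. The function names (shard_key_len, shard_hash, shard_for) are hypothetical and are not the ones used in the attached diff. The sketch also hashes in unsigned arithmetic, so the shard numbers it yields may differ from those produced by the patch.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical helper: length of the part of `name` that identifies the
 * owning mailbox, e.g. "devenv.blue!user.tom" within
 * "devenv.blue!user.tom.Sent". Names without '!' are used whole. */
static size_t shard_key_len(const char *name)
{
    const char *mark = strrchr(name, '!');
    const char *start;
    const char *dot;

    if (!mark)
        return strlen(name);

    start = mark + 1;
    if (strncmp(start, "user.", 5) == 0)
        start += 5;     /* skip "user." so the next dot ends the user part */

    dot = strchr(start, '.');
    return dot ? (size_t)(dot - name) : strlen(name);
}

/* 31-multiplier string hash over the first `len` bytes, computed in
 * unsigned arithmetic so that overflow wraps in a well-defined way. */
static uint32_t shard_hash(const char *s, size_t len)
{
    uint32_t h = 0;
    size_t i;

    for (i = 0; i < len; i++)
        h = 31u * h + (unsigned char)s[i];
    return h;
}

/* Map the first argument of a log entry to a shard in [0, shard_count). */
static unsigned shard_for(const char *name, unsigned shard_count)
{
    assert(shard_count > 0);
    return shard_hash(name, shard_key_len(name)) % shard_count;
}
```

With this rule, shard_for("devenv.blue!user.tom.Sent", 4) and shard_for("devenv.blue!user.tom", 4) always agree, because both names reduce to the same key before hashing. A name without '!' (such as the user argument of a SEEN entry) is hashed as-is.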
We don't spawn sync_client from cyrus.conf; we use a systemd template unit instead,
/lib/systemd/system/bm-cyrus-syncclient@.service, and then enable one instance per
shard: systemctl enable bm-cyrus-syncclient@{0..3} to run 4 sync_client processes.
Attached diff of what we changed.
As a side note, our usage forbids moving a mailbox folder into another mailbox (i.e.
moving user.tom.titi into user.sga.stuff is forbidden in our setup). I guess this
approach would be problematic when moving a mailbox subfolder to another mailbox, as
the two names might be sharded to separate log files.
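To make the caveat concrete, the following sketch checks whether two mailbox names normalize to the same shard key. It is an illustration only: owner_prefix_len and same_shard_key are hypothetical names, not part of the attached diff. When the keys are equal, the log entries for both names are guaranteed to be written to the same log file; when they differ, the entries may be processed by different sync_client instances in no particular order.

```c
#include <stdbool.h>
#include <string.h>

/* Hypothetical helper: length of the mailbox-owner prefix of `name`,
 * e.g. "devenv.blue!user.tom" within "devenv.blue!user.tom.titi".
 * Names without '!' are used whole. */
static size_t owner_prefix_len(const char *name)
{
    const char *mark = strrchr(name, '!');
    const char *start;
    const char *dot;

    if (!mark)
        return strlen(name);

    start = mark + 1;
    if (strncmp(start, "user.", 5) == 0)
        start += 5;     /* the user name follows "user." */

    dot = strchr(start, '.');
    return dot ? (size_t)(dot - name) : strlen(name);
}

/* True when both names share the same normalized key, which means their
 * log entries always land in the same shard. */
static bool same_shard_key(const char *a, const char *b)
{
    size_t la = owner_prefix_len(a);
    size_t lb = owner_prefix_len(b);

    return la == lb && strncmp(a, b, la) == 0;
}
```

A rename from devenv.blue!user.tom.titi to devenv.blue!user.tom.Sent.titi keeps the same key, while a rename to devenv.blue!user.sga.stuff.titi does not, which is the case our setup forbids.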
Any feedback on this approach? I read that you planned to turn sync_client into a
sync daemon. Is there a schedule estimate for that?
Regards,
Thomas.
sync_client systemd configuration template,
/lib/systemd/system/bm-cyrus-syncclient@.service (%i is expanded to 42 by systemd
when you enable syncclient@42):
[Unit]
Description=BlueMind Cyrus sync_client service
After=bm-cyrus-imapd.service
PartOf=bm-cyrus-imapd.service
ConditionPathExists=!/etc/bm/bm-cyrus-imapd.disabled
[Service]
Type=forking
Environment=CONF=/etc/imapd.conf
ExecStartPre=/usr/bin/find /var/lib/cyrus/sync -name 'log*.%i' -type f -exec rm -f {} \;
ExecStart=/usr/sbin/sync_client -C $CONF -t 1800 -n core -i %i -l -r
SuccessExitStatus=75
RemainAfterExit=no
Restart=always
RestartSec=5s
TimeoutStopSec=20s
[Install]
WantedBy=bm-cyrus-imapd.service
Thomas Cataldo
Directeur Technique
(+33) 6 42 25 91 38
BlueMind
+33 (0)5 81 91 55 60
Hotel des Télécoms, 40 rue du village d'entreprises
31670 Labège, France
www.bluemind.net / https://blog.bluemind.net/fr/
["replication_multiplexing.diff" (replication_multiplexing.diff)]
commit 99c0965ffe737f1ccb4f8f10d584cbc63cfa8cdf
Author: Thomas Cataldo <thomas.cataldo@blue-mind.net>
Date: Mon Nov 18 18:55:54 2019 +0100
[replication] FEATBL-971 Feat: shard cyrus replication using a hashcode on mailbox names
diff --git a/imap/cyr_synclog.c b/imap/cyr_synclog.c
index c1e151cbf..b0e50e36d 100644
--- a/imap/cyr_synclog.c
+++ b/imap/cyr_synclog.c
@@ -82,6 +82,7 @@ int main(int argc, char *argv[])
char *alt_config = NULL;
char cmd = '\0';
int opt;
+ int shard = 0;
if ((geteuid()) == 0 && (become_cyrus(/*is_master*/0) != 0)) {
fatal("must run as the Cyrus user", EC_USAGE);
@@ -173,7 +174,7 @@ int main(int argc, char *argv[])
break;
default:
/* just as is! */
- sync_log(argv[optind]);
+ sync_log("dummy", argv[optind]);
break;
}
diff --git a/imap/sync_client.c b/imap/sync_client.c
index 882928026..94425c569 100644
--- a/imap/sync_client.c
+++ b/imap/sync_client.c
@@ -584,7 +584,7 @@ enum {
RESTART_RECONNECT
};
-static int do_daemon_work(const char *channel, const char *sync_shutdown_file,
+static int do_daemon_work(const char *channel, int shard, const char *sync_shutdown_file,
unsigned long timeout, unsigned long min_delta,
int *restartp)
{
@@ -596,7 +596,7 @@ static int do_daemon_work(const char *channel, const char *sync_shutdown_file,
sync_log_reader_t *slr;
*restartp = RESTART_NONE;
- slr = sync_log_reader_create_with_channel(channel);
+ slr = sync_log_reader_create_with_channel(channel, shard);
session_start = time(NULL);
@@ -776,7 +776,7 @@ static void replica_disconnect(void)
backend_disconnect(sync_backend);
}
-static void do_daemon(const char *channel, const char *sync_shutdown_file,
+static void do_daemon(const char *channel, int shard, const char *sync_shutdown_file,
unsigned long timeout, unsigned long min_delta)
{
int r = 0;
@@ -786,7 +786,7 @@ static void do_daemon(const char *channel, const char *sync_shutdown_file,
while (restart) {
replica_connect(channel);
- r = do_daemon_work(channel, sync_shutdown_file,
+ r = do_daemon_work(channel, shard, sync_shutdown_file,
timeout, min_delta, &restart);
if (r) {
/* See if we're still connected to the server.
@@ -883,6 +883,7 @@ int main(int argc, char **argv)
int mode = MODE_UNKNOWN;
int wait = 0;
int timeout = 600;
+ int shard = 0;
int min_delta = 0;
const char *channel = NULL;
const char *sync_shutdown_file = NULL;
@@ -898,7 +899,7 @@ int main(int argc, char **argv)
setbuf(stdout, NULL);
- while ((opt = getopt(argc, argv, "C:vlLS:F:f:w:t:d:n:rRumsozOAp:")) != EOF) {
+ while ((opt = getopt(argc, argv, "C:vlLS:F:f:w:t:i:d:n:rRumsozOAp:")) != EOF) {
switch (opt) {
case 'C': /* alt config file */
alt_config = optarg;
@@ -941,6 +942,10 @@ int main(int argc, char **argv)
wait = atoi(optarg);
break;
+ case 'i': /* shard index */
+ shard = atoi(optarg);
+ break;
+
case 't':
timeout = atoi(optarg);
break;
@@ -1201,7 +1206,8 @@ int main(int argc, char **argv)
if (!min_delta)
min_delta = sync_get_intconfig(channel, "sync_repeat_interval");
- do_daemon(channel, sync_shutdown_file, timeout, min_delta);
+ syslog(LOG_INFO, "Running in daemon mode for channel %s shard-index %d", channel, shard);
+ do_daemon(channel, shard, sync_shutdown_file, timeout, min_delta);
}
break;
diff --git a/imap/sync_log.c b/imap/sync_log.c
index 4e4764be1..166044ccc 100644
--- a/imap/sync_log.c
+++ b/imap/sync_log.c
@@ -75,17 +75,21 @@ static int sync_log_suppressed = 0;
static strarray_t *channels = NULL;
static strarray_t *unsuppressable = NULL;
+static int shards = 4;
+
EXPORTED void sync_log_init(void)
{
const char *conf;
int i;
-
+ /* 'shards' is the file-scope static above; redeclaring it here would shadow it */
/* sync_log_init() may be called more than once */
- if (channels) strarray_free(channels);
+ if (channels)
+ strarray_free(channels);
conf = config_getstring(IMAPOPT_SYNC_LOG_CHANNELS);
- if (!conf) conf = "\"\"";
+ if (!conf)
+ conf = "\"\"";
channels = strarray_split(conf, " ", 0);
/*
* The sysadmin can specify "" in the value of sync_log_channels to
@@ -102,6 +106,8 @@ EXPORTED void sync_log_init(void)
conf = config_getstring(IMAPOPT_SYNC_LOG_UNSUPPRESSABLE_CHANNELS);
if (conf)
unsuppressable = strarray_split(conf, " ", 0);
+
+ shards = config_getint(IMAPOPT_SYNC_LOG_SHARDS);
}
EXPORTED void sync_log_suppress(void)
@@ -118,16 +124,66 @@ EXPORTED void sync_log_done(void)
unsuppressable = NULL;
}
-static char *sync_log_fname(const char *channel)
+static int hashcode(const char *shard)
+{
+ int len = strlen(shard);
+ unsigned int hash = 0; /* unsigned, so overflow wraps instead of being undefined */
+ int i;
+ for (i = 0; i < len; i++)
+ {
+ hash = 31 * hash + shard[i];
+ }
+ return (int)hash;
+}
+
+static char *normalize_shard(const char *shard)
+{
+ int suffix_cut = 0;
+ int total_len = strlen(shard);
+ char *mark = strrchr(shard, '!');
+ if (mark)
+ {
+ if (strncmp("user.", mark + 1, 5) == 0)
+ {
+ // prefixed by user.
+ char *sec_dot = strchr(mark + 6, '.');
+ if (sec_dot != NULL)
+ {
+ suffix_cut = strlen(sec_dot);
+ }
+ }
+ else
+ {
+ // not prefixed by user. aka mailshare
+ char *first_dot = strchr(mark + 1, '.');
+ if (first_dot)
+ {
+ suffix_cut = strlen(first_dot);
+ }
+ }
+ }
+ return strndup(shard, total_len - suffix_cut);
+}
+
+static int shard_index(const char* shard) {
+ char* norm = normalize_shard(shard);
+ int ret = abs(hashcode(norm) % shards);
+ free(norm);
+ return ret;
+}
+
+
+
+static char *sync_log_fname(const char *channel, int shard)
{
static char buf[MAX_MAILBOX_PATH];
if (channel)
snprintf(buf, MAX_MAILBOX_PATH,
- "%s/sync/%s/log", config_dir, channel);
+ "%s/sync/%s/log.%d", config_dir, channel, shard);
else
snprintf(buf, MAX_MAILBOX_PATH,
- "%s/sync/log", config_dir);
+ "%s/sync/log.%d", config_dir, shard);
return buf;
}
@@ -135,37 +191,42 @@ static char *sync_log_fname(const char *channel)
static int sync_log_enabled(const char *channel)
{
if (!config_getswitch(IMAPOPT_SYNC_LOG))
- return 0; /* entire mechanism is disabled */
+ return 0; /* entire mechanism is disabled */
if (!sync_log_suppressed)
- return 1; /* _suppress() wasn't called */
+ return 1; /* _suppress() wasn't called */
if (unsuppressable && strarray_find(unsuppressable, channel, 0) >= 0)
- return 1; /* channel is unsuppressable */
- return 0; /* suppressed */
+ return 1; /* channel is unsuppressable */
+ return 0; /* suppressed */
}
-static void sync_log_base(const char *channel, const char *string)
+static void sync_log_base(const char *channel, int shard, const char *string)
{
int fd;
struct stat sbuffile, sbuffd;
int retries = 0;
const char *fname;
- fname = sync_log_fname(channel);
+ fname = sync_log_fname(channel, shard);
- while (retries++ < SYNC_LOG_RETRIES) {
- fd = open(fname, O_WRONLY|O_APPEND|O_CREAT, 0640);
- if (fd < 0 && errno == ENOENT) {
- if (!cyrus_mkdir(fname, 0755)) {
- fd = open(fname, O_WRONLY|O_APPEND|O_CREAT, 0640);
+ while (retries++ < SYNC_LOG_RETRIES)
+ {
+ fd = open(fname, O_WRONLY | O_APPEND | O_CREAT, 0640);
+ if (fd < 0 && errno == ENOENT)
+ {
+ if (!cyrus_mkdir(fname, 0755))
+ {
+ fd = open(fname, O_WRONLY | O_APPEND | O_CREAT, 0640);
}
}
- if (fd < 0) {
+ if (fd < 0)
+ {
syslog(LOG_ERR, "sync_log(): Unable to write to log file %s: %s",
fname, strerror(errno));
return;
}
- if (lock_blocking(fd, fname) == -1) {
+ if (lock_blocking(fd, fname) == -1)
+ {
syslog(LOG_ERR, "sync_log(): Failed to lock %s for %s: %m",
fname, string);
xclose(fd);
@@ -181,7 +242,8 @@ static void sync_log_base(const char *channel, const char *string)
lock_unlock(fd, fname);
xclose(fd);
}
- if (retries >= SYNC_LOG_RETRIES) {
+ if (retries >= SYNC_LOG_RETRIES)
+ {
xclose(fd);
syslog(LOG_ERR,
"sync_log(): Failed to lock %s for %s after %d attempts",
@@ -200,7 +262,7 @@ static void sync_log_base(const char *channel, const char *string)
static const char *sync_quote_name(const char *name)
{
- static char buf[MAX_MAILBOX_BUFFER+3]; /* "x2 plus \0 */
+ static char buf[MAX_MAILBOX_BUFFER + 3]; /* "x2 plus \0 */
char c;
int src;
int dst = 0;
@@ -210,24 +272,28 @@ static const char *sync_quote_name(const char *name)
buf[dst++] = '"';
/* degenerate case - no name is the empty string, quote it */
- if (!name || !*name) {
+ if (!name || !*name)
+ {
need_quote = 1;
goto end;
}
- for (src = 0; name[src]; src++) {
+ for (src = 0; name[src]; src++)
+ {
c = name[src];
if ((c == '\r') || (c == '\n'))
fatal("Illegal line break in folder name", EC_IOERR);
/* quoteable characters */
- if ((c == '\\') || (c == '\"') || (c == '{') || (c == '}')) {
+ if ((c == '\\') || (c == '\"') || (c == '{') || (c == '}'))
+ {
need_quote = 1;
buf[dst++] = '\\';
}
/* non-atom characters */
- else if ((c == ' ') || (c == '\t') || (c == '(') || (c == ')')) {
+ else if ((c == ' ') || (c == '\t') || (c == '(') || (c == ')'))
+ {
need_quote = 1;
}
@@ -238,12 +304,14 @@ static const char *sync_quote_name(const char *name)
}
end:
- if (need_quote) {
+ if (need_quote)
+ {
buf[dst++] = '\"';
buf[dst] = '\0';
return buf;
}
- else {
+ else
+ {
buf[dst] = '\0';
return buf + 1; /* skip initial quote */
}
@@ -253,26 +321,29 @@ end:
static char *va_format(const char *fmt, va_list ap)
{
- static char buf[BUFSIZE+1];
+ static char buf[BUFSIZE + 1];
size_t len;
int ival;
const char *sval;
const char *p;
- for (len = 0, p = fmt; *p && len < BUFSIZE; p++) {
- if (*p != '%') {
+ for (len = 0, p = fmt; *p && len < BUFSIZE; p++)
+ {
+ if (*p != '%')
+ {
buf[len++] = *p;
continue;
}
- switch (*++p) {
+ switch (*++p)
+ {
case 'd':
ival = va_arg(ap, int);
- len += snprintf(buf+len, BUFSIZE-len, "%d", ival);
+ len += snprintf(buf + len, BUFSIZE - len, "%d", ival);
break;
case 's':
sval = va_arg(ap, const char *);
sval = sync_quote_name(sval);
- strlcpy(buf+len, sval, BUFSIZE-len);
+ strlcpy(buf + len, sval, BUFSIZE - len);
len += strlen(sval);
break;
default:
@@ -281,32 +352,35 @@ static char *va_format(const char *fmt, va_list ap)
}
}
- if (buf[len-1] != '\n') buf[len++] = '\n';
+ if (buf[len - 1] != '\n')
+ buf[len++] = '\n';
buf[len] = '\0';
return buf;
}
-EXPORTED void sync_log(const char *fmt, ...)
+EXPORTED void sync_log(const char *shard, const char *fmt, ...)
{
va_list ap;
const char *val;
int i;
- if (!channels) return;
+ if (!channels)
+ return;
va_start(ap, fmt);
val = va_format(fmt, ap);
va_end(ap);
- for (i = 0 ; i < channels->count ; i++) {
+ for (i = 0; i < channels->count; i++)
+ {
const char *channel = channels->data[i];
if (sync_log_enabled(channel))
- sync_log_base(channel, val);
+ sync_log_base(channel, shard_index(shard), val);
}
}
-EXPORTED void sync_log_channel(const char *channel, const char *fmt, ...)
+EXPORTED void sync_log_channel(const char *shard, const char *channel, const char *fmt, ...)
{
va_list ap;
const char *val;
@@ -315,7 +389,7 @@ EXPORTED void sync_log_channel(const char *channel, const char *fmt, ...)
val = va_format(fmt, ap);
va_end(ap);
- sync_log_base(channel, val);
+ sync_log_base(channel, shard_index(shard), val);
}
/*
@@ -368,16 +442,16 @@ static sync_log_reader_t *sync_log_reader_alloc(void)
* Returns a new object which must be freed with sync_log_reader_free().
* Does not return NULL.
*/
-EXPORTED sync_log_reader_t *sync_log_reader_create_with_channel(const char *channel)
+EXPORTED sync_log_reader_t *sync_log_reader_create_with_channel(const char *channel, int shard)
{
sync_log_reader_t *slr = sync_log_reader_alloc();
struct buf buf = BUF_INITIALIZER;
- slr->log_file = xstrdup(sync_log_fname(channel));
+ slr->log_file = xstrdup(sync_log_fname(channel, shard));
/* Create a work log filename. We will process this
* first if it exists */
- buf_printf(&buf, "%s-run", slr->log_file);
+ buf_printf(&buf, "%s-run.%d", slr->log_file, shard);
slr->work_file = buf_release(&buf);
return slr;
@@ -416,9 +490,12 @@ EXPORTED sync_log_reader_t *sync_log_reader_create_with_fd(int fd)
*/
EXPORTED void sync_log_reader_free(sync_log_reader_t *slr)
{
- if (!slr) return;
- if (slr->input) prot_free(slr->input);
- if (slr->fd_is_ours && slr->fd >= 0) close(slr->fd);
+ if (!slr)
+ return;
+ if (slr->input)
+ prot_free(slr->input);
+ if (slr->fd_is_ours && slr->fd >= 0)
+ close(slr->fd);
free(slr->log_file);
free(slr->work_file);
buf_free(&slr->type);
@@ -446,47 +523,57 @@ EXPORTED int sync_log_reader_begin(sync_log_reader_t *slr)
struct stat sbuf;
int r;
- if (slr->input) {
+ if (slr->input)
+ {
r = sync_log_reader_end(slr);
- if (r) return r;
+ if (r)
+ return r;
}
- if (stat(slr->work_file, &sbuf) == 0) {
+ if (stat(slr->work_file, &sbuf) == 0)
+ {
/* Existing work log file - process this first */
syslog(LOG_NOTICE,
"Reprocessing sync log file %s", slr->work_file);
}
- else if (!slr->log_file) {
+ else if (!slr->log_file)
+ {
syslog(LOG_ERR, "Failed to stat %s: %m",
slr->log_file);
return IMAP_IOERROR;
}
- else {
+ else
+ {
/* Check for sync_log file */
- if (stat(slr->log_file, &sbuf) < 0) {
+ if (stat(slr->log_file, &sbuf) < 0)
+ {
if (errno == ENOENT)
- return IMAP_AGAIN; /* no problem, try again later */
+ return IMAP_AGAIN; /* no problem, try again later */
syslog(LOG_ERR, "Failed to stat %s: %m",
slr->log_file);
return IMAP_IOERROR;
}
/* Move sync_log to our work file */
- if (rename(slr->log_file, slr->work_file) < 0) {
+ if (rename(slr->log_file, slr->work_file) < 0)
+ {
syslog(LOG_ERR, "Rename %s -> %s failed: %m",
slr->log_file, slr->work_file);
return IMAP_IOERROR;
}
}
- if (slr->fd < 0) {
+ if (slr->fd < 0)
+ {
int fd = open(slr->work_file, O_RDWR, 0);
- if (fd < 0) {
+ if (fd < 0)
+ {
syslog(LOG_ERR, "Failed to open %s: %m", slr->work_file);
return IMAP_IOERROR;
}
- if (lock_blocking(fd, slr->work_file) < 0) {
+ if (lock_blocking(fd, slr->work_file) < 0)
+ {
syslog(LOG_ERR, "Failed to lock %s: %m", slr->work_file);
close(fd);
return IMAP_IOERROR;
@@ -502,7 +589,7 @@ EXPORTED int sync_log_reader_begin(sync_log_reader_t *slr)
lock_unlock(slr->fd, slr->work_file);
}
- slr->input = prot_new(slr->fd, /*write*/0);
+ slr->input = prot_new(slr->fd, /*write*/ 0);
return 0;
}
@@ -522,23 +609,27 @@ EXPORTED int sync_log_reader_end(sync_log_reader_t *slr)
if (!slr->input)
return 0;
- if (slr->input) {
+ if (slr->input)
+ {
prot_free(slr->input);
slr->input = NULL;
}
- if (slr->fd_is_ours && slr->fd >= 0) {
+ if (slr->fd_is_ours && slr->fd >= 0)
+ {
lock_unlock(slr->fd, slr->work_file);
close(slr->fd);
slr->fd = -1;
}
- if (slr->log_file) {
+ if (slr->log_file)
+ {
/* We were initialised with a sync log channel, whose
* log file we rename()d to the work file. Now that
* we've done with the work file we can unlink it.
* Further checks at this point are just paranoia. */
- if (slr->work_file && unlink(slr->work_file) < 0) {
+ if (slr->work_file && unlink(slr->work_file) < 0)
+ {
syslog(LOG_ERR, "Unlink %s failed: %m", slr->work_file);
return IMAP_IOERROR;
}
@@ -566,32 +657,40 @@ EXPORTED int sync_log_reader_getitem(sync_log_reader_t *slr,
if (!slr->input)
return EOF;
- for (;;) {
+ for (;;)
+ {
if ((c = getword(slr->input, &slr->type)) == EOF)
return EOF;
/* Ignore blank lines */
- if (c == '\r') c = prot_getc(slr->input);
+ if (c == '\r')
+ c = prot_getc(slr->input);
if (c == '\n')
continue;
- if (c != ' ') {
+ if (c != ' ')
+ {
syslog(LOG_ERR, "Invalid input");
eatline(slr->input, c);
continue;
}
- if ((c = getastring(slr->input, 0, &slr->arg1)) == EOF) return EOF;
+ if ((c = getastring(slr->input, 0, &slr->arg1)) == EOF)
+ return EOF;
arg1s = slr->arg1.s;
arg2s = NULL;
- if (c == ' ') {
- if ((c = getastring(slr->input, 0, &slr->arg2)) == EOF) return EOF;
+ if (c == ' ')
+ {
+ if ((c = getastring(slr->input, 0, &slr->arg2)) == EOF)
+ return EOF;
arg2s = slr->arg2.s;
}
- if (c == '\r') c = prot_getc(slr->input);
- if (c != '\n') {
+ if (c == '\r')
+ c = prot_getc(slr->input);
+ if (c != '\n')
+ {
syslog(LOG_ERR, "Garbage at end of input line");
eatline(slr->input, c);
continue;
diff --git a/imap/sync_log.h b/imap/sync_log.h
index 0dcc0a203..70a8be507 100644
--- a/imap/sync_log.h
+++ b/imap/sync_log.h
@@ -52,79 +52,79 @@ void sync_log_init(void);
void sync_log_suppress(void);
void sync_log_done(void);
-void sync_log(const char *fmt, ...);
-void sync_log_channel(const char *channel, const char *fmt, ...);
+void sync_log(const char* shard, const char *fmt, ...);
+void sync_log_channel(const char* shard, const char *channel, const char *fmt, ...);
#define sync_log_user(user) \
- sync_log("USER %s\n", user)
+ sync_log(user, "USER %s\n", user)
#define sync_log_unuser(user) \
- sync_log("UNUSER %s\n", user)
+ sync_log(user, "UNUSER %s\n", user)
#define sync_log_sieve(user) \
- sync_log("META %s\n", user)
+ sync_log(user, "META %s\n", user)
#define sync_log_append(name) \
- sync_log("APPEND %s\n", name)
+ sync_log(name, "APPEND %s\n", name)
#define sync_log_mailbox(name) \
- sync_log("MAILBOX %s\n", name)
+ sync_log(name, "MAILBOX %s\n", name)
#define sync_log_unmailbox(name) \
- sync_log("UNMAILBOX %s\n", name)
+ sync_log(name, "UNMAILBOX %s\n", name)
#define sync_log_mailbox_double(name1, name2) \
- sync_log("MAILBOX %s\nMAILBOX %s\n", name1, name2)
+ sync_log(name1, "MAILBOX %s\nMAILBOX %s\n", name1, name2)
#define sync_log_quota(name) \
- sync_log("QUOTA %s\n", name)
+ sync_log(name, "QUOTA %s\n", name)
#define sync_log_annotation(name) \
- sync_log("ANNOTATION %s\n", name)
+ sync_log(name, "ANNOTATION %s\n", name)
#define sync_log_seen(user, name) \
- sync_log("SEEN %s %s\n", user, name)
+ sync_log(user, "SEEN %s %s\n", user, name)
#define sync_log_subscribe(user, name) \
- sync_log("SUB %s %s\n", user, name)
+ sync_log(user, "SUB %s %s\n", user, name)
#define sync_log_channel_user(channel, user) \
- sync_log_channel(channel, "USER %s\n", user)
+ sync_log_channel(user, channel, "USER %s\n", user)
#define sync_log_channel_unuser(channel, user) \
- sync_log_channel(channel, "UNUSER %s\n", user)
+ sync_log_channel(user, channel, "UNUSER %s\n", user)
#define sync_log_channel_sieve(channel, user) \
- sync_log_channel(channel, "META %s\n", user)
+ sync_log_channel(user, channel, "META %s\n", user)
#define sync_log_channel_append(channel, name) \
- sync_log_channel(channel, "APPEND %s\n", name)
+ sync_log_channel(name, channel, "APPEND %s\n", name)
#define sync_log_channel_mailbox(channel, name) \
- sync_log_channel(channel, "MAILBOX %s\n", name)
+ sync_log_channel(name, channel, "MAILBOX %s\n", name)
#define sync_log_channel_unmailbox(channel, name) \
- sync_log_channel(channel, "UNMAILBOX %s\n", name)
+ sync_log_channel(name, channel, "UNMAILBOX %s\n", name)
#define sync_log_channel_mailbox_double(channel, name1, name2) \
- sync_log_channel(channel, "MAILBOX %s\nMAILBOX %s\n", name1, name2)
+ sync_log_channel(name1, channel, "MAILBOX %s\nMAILBOX %s\n", name1, name2)
#define sync_log_channel_quota(channel, name) \
- sync_log_channel(channel, "QUOTA %s\n", name)
+ sync_log_channel(name, channel, "QUOTA %s\n", name)
#define sync_log_channel_annotation(channel, name) \
- sync_log_channel(channel, "ANNOTATION %s\n", name)
+ sync_log_channel(name, channel, "ANNOTATION %s\n", name)
#define sync_log_channel_seen(channel, user, name) \
- sync_log_channel(channel, "SEEN %s %s\n", user, name)
+ sync_log_channel(user, channel, "SEEN %s %s\n", user, name)
#define sync_log_channel_subscribe(channel, user, name) \
- sync_log_channel(channel, "SUB %s %s\n", user, name)
+ sync_log_channel(user, channel, "SUB %s %s\n", user, name)
/* read-side sync log code */
typedef struct sync_log_reader sync_log_reader_t;
-sync_log_reader_t *sync_log_reader_create_with_channel(const char *channel);
+sync_log_reader_t *sync_log_reader_create_with_channel(const char *channel, int shard);
sync_log_reader_t *sync_log_reader_create_with_filename(const char *filename);
sync_log_reader_t *sync_log_reader_create_with_fd(int fd);
void sync_log_reader_free(sync_log_reader_t *slr);
diff --git a/lib/imapoptions b/lib/imapoptions
index e86300061..60d838e43 100644
--- a/lib/imapoptions
+++ b/lib/imapoptions
@@ -2107,6 +2107,9 @@ product version in the capabilities
/* Enable replication action logging by sync_server as well, allowing
chaining of replicas. Use this on 'B' for A => B => C replication layout */
+{ "sync_log_shards", 4, INT }
+/* Specifies how many shards the replication log files are split into. */
+
{ "sync_log_channels", NULL, STRING }
/* If specified, log all events to multiple log files in directories
specified by each "channel". Each channel can then be processed